Hello,
I am facing a problem with Hikvision device DS-K1T320EWX.
The problem is in pagination.
I have done several attempts to renew the session but it appears that hikvision device is blocking.
Secondly it only allows to retrieve 30 records at a time.
The order of the functions in which the flow is as follows.
1.
This is the top level function which tries to retrieve all the records of the events between a start date and an end date
. def _search_historical_events(self, search_id, start_time=None, end_time=None):
2.
The function above calls the following function which tries to retrieve records from the device in one or more attempts through post call.
def post_with_retries(self, url, json=None, headers=None, max_retries=3):
3.
The function above calls the next function only when it receives response 401.
def re_authenticate(self):
The problem is happening that we cannot get this session to be authenticated and continue pulling the records from the hikvision device.
The total number of events present is about 220 and everytime the retrieval is 30. Changing the max items have also not changed anything.
The call inside is
with body
{
"AcsEventCond": {
"searchID": "dd7aa46e-e652-479d-14b4",
"searchResultPosition": 60,
"maxResults": 1000,
"major": 5,
"minor": 104,
"startTime": "2025-01-01T15:00:00+05:30",
"endTime": "2025-07-15T23:59:59+05:30"
}
}
Below is the code for reference which I am trying to run
from datetime import timedelta, datetime
from zoneinfo import ZoneInfo
import json
import threading
import time
import requests
class Event(object):
def
init(self, last_event_time):
from . import auth, session # Lazy import
self._auth = auth
self._session = session # Store as instance variable
self._session.auth = auth # Set auth immediately
self._stop = False
self._last_event_time = last_event_time
print( f"Last Event Time {self._last_event_time}")
def re_authenticate(self):
"""Properly handles Hikvision's digest auth challenges"""
try:
# 1. Full cleanup
self._session.cookies.clear()
self._session.headers.pop('Authorization', None)
# 2. Force new auth challenge
probe_url = "
"
try:
probe = self._session.get(probe_url, timeout=3)
if probe.status_code == 200:
return True # Auth still valid
except requests.RequestException:
pass
# 3. Full session renewal if probe fails
self._session._renew_session()
# 4. Verify auth works
verify = self._session.get(probe_url, timeout=3)
if verify.status_code == 200:
print(" Auth refreshed successfully")
return True
raise Exception("Auth verification failed after renewal")
except Exception as e:
print(f" Auth renewal failed: {str(e)}")
return False
def post_with_retries(self, url, json=None, headers=None, max_retries=3):
last_exception = None
for attempt in range(1, max_retries + 1):
try:
response = self._session.post(
url,
json=json,
headers=headers,
timeout=(5, 30) # 5s connect, 30s read
)
if response.status_code == 401:
if not self.re_authenticate():
raise requests.HTTPError("Permanent auth failure")
continue # Retry with fresh auth
response.raise_for_status()
return response
except requests.RequestException as e:
last_exception = e
print(f"Attempt {attempt} failed: {str(e)}")
if attempt < max_retries:
time.sleep(2 ** attempt) # Exponential backoff
raise last_exception or requests.RequestException("All retries failed")
def _search_historical_events(self,
search_id,
start_time=None,
end_time=None):
url = '
'
print(f"Inside the function {
name}")
# Use India timezone
tz = ZoneInfo("Asia/Kolkata")
# Make timestamps timezone-aware and truncate microseconds
startTime = (start_time or self._last_event_time).astimezone(tz).replace(microsecond=0)
endTime = (end_time or datetime.now(tz)).astimezone(tz).replace(microsecond=0)
headers = {
"Content-Type": "application/json"
}
all_events = []
position = 0
max_results = 30
while True:
payload = {
"AcsEventCond": {
"searchID": search_id,
"searchResultPosition": position,
"maxResults": max_results,
"major": 5,
"minor": 75,
"startTime": startTime.isoformat(),
"endTime": endTime.isoformat()
}
}
print(f" Sending POST to {url} with position: {position}")
response = self.post_with_retries(url, json=payload, headers=headers)
response.raise_for_status()
data = response.json()
print(f" Response: {json.dumps(data, indent=2)}")
events = data.get("AcsEvent", [])
all_events.extend(events)
if events.get("numOfMatches") < max_results:
break # No more data
position += max_results # Get the next page
return {
"data": all_events,
"end_time": endTime.isoformat()
}