API for Hikvision face detection DS-K1T320EWX

You are getting everything; you just need to use input parameters that select authorized registration events. For all ACS events major is 5, but minor varies depending on the device. Try 75, which is the value on the K1T341.
 
Hello,

I am facing a problem with Hikvision device DS-K1T320EWX.
The problem is in pagination.

I have made several attempts to renew the session, but it appears that the Hikvision device is blocking them.
Secondly, it only allows retrieving 30 records at a time.

The functions are called in the following order.

1. This is the top-level function, which tries to retrieve all event records between a start date and an end date.
def _search_historical_events(self, search_id, start_time=None, end_time=None):

2. The function above calls the following function, which tries to retrieve records from the device in one or more attempts through a POST call.
def post_with_retries(self, url, json=None, headers=None, max_retries=3):

3. The function above calls the next function only when it receives a 401 response.
def re_authenticate(self):

The problem is that we cannot get the session re-authenticated so that it continues pulling records from the Hikvision device.
The total number of events present is about 220, and every retrieval returns 30. Changing maxResults has not changed anything either.

The request body of the call is:
{
    "AcsEventCond": {
        "searchID": "dd7aa46e-e652-479d-14b4",
        "searchResultPosition": 60,
        "maxResults": 1000,
        "major": 5,
        "minor": 104,
        "startTime": "2025-01-01T15:00:00+05:30",
        "endTime": "2025-07-15T23:59:59+05:30"
    }
}
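
As a rough illustration, the body above could be assembled in Python along these lines. The helper name build_acs_event_cond and the fixed +05:30 offset are assumptions made for this sketch; only the field names and values come from the post.

```python
from datetime import datetime, timedelta, timezone

# Fixed +05:30 offset, matching the timestamps in the body above.
IST = timezone(timedelta(hours=5, minutes=30))


def build_acs_event_cond(search_id, position, max_results, minor, start, end, major=5):
    """Build the AcsEventCond request body (hypothetical helper)."""

    def fmt(ts):
        # Second precision with an explicit UTC offset, e.g. 2025-01-01T15:00:00+05:30
        return ts.astimezone(IST).replace(microsecond=0).isoformat()

    return {
        "AcsEventCond": {
            "searchID": search_id,
            "searchResultPosition": position,
            "maxResults": max_results,
            "major": major,
            "minor": minor,
            "startTime": fmt(start),
            "endTime": fmt(end),
        }
    }


body = build_acs_event_cond(
    "dd7aa46e-e652-479d-14b4", 60, 30, 104,
    datetime(2025, 1, 1, 15, 0, 0, tzinfo=IST),
    datetime(2025, 7, 15, 23, 59, 59, tzinfo=IST),
)
```

Keeping the search ID and the time window identical across pages and changing only searchResultPosition matches how the loop in the code below uses the body.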

Below, for reference, is the code I am trying to run:


from datetime import timedelta, datetime
from zoneinfo import ZoneInfo
import json
import threading
import time
import requests


class Event(object):

    def __init__(self, last_event_time):
        from . import auth, session  # Lazy import

        self._auth = auth
        self._session = session  # Store as instance variable
        self._session.auth = auth  # Set auth immediately
        self._stop = False
        self._last_event_time = last_event_time
        print(f"Last Event Time {self._last_event_time}")

    def re_authenticate(self):
        """Properly handles Hikvision's digest auth challenges"""
        try:
            # 1. Full cleanup
            self._session.cookies.clear()
            self._session.headers.pop('Authorization', None)

            # 2. Force new auth challenge
            probe_url = ""  # URL left empty in the original post
            try:
                probe = self._session.get(probe_url, timeout=3)
                if probe.status_code == 200:
                    return True  # Auth still valid
            except requests.RequestException:
                pass

            # 3. Full session renewal if probe fails
            self._session._renew_session()

            # 4. Verify auth works
            verify = self._session.get(probe_url, timeout=3)
            if verify.status_code == 200:
                print(" Auth refreshed successfully")
                return True

            raise Exception("Auth verification failed after renewal")

        except Exception as e:
            print(f" Auth renewal failed: {str(e)}")
            return False

    def post_with_retries(self, url, json=None, headers=None, max_retries=3):
        last_exception = None

        for attempt in range(1, max_retries + 1):
            try:
                response = self._session.post(
                    url,
                    json=json,
                    headers=headers,
                    timeout=(5, 30)  # 5s connect, 30s read
                )
                if response.status_code == 401:
                    if not self.re_authenticate():
                        raise requests.HTTPError("Permanent auth failure")
                    continue  # Retry with fresh auth

                response.raise_for_status()
                return response

            except requests.RequestException as e:
                last_exception = e
                print(f"Attempt {attempt} failed: {str(e)}")
                if attempt < max_retries:
                    time.sleep(2 ** attempt)  # Exponential backoff

        raise last_exception or requests.RequestException("All retries failed")

    def _search_historical_events(self,
                                  search_id,
                                  start_time=None,
                                  end_time=None):
        url = ''  # URL left empty in the original post
        print(f"Inside the function {__name__}")
        # Use India timezone
        tz = ZoneInfo("Asia/Kolkata")
        # Make timestamps timezone-aware and truncate microseconds
        startTime = (start_time or self._last_event_time).astimezone(tz).replace(microsecond=0)
        endTime = (end_time or datetime.now(tz)).astimezone(tz).replace(microsecond=0)
        headers = {
            "Content-Type": "application/json"
        }
        all_events = []
        position = 0
        max_results = 30
        while True:
            payload = {
                "AcsEventCond": {
                    "searchID": search_id,
                    "searchResultPosition": position,
                    "maxResults": max_results,
                    "major": 5,
                    "minor": 75,
                    "startTime": startTime.isoformat(),
                    "endTime": endTime.isoformat()
                }
            }
            print(f" Sending POST to {url} with position: {position}")

            response = self.post_with_retries(url, json=payload, headers=headers)
            response.raise_for_status()
            data = response.json()
            print(f" Response: {json.dumps(data, indent=2)}")
            # "AcsEvent" is an object, not a list; the records are assumed
            # to sit in its "InfoList" field.
            acs_event = data.get("AcsEvent", {})
            all_events.extend(acs_event.get("InfoList", []))

            if acs_event.get("numOfMatches", 0) < max_results:
                break  # No more data
            position += max_results  # Get the next page

        return {
            "data": all_events,
            "end_time": endTime.isoformat()
        }
 
You need to loop using a max results parameter of 30 and a start index.

So the first request is 0:30, then 30:30, then 60:30; you get the point.

When you receive the response NO MORE, you have hit the end. Make that a background service that polls every X seconds, or restart it upon reaching the end.
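
The loop described above might look roughly like this. It is only a sketch: fetch_page is a hypothetical callable standing in for one search request, and the response fields InfoList and responseStatusStrg (with the value "MORE") are assumptions about the response shape, not something confirmed in this thread. The fake device below just simulates 220 events served 30 at a time.

```python
def fetch_all_events(fetch_page, page_size=30):
    """Collect events page by page.

    fetch_page(position, max_results) is a hypothetical callable that sends
    one search request and returns the parsed "AcsEvent" object.
    """
    events = []
    position = 0
    while True:
        page = fetch_page(position, page_size)
        items = page.get("InfoList", [])
        events.extend(items)
        position += len(items)  # 0, 30, 60, ...
        # Stop on an empty page or once the device no longer reports MORE.
        if not items or page.get("responseStatusStrg") != "MORE":
            break
    return events


# Stand-in for the device: 220 fake events, at most 30 per page.
FAKE = [{"serialNo": i} for i in range(220)]


def fake_fetch(position, max_results):
    chunk = FAKE[position:position + min(max_results, 30)]
    more = position + len(chunk) < len(FAKE)
    return {
        "InfoList": chunk,
        "numOfMatches": len(chunk),
        "totalMatches": len(FAKE),
        "responseStatusStrg": "MORE" if more else "OK",
    }


all_events = fetch_all_events(fake_fetch)
print(len(all_events))  # 220
```

Advancing by the number of records actually returned, rather than by the requested size, keeps the offsets right even if the device caps a page below what was asked for.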
 
The part you suggest is already done in the code. The main problem is that after extracting the first 30 records and then the second 30 records, the session auth expires and the device returns 401. I tried different ways of increasing it, but have been unsuccessful. The session becoming unauthorized after two calls seems to be a problem that does not have a direct or easy solution.
 
That's weird, as you make the same ISAPI call with just a different body. You are not holding the session; it's a POST all over again in the loop, so each POST is a new digest auth.
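
If each POST really is its own digest exchange, one thing worth checking is that the session's auth object is requests' HTTPDigestAuth, which answers a 401 challenge by resending the request with an Authorization header. A minimal sketch, assuming the requests library; make_session and the credentials are placeholders, and the commented-out call uses names from the code above:

```python
import requests
from requests.auth import HTTPDigestAuth


def make_session(username, password):
    """Return a Session that answers digest challenges (hypothetical helper)."""
    session = requests.Session()
    # HTTPDigestAuth reacts to a 401 challenge by retrying the same request
    # with a computed Authorization header.
    session.auth = HTTPDigestAuth(username, password)
    return session


session = make_session("admin", "placeholder-password")
# Every page is then just another POST on the same session, for example:
# response = session.post(url, json=payload, timeout=(5, 30))
```

With this in place, clearing cookies or headers by hand between pages should not be needed, though whether that resolves the 401 on this particular device is untested.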