File name
Commit message
Commit date
import os
import time
from collections import deque
from datetime import datetime
from threading import Lock, Thread, Event
from zoneinfo import ZoneInfo

import av
import cv2
import numpy as np
import requests
class FrameCapturer:
    """Sample still images from an HLS video stream and post them to a server.

    Two worker threads share a bounded frame buffer: ``capture_frames``
    decodes the stream and appends every frame, while ``process_frames``
    takes the newest frame once per ``interval`` seconds, encodes it as PNG,
    sends it to ``endpoint`` and writes a copy to disk. ``start`` launches
    both threads and ``stop`` shuts them down.
    """

    # Seconds to wait for the analyzer server before giving up on one upload.
    REQUEST_TIMEOUT = 10
    # Frame rate assumed when the container does not report a usable one.
    _FALLBACK_FPS = 30.0
    # Directory that receives the on-disk copy of every sampled frame.
    _OUTPUT_DIR = 'hls_streaming/captured_frame_'

    def __init__(self, hls_url, cctv_id, interval=5, buffer_duration=15, buffer_size=600, time_zone="Asia/Seoul", endpoint="localhost:12345"):
        '''
        :param hls_url: HLS address
        :param cctv_id: CCTV id number, sent in the ``x-cctv-info`` header so the receiver can tell the sources apart (further discussion is needed with frontend developers)
        :param interval: interval of sampling in seconds
        :param buffer_duration: seconds to wait after construction before the first sample is taken; 15 seconds is default for ITS video streaming
        :param buffer_size: maximum number of decoded frames kept in memory
        :param time_zone: time zone name used for the ``x-time-sent`` header, default Asia/Seoul
        :param endpoint: address of the server the sampled images are POSTed to
        :raises ValueError: if the stream contains no video track
        '''
        self.hls_url = hls_url
        self.cctvid = cctv_id
        self.interval = interval
        self.buffer_duration = buffer_duration
        self.buffer_size = buffer_size
        # A bounded deque drops the oldest frame on overflow, so memory stays
        # capped even when the sampling interval is long.
        self.frame_buffer = deque(maxlen=self.buffer_size)
        self.frame_buffer_lock = Lock()
        self.captured_frame_count = 0
        self.last_capture_time = 0
        self.start_time = time.time()
        self.stop_event = Event()
        self.time_zone = ZoneInfo(time_zone)
        self.endpoint = endpoint
        self.capture_thread = None
        self.process_thread = None
        # Opened last so a bad argument above cannot leak an open stream.
        self.setup_stream()

    def setup_stream(self):
        """Open the HLS stream and derive the per-frame pacing delay.

        :raises ValueError: if the stream contains no video track
        """
        self.input_stream = av.open(self.hls_url)
        self.video_stream = next(
            (s for s in self.input_stream.streams if s.type == 'video'), None
        )
        if self.video_stream is None:
            self.input_stream.close()
            raise ValueError(f"No video stream found in {self.hls_url}")
        rate = self.video_stream.guessed_rate
        # The rate is a fraction (e.g. 30000/1001), so its numerator alone is
        # not the frame rate. NOTE(review): assumed it can be missing or zero
        # for some streams - fall back rather than divide by zero.
        self.fps = float(rate) if rate else self._FALLBACK_FPS
        self.capture_interval = 1 / self.fps

    # `capture_frames` and `process_frames` run concurrently (each on its own
    # Thread) so that a photo is sent every `interval` seconds regardless of
    # how many frames are currently buffered.
    # They are started by `start` and halted by `stop`.
    def capture_frames(self):
        """Decode the video stream into the frame buffer until stopped."""
        for packet in self.input_stream.demux(self.video_stream):
            if self.stop_event.is_set():
                return
            for frame in packet.decode():
                with self.frame_buffer_lock:
                    self.frame_buffer.append(frame)
                # Pace decoding at roughly one frame per frame period; waiting
                # on the event lets `stop` interrupt the pause immediately.
                if self.stop_event.wait(self.capture_interval):
                    return

    def process_frames(self):
        """Every ``interval`` seconds, sample the newest buffered frame."""
        while not self.stop_event.is_set():
            current_time = time.time()
            warmed_up = current_time - self.start_time >= self.buffer_duration
            due = (
                self.last_capture_time == 0
                or current_time - self.last_capture_time >= self.interval
            )
            if warmed_up and due:
                # Hold the lock only long enough to pick the frame, so the
                # capture thread is never blocked by encoding or network I/O.
                with self.frame_buffer_lock:
                    buffered_count = len(self.frame_buffer)
                    buffered_frame = self.frame_buffer[-1] if buffered_count else None
                if buffered_frame is not None:
                    self._export_frame(buffered_frame, buffered_count, current_time)
            self.stop_event.wait(0.1)

    def _export_frame(self, buffered_frame, buffered_count, current_time):
        """Encode one frame, send it to the server and save a copy on disk."""
        print(buffered_count)
        img = buffered_frame.to_image()
        img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
        frame_name = f"captured_frame_{self.captured_frame_count}.jpg"
        # imencode returns a (success flag, encoded buffer) pair.
        encoded_ok, img_binary = cv2.imencode('.png', img)
        if encoded_ok:
            self.send_image_to_server(img_binary.tobytes(), self.endpoint)
        else:
            print(f"Could not encode {frame_name} as PNG; nothing was sent.")
        # imwrite does not create missing directories, it just reports failure.
        os.makedirs(self._OUTPUT_DIR, exist_ok=True)
        if not cv2.imwrite(f'{self._OUTPUT_DIR}/{datetime.now()}_{frame_name}', img):
            print(f"Could not write {frame_name} to {self._OUTPUT_DIR}.")
        self.last_capture_time = current_time
        print(f"Captured {frame_name} at time: {current_time - self.start_time:.2f}s")
        self.captured_frame_count += 1

    def send_image_to_server(self, image, endpoint, image_type="png"):
        """POST one encoded image to the analyzer server.

        Connection problems are reported on stdout and otherwise ignored so
        that a temporarily unreachable server does not stop the sampling.

        :param image: encoded image as bytes, an array exposing ``tobytes()``,
            or the ``(flag, buffer)`` pair returned by ``cv2.imencode``
        :param endpoint: server address; ``http://`` is assumed when no scheme is given
        :param image_type: image subtype used for the ``Content-Type`` header
        """
        try:
            # NOTE(review): the body is sent raw because the Content-Type
            # header declares a bare image - confirm the server does not
            # expect a multipart upload instead.
            response = requests.post(
                self._normalize_endpoint(endpoint),
                headers=self._build_headers(image_type),
                data=self._to_payload(image),
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException:
            print("Can not connect to the analyzer server. Check the endpoint address or connection.")
            return
        if not response.ok:
            print(f"Analyzer server rejected the image: HTTP {response.status_code}")

    def _build_headers(self, image_type):
        """Return the HTTP headers describing one uploaded image."""
        # NOTE(review): the trailing "Z" is a literal kept from the layout the
        # original code spelled out, while the clock value is in
        # `self.time_zone` - confirm which of the two the receiver expects.
        time_sent = datetime.now(self.time_zone).strftime("%Y-%m-%dT%H:%M:%SZ")
        return {
            'Content-Type': f'image/{image_type}',
            'x-time-sent': time_sent,
            'x-cctv-info': str(self.cctvid),
        }

    @staticmethod
    def _normalize_endpoint(endpoint):
        """Prefix ``http://`` when the endpoint has no URL scheme."""
        return endpoint if "://" in endpoint else f"http://{endpoint}"

    @staticmethod
    def _to_payload(image):
        """Convert the accepted image representations to raw bytes."""
        if isinstance(image, tuple):
            # (success flag, buffer) pair as returned by cv2.imencode.
            image = image[-1]
        if isinstance(image, (bytes, bytearray, memoryview)):
            return bytes(image)
        return image.tobytes()

    def start(self):
        """Launch the capture and processing threads."""
        self.capture_thread = Thread(target=self.capture_frames)
        self.process_thread = Thread(target=self.process_frames)
        self.capture_thread.start()
        self.process_thread.start()

    def stop(self):
        """Signal both threads to finish, wait for them, and close the stream."""
        self.stop_event.set()
        for worker in (self.capture_thread, self.process_thread):
            # Workers are None when `start` was never called.
            if worker is not None:
                worker.join()
        self.input_stream.close()
# Example usage: sample one CCTV stream every 300 seconds and record how long
# the capturer ran in result.txt.
if __name__ == "__main__":
    capturer = FrameCapturer(
        'http://cctvsec.ktict.co.kr/73496/'
        '7xhDlyfDPK1AtaOUkAUDUJgZvfqvRXYYZUmRLxgPgKXk+eEtIJIfGkiC/gcQmysaz7zhDW2Jd8qhPCxgpo7cn5VqArnowyKjUePjdAmuQQ8=',
        101, 300
    )
    t1 = time.time()
    try:
        capturer.start()
        time.sleep(600000)
    finally:
        try:
            capturer.stop()
        finally:
            # Written during cleanup so the run time is recorded even when the
            # run ends early (Ctrl+C or an error) rather than only after the
            # full sleep has elapsed.
            t2 = time.time()
            with open("result.txt", "w") as file:
                file.write(f'{t2-t1} seconds before terminating')