File name
Commit message
Commit date
import numpy as np
from flask import Flask, request
from flask_restx import Api, Resource, fields
import os
from datetime import datetime
from yoloseg.inference_ import Inference, overlay_mask
import cv2
import time
import base64
import requests
import typing
from requests_toolbelt import MultipartEncoder
# from config_files import API_ENDPOINT_MAIN
# Flask application object and the flask-restx API wrapper that serves it.
app = Flask(__name__)
api = Api(
    app,
    version='1.0',
    title='CCTV Image Upload API',
    description='A postprocessing and adaptive rate mainserver pusher',
)

# Namespace under which the CCTV routes below are registered.
ns = api.namespace('cctv', description='CCTV operations')
class StreamSources():
    """Per-source rolling OK/FAIL history that toggles a force-send flag.

    Every registered source keeps at most ``buffer_size`` recent statuses.
    A source enters force-send mode when at least ``failure_mode_thres`` of
    its last ``failure_mode_check_past_n`` statuses are 'FAIL' (i.e. that many
    consecutive FAILs when the two values are equal). It leaves force-send
    mode once its ``ok_counts`` (OKs seen since the last FAIL or the last sent
    message) reaches ``normal_mode_thres``.
    """

    def __init__(self, buffer_size, normal_send_interval, failure_mode_thres, failure_mode_check_past_n, normal_mode_thres, normal_mode_check_past_n):
        """Validate the switching configuration and create an empty registry.

        Args:
            buffer_size: maximum number of statuses remembered per source.
            normal_send_interval: stored on the instance; not used by this class.
            failure_mode_thres: FAIL count needed to enter force-send mode.
            failure_mode_check_past_n: how many recent statuses are inspected
                for FAILs.
            normal_mode_thres: ``ok_counts`` value needed to leave force-send mode.
            normal_mode_check_past_n: stored on the instance; recovery currently
                relies on ``ok_counts`` only.

        Raises:
            ValueError: if a threshold exceeds its window, or if the buffer is
                too small to hold a full window.
        """
        # Explicit raises instead of ``assert`` so validation survives ``python -O``.
        if failure_mode_thres > failure_mode_check_past_n:
            raise ValueError(f"failure_mode checker condition is invaild!, failure_mode needs {failure_mode_thres} fails in {failure_mode_check_past_n}, which is not possible!")
        if normal_mode_thres > normal_mode_check_past_n:
            raise ValueError(f"normal_mode checker condition is invaild!, normal_mode needs {normal_mode_thres} fails in {normal_mode_check_past_n}, which is not possible!")
        # The buffer has to be able to hold a whole inspection window; the
        # original comparison was inverted and rejected every workable setup.
        if buffer_size < failure_mode_check_past_n:
            raise ValueError(
                "'buffer_size' is smaller than failure_mode_check_past_n! "
                "The failure window could never be filled, so failure mode could not be evaluated.\n"
                f" buffer_size : {buffer_size}\n failure_mode_check_past_n : {failure_mode_check_past_n}"
            )
        if buffer_size < normal_mode_check_past_n:
            raise ValueError(
                "'buffer_size' is smaller than normal_mode_check_past_n! "
                "The normal-mode window could never be filled.\n"
                f" buffer_size : {buffer_size}\n normal_mode_check_past_n : {normal_mode_check_past_n}"
            )

        self.sources = {}
        self.buffer_size = buffer_size
        self.normal_send_interval = normal_send_interval
        # "Consecutive" mode means the threshold equals the window size.
        self.switching_fail_consecutive_mode = failure_mode_thres == failure_mode_check_past_n
        self.switching_normal_consecutive_mode = normal_mode_thres == normal_mode_check_past_n
        self.failure_mode_thres = failure_mode_thres
        self.failure_mode_check_past_n = failure_mode_check_past_n
        self.normal_mode_thres = normal_mode_thres
        self.normal_mode_check_past_n = normal_mode_check_past_n

    def __setitem__(self, key, value):
        """Register a new source under ``key``; ``value`` is stored as 'cctv_name'.

        Raises:
            KeyError: if ``key`` has already been registered.
        """
        if key in self.sources:
            raise KeyError(f"Error! Source {key} already initialized.")
        self.sources[key] = {
            "status_counts": [],
            "ok_counts": 0,
            "force_send_mode": False,
            "most_recent_image" : None,
            "most_recent_mask" : None,
            "most_recent_seg_iamge" : None,
            "cctv_name" : value,
        }

    def __getitem__(self, key):
        """Return the state dict of source ``key`` (KeyError if unknown)."""
        return self.sources[key]

    def add_status(self, source, status):
        """Record one 'OK'/'FAIL' status for ``source`` and update its mode.

        Raises:
            ValueError: if ``status`` is neither 'OK' nor 'FAIL'.
            KeyError: if ``source`` was never registered.
        """
        if status not in ("OK", "FAIL"):
            raise ValueError(f"Invalid status was given!, status must be one of 'OK' or 'FAIL', but given '{status}'!")
        if source not in self.sources:
            # Raising a bare string is itself a TypeError; raise a real exception.
            raise KeyError(f"No key found for source. Did you forgot to add it? \n source : {source}")

        state = self.sources[source]
        history = state["status_counts"]
        history.append(status)
        if len(history) > self.buffer_size:
            history.pop(0)

        if status == 'OK':
            state["ok_counts"] += 1
            if state["force_send_mode"] and state["ok_counts"] >= self.normal_mode_thres:
                state["force_send_mode"] = False
                self.send_message(source, "NORMAL SEND")
        else:
            state["ok_counts"] = 0  # Reset on FAIL
            # Only a FAIL can push the FAIL count over the threshold. Checking
            # after an OK would re-send FORCE SEND and wipe ok_counts, which
            # would block recovery in windowed mode.
            self.check_failures(source)

    def check_failures(self, source):
        """Enter force-send mode for ``source`` if its recent FAILs hit the threshold."""
        history = self.sources[source]["status_counts"]
        if self.switching_fail_consecutive_mode:
            triggered = (
                len(history) >= self.failure_mode_thres
                and all(status == 'FAIL' for status in history[-self.failure_mode_thres:])
            )
            report = f"Source {source} has {self.failure_mode_thres} consecutive FAILs!"
        else:
            # Windowed mode: count FAILs among the last N recorded statuses.
            recent = history[-self.failure_mode_check_past_n:]
            fail_total = recent.count('FAIL')
            triggered = fail_total >= self.failure_mode_thres
            report = f"Source {source} has {fail_total} FAILs in the last {len(recent)} statuses!"

        if triggered:
            print(report)
            self.sources[source]["force_send_mode"] = True
            self.send_message(source, "FORCE SEND")

    def send_message(self, source, message_type):
        """Print the mode-change message and reset the source's OK counter."""
        print(f"Sending message for {source} - Status: {message_type}")
        # Reset the count after sending message
        self.sources[source]["ok_counts"] = 0
@ns.route('/postprocess', )
class PostProcesser(Resource):
    """POST endpoint that classifies a CCTV report as OK/FAIL and records it.

    The CCTV metadata and the area percentage are read from request headers.
    The resulting status is appended to a ``StreamSources`` history keyed by
    the decoded CCTV name.
    """

    # Flask builds a new view instance for each request by default, so the
    # status history must live on the class to survive between requests.
    # It is created lazily on first instantiation.
    _shared_memory = None

    def __init__(self, *args, **kargs):
        """Initialise per-request fields and attach the shared status history."""
        super().__init__(*args, **kargs)
        self.time_sent = None
        self.cctv_latitude = None
        self.cctv_longitude = None
        self.cctv_name = None
        self.cctv_info = None
        self.mask = None
        self.mask_blob = None
        self.image = None
        self.image_type = None
        self.seg_image = None
        self.area_percent = 0
        if PostProcesser._shared_memory is None:
            PostProcesser._shared_memory = StreamSources(
                buffer_size=15,
                normal_send_interval=10,
                failure_mode_thres=8,
                failure_mode_check_past_n=12,
                normal_mode_thres=8,
                normal_mode_check_past_n=12,
            )
        self.memory = PostProcesser._shared_memory

    @ns.response(200, 'Success')
    @ns.response(400, 'Validation Error')
    def post(self):
        """Read the CCTV headers, classify the report and store the status.

        Returns:
            ``({'status': 'OK' | 'FAIL'}, 200)`` on success, or a
            ``({'message': ...}, 400)`` pair when the CCTV name is not valid
            base64/UTF-8 or the area percentage is missing or non-numeric.
        """
        self.image_type = request.headers.get('Content-Type')
        try:
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
            self.cctv_name = base64.b64decode(request.headers.get('x-cctv-name', '')).decode('UTF-8')
        except ValueError:
            return {'message': "Header 'x-cctv-name' must be base64-encoded UTF-8 text."}, 400
        self.time_sent = request.headers.get('x-time-sent', '')
        self.cctv_latitude = request.headers.get('x-cctv-latitude', 'Not provided')
        self.cctv_longitude = request.headers.get('x-cctv-longitude', 'Not provided')
        try:
            # Header values arrive as strings (or None when absent); convert
            # once here so the threshold comparison is numeric.
            self.area_percent = float(request.headers.get('x-area-percentage'))
        except (TypeError, ValueError):
            return {'message': "Header 'x-area-percentage' must be a number."}, 400

        self.cctv_info = {
            'cctv_name' : self.cctv_name,
            'cctv_latitude' : self.cctv_latitude,
            'cctv_longitude' : self.cctv_longitude,
        }
        # Register each camera only once; re-registering raises KeyError.
        if self.cctv_name not in self.memory.sources:
            self.memory[self.cctv_name] = self.cctv_info
        pass_fail = self.pass_fail()
        self.memory.add_status(self.cctv_name, pass_fail)
        return {'status': pass_fail}, 200

    def pass_fail(self):
        """Return 'FAIL' when ``area_percent`` exceeds the threshold, else 'OK'."""
        # NOTE(review): the unit of x-area-percentage (0-1 fraction vs 0-100)
        # is not visible here; confirm that 0.1 matches the sender's scale.
        thres = 0.1
        #TODO temporal pass_fail threshold
        # The labels must match the statuses StreamSources.add_status accepts.
        return 'FAIL' if float(self.area_percent) > thres else 'OK'