File name
Commit message
Commit date
1. code cleanup of inference_gpu_.py and inference_.py is now inference_cpu_.py 2. streaming_url_updator.py CORS fix 3. working DB INSERT of postprocess_draft.py
05-29
후처리 모듈 및 메인 서버 전달을 위한 수정 1. ITS cctv 스트리밍 정보를 하나의 프로세스가 하나의 영상을 담당하여 처리 및 실행하기 위한 스크립트와 bash 스크립트 2. FrameCapturer 객체에 위경도 정보 필수 arg
05-20
1. code cleanup of inference_gpu_.py and inference_.py is now inference_cpu_.py 2. streaming_url_updator.py CORS fix 3. working DB INSERT of postprocess_draft.py
05-29
1. code cleanup of inference_gpu_.py and inference_.py is now inference_cpu_.py 2. streaming_url_updator.py CORS fix 3. working DB INSERT of postprocess_draft.py
05-29
1. code cleanup of inference_gpu_.py and inference_.py is now inference_cpu_.py 2. streaming_url_updator.py CORS fix 3. working DB INSERT of postprocess_draft.py
05-29
import numpy as np
from flask import Flask, request, jsonify
from flask_restx import Api, Resource, fields
import os
import psycopg2
import time
import base64
import json
import cv2
import requests
import typing
from requests_toolbelt import MultipartEncoder
# Module-wide switch: when truthy, the request handler dumps received files
# under network_test/ and prints the in-memory state after each request.
debug = True

# Connection settings for the main database. The parsed mapping is passed as
# keyword arguments to psycopg2.connect() when an event is sent.
# Loaded once; the file was previously opened and parsed twice.
with open('config_files/MAIN_DB_ENDPOINT.json', 'r', encoding='utf-8') as file:
    db_config = json.load(file)

app = Flask(__name__)
api = Api(app, version='1.0', title='CCTV Image Upload API',
          description='A postprocessing and adaptive rate mainserver data pusher')

# Swagger model describing the upload.
# NOTE(review): the handler in this file reads the headers 'x-cctv-name' and
# 'x-area-percentage', which are not declared here, while 'x-cctv-info' is
# declared but never read -- confirm which names the sender actually uses.
image_upload_model = api.model('ImageUpload', {
    'image': fields.String(required=True, description='Image file', dt='File'),
    'x-cctv-info': fields.String(required=False, description='CCTV identifier'),
    'x-time-sent': fields.String(required=False, description='Time image was sent'),
    'x-cctv-latitude': fields.String(required=False, description='Latitude of CCTV'),
    'x-cctv-longitude': fields.String(required=False, description='Longitude of CCTV'),
    'X-Flag-Detected': fields.String(required=True, description='If detected'),
})

# Namespace definition
ns = api.namespace('postprocess', description='Postprocessing of inference results')
class StreamSources():
    """Track a rolling OK/FAIL history per source and decide when to send events.

    Every source is either in normal mode or in force-send ("failure") mode:

    * normal mode -> force-send mode when at least ``failure_mode_thres`` of
      the last ``failure_mode_check_past_n`` statuses are FAIL;
    * force-send mode -> normal mode when at least ``normal_mode_thres`` of
      the last ``normal_mode_check_past_n`` statuses are OK.

    While in force-send mode every status triggers ``send_event``. In any
    mode an event is also sent once more than ``normal_send_interval``
    statuses have been received since the last send.

    Args:
        buffer_size: Maximum number of statuses remembered per source.
        normal_send_interval: Number of unsent statuses after which an event
            is sent regardless of mode.
        failure_mode_thres: FAIL count that switches a source to force-send mode.
        failure_mode_check_past_n: Window size used for the FAIL count.
        normal_mode_thres: OK count that switches a source back to normal mode.
        normal_mode_check_past_n: Window size used for the OK count.

    Raises:
        AssertionError: If a threshold exceeds its window, or a window exceeds
            ``buffer_size``.
    """

    def __init__(self, buffer_size, normal_send_interval, failure_mode_thres,
                 failure_mode_check_past_n, normal_mode_thres, normal_mode_check_past_n):
        # NOTE: these checks are plain asserts (kept so the raised exception
        # type is unchanged) and are therefore skipped under ``python -O``.
        assert failure_mode_thres <= failure_mode_check_past_n,\
            (f"failure_mode checker condition is invaild!,"
             f" failure_mode needs {failure_mode_thres} fails in {failure_mode_check_past_n}, which is not possible!")
        assert normal_mode_thres <= normal_mode_check_past_n,\
            (f"normal_mode checker condition is invaild!,"
             f" normal_mode needs {normal_mode_thres} OKs in {normal_mode_check_past_n}, which is not possible!")
        # The two messages below used to name/print the *_thres values although
        # the condition compares against the window sizes.
        assert buffer_size >= failure_mode_check_past_n,\
            (f"'buffer_size' is smaller then failure_mode_check_past_n! This is not possible!"
             f" This will cause program to never enter failure mode!! \n"
             f"Printing relevent args and shutting down!\n"
             f" buffer_size : {buffer_size}\n failure_mode_check_past_n : {failure_mode_check_past_n}")
        assert buffer_size >= normal_mode_check_past_n,\
            (f"'buffer_size' is smaller then normal_mode_check_past_n!"
             f" This is will cause the program to never revert back to normal mode!! \n"
             f"Printing relevent args and shutting down!\n"
             f" buffer_size : {buffer_size}\n normal_mode_check_past_n : {normal_mode_check_past_n}")

        self.sources = {}
        self.buffer_size = buffer_size
        self.normal_send_interval = normal_send_interval

        # True when the threshold equals the window size, i.e. the switch
        # requires that many consecutive statuses of the same kind.
        self.switching_fail_consecutive_mode = failure_mode_thres == failure_mode_check_past_n
        self.switching_normal_consecutive_mode = normal_mode_thres == normal_mode_check_past_n

        self.failure_mode_thres = failure_mode_thres
        self.failure_mode_check_past_n = failure_mode_check_past_n
        self.normal_mode_thres = normal_mode_thres
        self.normal_mode_check_past_n = normal_mode_check_past_n

    def __setitem__(self, key, value):
        """Register a new source ``key`` with ``value`` stored as its ``cctv_info``.

        Raises:
            KeyError: If ``key`` has already been registered.
        """
        if key in self.sources:
            raise KeyError(f"Error! Source {key} already initialized.")
        self.sources[key] = {
            "status_counts": [],
            "ok_counts": 0,
            "failure_counts": 0,
            "force_send_mode": False,
            "most_recent_image": None,
            "most_recent_mask": None,
            # Was misspelled "most_recent_seg_iamge", which never matched the
            # key written by add_status().
            "most_recent_seg_image": None,
            "cctv_info": value,
            "last_send_before": 0,
            "normal_to_failure_mode_change_alert": False,
            "failure_to_normal_mode_change_alert": False,
        }

    def __getitem__(self, key):
        """Return the state dictionary of source ``key``."""
        return self.sources[key]

    def __call__(self):
        """Return the mapping of every registered source to its state."""
        return self.sources

    @staticmethod
    def _count_recent(history, window_size):
        """Return ``(ok_count, fail_count)`` over the last ``window_size`` entries."""
        recent = history[-window_size:] if window_size > 0 else []
        ok_count = sum(recent)
        return ok_count, len(recent) - ok_count

    def add_status(self, source, status, image, seg_image):
        """Record one status for ``source`` and send an event when required.

        Args:
            source: Key of a previously registered source.
            status: Either ``"OK"`` or ``"FAIL"``.
            image: Payload stored as ``most_recent_image`` when an event is sent.
            seg_image: Payload stored as ``most_recent_seg_image`` when an
                event is sent.

        Raises:
            AssertionError: If ``status`` is not ``"OK"`` or ``"FAIL"``.
            ValueError: If ``source`` has not been registered.
        """
        assert status in ["OK", "FAIL"],\
            f"Invalid status was given!, status must be one of 'OK' or 'FAIL', but given '{status}'!"
        if source not in self.sources:
            raise ValueError(f"No key found for source. Did you forgot to add it? \n source : {source}")

        state = self.sources[source]
        history = state["status_counts"]

        # OK is stored as 1 and FAIL as 0, so sum() over a window counts OKs.
        history.append(1 if status == "OK" else 0)
        if len(history) > self.buffer_size:
            history.pop(0)

        flag_send_event = False
        if state["force_send_mode"]:
            # Leaving force-send mode is judged on the normal-mode window.
            # (The two windows were previously swapped between the branches.)
            state['ok_counts'], state['failure_counts'] = self._count_recent(
                history, self.normal_mode_check_past_n)
            flag_send_event = True
            # mode switching condition check
            if state['ok_counts'] >= self.normal_mode_thres:
                state["force_send_mode"] = False
                flag_send_event = False
                state["failure_to_normal_mode_change_alert"] = True
        else:
            # Entering force-send mode is judged on the failure-mode window.
            state['ok_counts'], state['failure_counts'] = self._count_recent(
                history, self.failure_mode_check_past_n)
            # mode switching condition check
            if state['failure_counts'] >= self.failure_mode_thres:
                state["force_send_mode"] = True
                flag_send_event = True
                state["normal_to_failure_mode_change_alert"] = True

        # regular interval message logic
        if state["last_send_before"] > self.normal_send_interval:
            flag_send_event = True
        else:
            state["last_send_before"] += 1

        if flag_send_event:
            state["most_recent_image"] = image
            state["most_recent_seg_image"] = seg_image
            self.send_event(source)
            # alert alarms only once: clear them after the event went out
            state["failure_to_normal_mode_change_alert"] = False
            state["normal_to_failure_mode_change_alert"] = False

    def send_event(self, source):
        """Push an event for ``source`` to the database and reset its send counter.

        Sending is best-effort: any error is printed and otherwise ignored, and
        ``last_send_before`` is reset to 0 in every case.
        """
        conn = None
        try:
            conn = psycopg2.connect(**db_config)
            cursor = conn.cursor()
            try:
                # TODO: the INSERT statement is still a stub and is not executed.
                upload_data_sql_query = """
                INSERT INTO
                """
            finally:
                cursor.close()
        except Exception as e:
            print(e)
        finally:
            # Close the connection even when something above failed.
            if conn is not None:
                conn.close()

        self.sources[source]["last_send_before"] = 0
        print(f"EVENT : SENDING {source}!!")
# Module-level StreamSources instance shared by the request handler below.
memory = StreamSources(
    # keep at most the 15 most recent statuses per source
    buffer_size=15,
    # send anyway once more than 10 statuses were received since the last send
    normal_send_interval=10,
    # failure mode needs 8 fails in the checked window of 12
    failure_mode_thres=8,
    failure_mode_check_past_n=12,
    # returning to normal mode needs 8 OKs in the checked window of 12
    normal_mode_thres=8,
    normal_mode_check_past_n=12,
)
def get_base64_encoded_image_from_file_binary(image):
    """Re-encode raw image bytes as JPEG and return the result base64-encoded.

    Args:
        image: Bytes-like object holding an encoded image file.

    Returns:
        bytes: Base64 encoding of the image re-encoded as JPEG.

    Raises:
        ValueError: If ``image`` is empty, cannot be decoded as an image, or
            cannot be re-encoded as JPEG.
    """
    raw = np.frombuffer(image, np.uint8)
    if raw.size == 0:
        raise ValueError("Cannot encode an empty image buffer")

    # cv2.imdecode signals undecodable data by returning None, not by raising.
    decoded = cv2.imdecode(raw, cv2.IMREAD_COLOR)
    if decoded is None:
        raise ValueError("Image bytes could not be decoded")

    encoded_ok, jpeg = cv2.imencode('.jpg', decoded)
    if not encoded_ok:
        raise ValueError("Image could not be re-encoded as JPEG")

    return base64.b64encode(jpeg.tobytes())
@ns.route('/postprocess', )
class PostProcesser(Resource):
    """Receive one inference result and record it in the shared ``memory``.

    The request carries metadata in headers (``x-cctv-name`` base64-encoded,
    ``x-cctv-latitude``, ``x-cctv-longitude``, ``X-Flag-Detected``,
    ``x-area-percentage``) and files in the multipart parts ``image``,
    ``mask`` and ``seg_mask``.
    """

    def __init__(self, *args, **kargs):
        super().__init__(*args, **kargs)
        self.time_sent = None
        self.cctv_latitude = None
        self.cctv_longitude = None
        self.cctv_name = None
        self.cctv_info = None
        self.mask = None
        self.mask_blob = None
        self.image = None
        self.image_type = None
        self.seg_image = None
        self.area_percent = 0
        self.detected = False

    @ns.response(200, 'Success')
    @ns.response(400, 'Validation Error')
    def post(self):
        """Validate the upload, update the per-CCTV status history and reply.

        Returns:
            tuple: ``(body, status)`` -- 200 on success, 400 when a header or
            file is missing or invalid, 500 on any other error. Errors used
            to be printed only, leaving the client with an empty 200 reply.
        """
        try:
            self._read_headers()
            image_bytes, seg_bytes = self._read_files()

            if debug:
                self._dump_debug_files(image_bytes, seg_bytes)

            image_b64 = get_base64_encoded_image_from_file_binary(image_bytes)
            # The segmentation image may be absent; keep None in that case.
            seg_image_b64 = (get_base64_encoded_image_from_file_binary(seg_bytes)
                             if seg_bytes else None)

            # NOTE(review): the 'x-time-sent' header is not used; the stored
            # time is the moment the server handled the request -- confirm.
            self.time_sent = time.time()
            self.cctv_info = {
                'cctv_name': self.cctv_name,
                'cctv_latitude': self.cctv_latitude,
                'cctv_longitude': self.cctv_longitude,
                'source_frame': image_b64,
                # 'frame_mask': self.mask,
                'seg_frame': seg_image_b64,
                'time_sent': self.time_sent
            }

            try:
                memory[self.cctv_name] = self.cctv_info
            except KeyError:
                # Source was registered by an earlier request; nothing to do.
                pass

            memory.add_status(self.cctv_name, self.pass_fail(), image_b64, seg_image_b64)
            if debug:
                print(memory())
        except ValueError as e:
            print(e)
            return {'message': str(e)}, 400
        except Exception as e:
            print(e)
            return {'message': 'Postprocessing failed'}, 500
        return {'message': 'Success'}, 200

    def _read_headers(self):
        """Parse the request headers into attributes; raise ValueError if invalid."""
        self.image_type = request.headers.get('Content-Type')
        self.cctv_name = base64.b64decode(request.headers.get('x-cctv-name', '')).decode('UTF-8')
        self.cctv_latitude = request.headers.get('x-cctv-latitude', 'Not provided')
        self.cctv_longitude = request.headers.get('x-cctv-longitude', 'Not provided')

        detected = request.headers.get('X-Flag-Detected')
        if detected == "True":
            self.detected = True
        elif detected == "False":
            self.detected = False
        else:
            raise ValueError(f"Invalid value for x-flag-detected: {detected}")

        area_percent = request.headers.get('x-area-percentage')
        try:
            self.area_percent = float(area_percent)
        except (TypeError, ValueError):
            raise ValueError(f"Invalid value for x-area-percentage: {area_percent}")

    def _read_files(self):
        """Fetch the uploaded files and return ``(image_bytes, seg_bytes)``.

        Each stream is read exactly once here; reading after a ``save()``
        would yield no data because the stream is already consumed.
        ``seg_bytes`` is None when no ``seg_mask`` part was sent.
        """
        self.image = request.files.get('image')
        self.mask = request.files.get('mask')
        self.seg_image = request.files.get('seg_mask')

        if self.image is None:
            raise ValueError("Missing required file: image")
        image_bytes = self.image.read()
        if not image_bytes:
            raise ValueError("Uploaded file is empty: image")

        seg_bytes = self.seg_image.read() if self.seg_image is not None else None
        return image_bytes, seg_bytes

    def _dump_debug_files(self, image_bytes, seg_bytes):
        """Write the received files under network_test/ for inspection."""
        os.makedirs("network_test", exist_ok=True)
        self._write_bytes(f"network_test/image_p{time.time()}.png", image_bytes)
        if self.detected:
            if self.mask is not None:
                self._write_bytes(f"network_test/mask_p{time.time()}.png", self.mask.read())
            if seg_bytes:
                self._write_bytes(f"network_test/seg_p{time.time()}.png", seg_bytes)

    @staticmethod
    def _write_bytes(path, data):
        """Write ``data`` to ``path`` in binary mode."""
        with open(path, "wb") as out:
            out.write(data)

    def pass_fail(self):
        """Return ``'FAIL'`` when ``area_percent`` exceeds the threshold, else ``'OK'``."""
        thres = 0.1
        # TODO temporal pass_fail threshold
        return 'FAIL' if self.area_percent > thres else 'OK'
# Start the Flask development server on port 13579 when run as a script.
# NOTE(review): debug=True is hard-coded here, independent of the module-level
# `debug` flag -- confirm it is not used for a production deployment.
if __name__ == "__main__":
    print("Postprocess Online")
    app.run(debug=True, port=13579)