File name
Commit message
Commit date
1. code cleanup of inference_gpu_.py and inference_.py is now inference_cpu_.py 2. streaming_url_updator.py CORS fix 3. working DB INSERT of postprocess_draft.py
05-29
후처리 모듈 및 메인 서버 전달을 위한 수정 1. ITS cctv 스트리밍 정보를 하나의 프로세스가 하나의 영상을 담당하여 처리 및 실행하기 위한 스크립트와 bash 스크립트 2. FrameCapturer 객체에 위경도 정보 필수 arg
05-20
1. code cleanup of inference_gpu_.py and inference_.py is now inference_cpu_.py 2. streaming_url_updator.py CORS fix 3. working DB INSERT of postprocess_draft.py
05-29
1. code cleanup of inference_gpu_.py and inference_.py is now inference_cpu_.py 2. streaming_url_updator.py CORS fix 3. working DB INSERT of postprocess_draft.py
05-29
1. code cleanup of inference_gpu_.py and inference_.py is now inference_cpu_.py 2. streaming_url_updator.py CORS fix 3. working DB INSERT of postprocess_draft.py
05-29
import numpy as np
import os
import cv2
import time
from datetime import datetime
from zoneinfo import ZoneInfo
from flask import Flask, request
from flask_restx import Api, Resource, fields
import requests
from requests_toolbelt import MultipartEncoder
import base64
from yoloseg.inference_gpu_ import Inference, overlay_mask
from config_files.endpoints import POSTPROCESS_ENDPOINT
# from config_files import API_ENDPOINT_MAIN
app = Flask(__name__)
api = Api(app, version='1.0', title='CCTV Image Upload API',
description='A simple API for receiving CCTV images')
# Namespace definition
ns = api.namespace('cctv', description='CCTV operations')
model_path = 'yoloseg/weight/best.onnx'
classes_txt_file = 'config_files/yolo_config.txt'
image_path = 'yoloseg/img3.jpg'
model_input_shape = (640, 640)
inference_engine = Inference(
onnx_model_path=model_path,
model_input_shape=model_input_shape,
classes_txt_file=classes_txt_file,
run_with_cuda=True
)
# Define the expected model for incoming data
image_upload_model = api.model('ImageUpload', {
'image': fields.String(required=True, description='Image file', dt='File'),
'x-cctv-info': fields.String(required=False, description='CCTV identifier'),
'x-time-sent': fields.String(required=False, description='Time image was sent'),
'x-cctv-latitude': fields.String(required=False, description='Latitude of CCTV'),
'x-cctv-longitude': fields.String(required=False, description='Longitude of CCTV')
})
# Define the directory where images will be saved
IMAGE_DIR = "network_test"
if not os.path.exists(IMAGE_DIR):
os.makedirs(IMAGE_DIR)
@ns.route('/infer', )
class ImageUpload(Resource):
# @ns.expect(image_upload_model, validate=True)
def __init__(self, *args, **kargs):
super().__init__(*args, **kargs)
self.time_sent = None
self.cctv_latitude = None
self.cctv_longitude = None
self.cctv_name = None
self.mask = None
self.mask_blob = None
self.image = None
self.image_type = None
self.seg_image = None
self.flag_detected = False
self.area_percent = 0
self.time_zone = self.time_zone = ZoneInfo("Asia/Seoul")
self.endpoint = POSTPROCESS_ENDPOINT
@ns.response(200, 'Success')
@ns.response(400, 'Validation Error')
def post(self):
if 'file' not in request.files:
ns.abort(400, 'No image part in the request')
self.image = request.files['file']
self.image_type = request.headers.get('Content-Type')
self.cctv_name = base64.b64decode(request.headers.get('x-cctv-name', '')).decode('UTF-8')
self.time_sent = request.headers.get('x-time-sent', '')
self.cctv_latitude = request.headers.get('x-cctv-latitude', 'Not provided')
self.cctv_longitude = request.headers.get('x-cctv-longitude', 'Not provided')
# timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
image = self.image.read()
image = np.frombuffer(image, np.uint8)
image = cv2.imdecode(image, cv2.IMREAD_COLOR)
# filename = f"{timestamp}_{self.cctv_name}.png"
t1 = time.time()
detections, self.mask = inference_engine.run_inference(cv2.resize(image, model_input_shape))
t2 = time.time()
print(t2 - t1)
if len(self.mask) != 0:
self.flag_detected = True
else:
self.flag_detected = False
if self.flag_detected:
print(image.shape)
print(self.mask.shape)
self.mask = cv2.resize(self.mask, (image.shape[1], image.shape[0])) # cv2 saves image with w,h order
self.mask = self.mask[..., np.newaxis]
print(self.mask.shape)
self.mask_blob = cv2.imencode('.png', self.mask)
self.mask_blob = self.mask.tobytes()
self.seg_image = overlay_mask(image, self.mask[:,:,0], color=(0, 255, 0), alpha=0.3)
self.area_percent = np.sum(self.mask) / image.shape[0] * image.shape[1]
else :
self.area_percent = 0
self.send_result()
# write another post request for pushing a detection result
return {"message": f"Image {self.mask} uploaded successfully!"}
def send_result(self):
time_sent = datetime.now(self.time_zone).strftime("%Y-%m-%dT%H:%M:%SZ")
# print(str(self.flag_detected))
header = {
'X-Time-Sent': time_sent,
'X-CCTV-Name': base64.b64encode(str(self.cctv_name).encode('utf-8')).decode('ascii'),
'X-CCTV-Latitude': str(self.cctv_latitude),
'X-CCTV-Longitude': str(self.cctv_longitude),
'X-Area-Percentage' : str(self.area_percent),
'X-Flag-Detected' : str(self.flag_detected) #"True" or "False"
}
session = requests.Session()
try:
if self.flag_detected:
seg_binary = cv2.imencode('.png', self.seg_image)
seg_binary = seg_binary[1].tobytes()
multipart_data = MultipartEncoder(
fields={
'image': (
f'frame_{self.cctv_name}.{self.image_type}',
self.image,
f'image/{self.image_type}'
),
'mask' : (
f'frame_mask_{self.cctv_name}.{self.image_type}',
self.mask_blob,
f'image/{self.image_type}'
),
'seg_mask' : (
f'frame_seg_{self.cctv_name}.{self.image_type}',
seg_binary,
f'image/{self.image_type}'
)
}
)
header["Content-Type"] = multipart_data.content_type
response = session.post(self.endpoint, headers=header, data=multipart_data)
print(response)
else:
multipart_data = MultipartEncoder(
fields={
'image': (
f'frame_{self.cctv_name}.{self.image_type}',
self.image,
f'image/{self.image_type}'
),
}
)
header["Content-Type"] = multipart_data.content_type
response = session.post(self.endpoint, headers=header, data=multipart_data)
print(response)
except Exception as e:
print(e)
print("Can not connect to the postprocessing server. Check the endpoint address or connection.\n"
f"Can not connect to : {self.endpoint}")
if __name__ == '__main__':
app.run(debug=False, port=12345)