"""CCTV image inference service.

Exposes ``POST /cctv/infer``: receives one CCTV frame as a multipart upload,
runs the segmentation model on it, and forwards the frame (plus an overlay
image when something was detected) to the post-processing endpoint.
"""
import base64
import copy
import os
import time
from datetime import datetime
from zoneinfo import ZoneInfo

import cv2
import numpy as np
import requests
from flask import Flask, request
from flask_restx import Api, Resource, fields
from requests_toolbelt import MultipartEncoder

from yoloseg.inference_gpu_ import Inference, overlay_mask
from config_files.endpoints import POSTPROCESS_ENDPOINT
# from config_files import API_ENDPOINT_MAIN

app = Flask(__name__)
api = Api(app, version='1.0', title='CCTV Image Upload API',
          description='A simple API for receiving CCTV images')

# Namespace definition
ns = api.namespace('cctv', description='CCTV operations')

model_path = 'yoloseg/weight/best.onnx'
classes_txt_file = 'config_files/yolo_config.txt'
image_path = 'yoloseg/img3.jpg'
model_input_shape = (640, 640)

# Images forwarded to the post-processing server are always re-encoded as PNG,
# so outgoing file names and MIME types must say PNG as well.
OUTPUT_IMAGE_FORMAT = 'png'

# Upper bound (seconds) for the forwarding request so that an unreachable
# post-processing server cannot block the upload handler indefinitely.
POSTPROCESS_TIMEOUT_S = 30

# The model is loaded once at import time and shared by all requests.
inference_engine = Inference(
    onnx_model_path=model_path,
    model_input_shape=model_input_shape,
    classes_txt_file=classes_txt_file,
    run_with_cuda=True
)

# Define the expected model for incoming data.
# NOTE(review): this model is only documentation (the `@ns.expect` below is
# disabled) and its names differ from what `post` actually reads
# ('file' form field, 'x-cctv-name' header) -- confirm which side is right.
image_upload_model = api.model('ImageUpload', {
    'image': fields.String(required=True, description='Image file', dt='File'),
    'x-cctv-info': fields.String(required=False, description='CCTV identifier'),
    'x-time-sent': fields.String(required=False, description='Time image was sent'),
    'x-cctv-latitude': fields.String(required=False, description='Latitude of CCTV'),
    'x-cctv-longitude': fields.String(required=False, description='Longitude of CCTV')
})

# Define the directory where images will be saved
IMAGE_DIR = "network_test"
os.makedirs(IMAGE_DIR, exist_ok=True)


@ns.route('/infer', )
class ImageUpload(Resource):
    """Resource handling a single uploaded CCTV frame.

    Request data and inference results are kept on the instance so that
    ``send_result`` can forward them after ``post`` has filled them in.
    """

    # @ns.expect(image_upload_model, validate=True)
    def __init__(self, *args, **kargs):
        super().__init__(*args, **kargs)
        # Metadata taken from the request headers.
        self.time_sent = None
        self.cctv_latitude = None
        self.cctv_longitude = None
        self.cctv_name = None
        # Segmentation mask and its PNG-encoded bytes.
        self.mask = None
        self.mask_blob = None
        # Decoded frame, the raw uploaded bytes, and the outgoing image format.
        self.image = None
        self.image_binary = None
        self.image_type = None
        # Frame with the mask drawn on top (only set when something is detected).
        self.seg_image = None
        self.flag_detected = False
        self.area_percent = 0
        self.time_zone = ZoneInfo("Asia/Seoul")
        self.endpoint = POSTPROCESS_ENDPOINT

    @ns.response(200, 'Success')
    @ns.response(400, 'Validation Error')
    def post(self):
        """Run inference on the uploaded frame and forward the result.

        Expects a multipart form field ``file`` holding the image and the
        optional headers ``x-cctv-name`` (base64-encoded UTF-8), ``x-time-sent``,
        ``x-cctv-latitude`` and ``x-cctv-longitude``.

        Returns:
            A dict with a confirmation message.

        Aborts with HTTP 400 when the file part is missing, the CCTV name
        header cannot be decoded, or the upload is not a decodable image.
        """
        if 'file' not in request.files:
            ns.abort(400, 'No image part in the request')

        try:
            self.cctv_name = base64.b64decode(
                request.headers.get('x-cctv-name', '')
            ).decode('UTF-8')
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
            ns.abort(400, 'x-cctv-name header is not valid base64-encoded UTF-8')

        self.time_sent = request.headers.get('x-time-sent', '')
        self.cctv_latitude = request.headers.get('x-cctv-latitude', 'Not provided')
        self.cctv_longitude = request.headers.get('x-cctv-longitude', 'Not provided')
        # The request's own Content-Type is "multipart/form-data; boundary=...",
        # which is not an image type; the forwarded images are PNG-encoded.
        self.image_type = OUTPUT_IMAGE_FORMAT

        raw_bytes = request.files['file'].read()
        if not raw_bytes:
            ns.abort(400, 'Uploaded image is empty')
        # bytes are immutable, so the raw upload can be kept without copying.
        self.image_binary = raw_bytes
        self.image = cv2.imdecode(np.frombuffer(raw_bytes, np.uint8), cv2.IMREAD_COLOR)
        if self.image is None:
            # imdecode signals an unreadable image by returning None.
            ns.abort(400, 'Uploaded file could not be decoded as an image')

        started = time.perf_counter()
        _detections, self.mask = inference_engine.run_inference(
            cv2.resize(self.image, model_input_shape)
        )
        print(time.perf_counter() - started)

        # An empty mask means nothing was segmented.
        self.flag_detected = len(self.mask) != 0

        if self.flag_detected:
            height, width = self.image.shape[:2]
            print(self.mask.shape)
            # cv2.resize takes the target size in (width, height) order.
            self.mask = cv2.resize(self.mask, (width, height))
            self.mask = self.mask[..., np.newaxis]
            print(self.mask.shape)
            # imencode returns (success, buffer); keep the encoded PNG bytes.
            encoded_ok, encoded_mask = cv2.imencode('.png', self.mask)
            self.mask_blob = encoded_mask.tobytes() if encoded_ok else None
            self.seg_image = overlay_mask(self.image, self.mask[:, :, 0],
                                          color=(0, 255, 0), alpha=0.3)
            # Mask sum relative to the number of pixels in the frame.
            # NOTE(review): assumes mask values are 0/1; if the model emits
            # 0/255, or a 0-100 percentage is expected downstream, scale here.
            self.area_percent = float(np.sum(self.mask)) / (height * width)
        else:
            self.area_percent = 0

        # Push the detection result to the post-processing server.
        self.send_result()
        return {"message": f"Image {self.cctv_name} uploaded successfully!"}

    def send_result(self):
        """Forward the frame and detection metadata to the post-processing server.

        Sends a multipart POST to ``self.endpoint`` containing the PNG-encoded
        frame and, when ``self.flag_detected`` is true, the PNG-encoded overlay
        image. Metadata travels in ``X-*`` headers. Failures to encode or to
        reach the server are reported on stdout and do not raise.
        """
        # NOTE(review): this is Asia/Seoul local time but carries a literal "Z"
        # (UTC) suffix; left unchanged because the receiver's parsing is not
        # visible here -- confirm the expected format.
        time_sent = datetime.now(self.time_zone).strftime("%Y-%m-%dT%H:%M:%SZ")
        header = {
            'X-Time-Sent': time_sent,
            'X-CCTV-Name': base64.b64encode(str(self.cctv_name).encode('utf-8')).decode('ascii'),
            'X-CCTV-Latitude': str(self.cctv_latitude),
            'X-CCTV-Longitude': str(self.cctv_longitude),
            'X-Area-Percentage': str(self.area_percent),
            'X-Flag-Detected': str(self.flag_detected)  # "True" or "False"
        }

        mime_type = f'image/{self.image_type}'
        try:
            image_binary = cv2.imencode('.png', self.image)[1].tobytes()
            parts = {
                'image': (
                    f'frame_{self.cctv_name}.{self.image_type}',
                    image_binary,
                    mime_type
                ),
            }
            if self.flag_detected:
                seg_binary = cv2.imencode('.png', self.seg_image)[1].tobytes()
                parts['seg_image'] = (
                    f'frame_seg_{self.cctv_name}.{self.image_type}',
                    seg_binary,
                    mime_type
                )
            # NOTE: self.mask_blob is prepared in post() but is not forwarded.

            multipart_data = MultipartEncoder(fields=parts)
            header["Content-Type"] = multipart_data.content_type
            # The context manager closes the session's connections afterwards.
            with requests.Session() as session:
                response = session.post(self.endpoint, headers=header,
                                        data=multipart_data,
                                        timeout=POSTPROCESS_TIMEOUT_S)
            print(response)
        except (requests.RequestException, cv2.error) as e:
            print(e)
            print("Can not connect to the postprocessing server. Check the endpoint address or connection.\n"
                  f"Can not connect to : {self.endpoint}")


if __name__ == '__main__':
    app.run(debug=False, port=12345)