# File name
# Commit message
# Commit date
import numpy as np
import os
import cv2
import time
from datetime import datetime
from zoneinfo import ZoneInfo
from flask import Flask, request
from flask_restx import Api, Resource, fields
import requests
from requests_toolbelt import MultipartEncoder
import base64
from yoloseg.inference_ import Inference, overlay_mask
from config_files.endpoints import POSTPROCESS_ENDPOINT
# from config_files import API_ENDPOINT_MAIN
# Flask application wrapped by a flask-restx Api (title/description below are
# the values passed to the Api constructor).
app = Flask(__name__)
api = Api(
    app,
    version='1.0',
    title='CCTV Image Upload API',
    description='A simple API for receiving CCTV images',
)

# Namespace under which the CCTV resources are registered.
ns = api.namespace('cctv', description='CCTV operations')
# Locations of the ONNX weights, the class-name list and a sample image,
# plus the (640, 640) input shape handed to the inference engine.
model_path = 'yoloseg/weight/best.onnx'
classes_txt_file = 'config_files/yolo_config.txt'
image_path = 'yoloseg/img3.jpg'
model_input_shape = (640, 640)

# One engine instance is created at import time and shared by every request.
inference_engine = Inference(
    onnx_model_path=model_path,
    model_input_shape=model_input_shape,
    classes_txt_file=classes_txt_file,
    run_with_cuda=True,
)
# Declared shape of an incoming upload, registered on the API as 'ImageUpload'.
# NOTE(review): this model is not attached to any route in this file (the
# ``@ns.expect`` on the resource is commented out), and its keys differ from
# what the handler actually reads (a ``file`` part and an ``x-cctv-name``
# header) - confirm which side is authoritative before enabling validation.
image_upload_model = api.model(
    'ImageUpload',
    {
        'image': fields.String(
            required=True, description='Image file', dt='File'
        ),
        'x-cctv-info': fields.String(
            required=False, description='CCTV identifier'
        ),
        'x-time-sent': fields.String(
            required=False, description='Time image was sent'
        ),
        'x-cctv-latitude': fields.String(
            required=False, description='Latitude of CCTV'
        ),
        'x-cctv-longitude': fields.String(
            required=False, description='Longitude of CCTV'
        ),
    },
)
# Directory where images will be saved; created at import time if missing.
IMAGE_DIR = "network_test"
# exist_ok=True avoids the check-then-create race when several workers
# import this module at the same time.
os.makedirs(IMAGE_DIR, exist_ok=True)
@ns.route('/infer')
class ImageUpload(Resource):
    """Accept one CCTV frame, segment it, and forward the result downstream.

    ``POST`` expects a multipart upload carrying the frame in the ``file``
    part, plus optional ``x-cctv-name`` (base64-encoded UTF-8),
    ``x-time-sent``, ``x-cctv-latitude`` and ``x-cctv-longitude`` headers.
    The frame is run through the module-level ``inference_engine``; the
    original image (and, when something was detected, the mask and the
    overlay) is then posted to ``POSTPROCESS_ENDPOINT``.
    """

    def __init__(self, *args, **kargs):
        """Initialise the per-request state to empty defaults."""
        super().__init__(*args, **kargs)
        self.time_sent = None
        self.cctv_latitude = None
        self.cctv_longitude = None
        self.cctv_name = None
        self.mask = None
        self.mask_blob = None
        self.image = None
        # Raw bytes of the upload; kept because the upload stream can only be
        # read once and the same bytes are forwarded downstream.
        self.image_bytes = None
        self.image_type = None
        self.seg_image = None
        self.flag_detected = False
        self.area_percent = 0
        self.time_zone = ZoneInfo("Asia/Seoul")
        self.endpoint = POSTPROCESS_ENDPOINT

    @staticmethod
    def _guess_image_type(upload):
        """Return the image subtype (e.g. ``'jpeg'``) of an uploaded part.

        Uses the part's own MIME type, then its filename extension, and
        finally falls back to ``'jpeg'`` when neither is available.
        """
        mimetype = (getattr(upload, 'mimetype', '') or '').lower()
        if mimetype.startswith('image/'):
            return mimetype.split('/', 1)[1]
        extension = os.path.splitext(getattr(upload, 'filename', '') or '')[1]
        return extension.lstrip('.').lower() or 'jpeg'

    # @ns.expect(image_upload_model, validate=True)
    @ns.response(200, 'Success')
    @ns.response(400, 'Validation Error')
    def post(self):
        """Run segmentation on the uploaded frame and forward the result.

        Returns:
            A dict with a ``message`` entry confirming the upload.

        Aborts with HTTP 400 when the ``file`` part is missing, empty or not
        a decodable image, or when ``x-cctv-name`` is not valid base64 UTF-8.
        """
        if 'file' not in request.files:
            ns.abort(400, 'No image part in the request')

        self.image = request.files['file']
        # The request's own Content-Type is "multipart/form-data; boundary=…",
        # so the image type has to come from the uploaded part itself.
        self.image_type = self._guess_image_type(self.image)

        try:
            self.cctv_name = base64.b64decode(
                request.headers.get('x-cctv-name', '')
            ).decode('UTF-8')
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueErrors.
            ns.abort(400, 'x-cctv-name header is not valid base64-encoded UTF-8')

        self.time_sent = request.headers.get('x-time-sent', '')
        self.cctv_latitude = request.headers.get('x-cctv-latitude', 'Not provided')
        self.cctv_longitude = request.headers.get('x-cctv-longitude', 'Not provided')

        # Read the stream once and keep the bytes for forwarding.
        self.image_bytes = self.image.read()
        if not self.image_bytes:
            ns.abort(400, 'Uploaded image is empty')

        image = cv2.imdecode(
            np.frombuffer(self.image_bytes, np.uint8), cv2.IMREAD_COLOR
        )
        if image is None:
            ns.abort(400, 'Uploaded file is not a decodable image')

        t1 = time.time()
        _detections, self.mask = inference_engine.run_inference(
            cv2.resize(image, model_input_shape)
        )
        t2 = time.time()
        print(t2 - t1)

        self.flag_detected = self.mask is not None and len(self.mask) != 0

        if self.flag_detected:
            height, width = image.shape[:2]
            print(image.shape)
            print(self.mask.shape)
            # cv2.resize takes the target size in (width, height) order.
            self.mask = cv2.resize(self.mask, (width, height))
            self.mask = self.mask[..., np.newaxis]
            print(self.mask.shape)

            # Forward the PNG-encoded mask rather than the raw array buffer.
            encoded_ok, encoded_mask = cv2.imencode('.png', self.mask)
            if not encoded_ok:
                ns.abort(500, 'Failed to encode the segmentation mask')
            self.mask_blob = encoded_mask.tobytes()

            self.seg_image = overlay_mask(
                image, self.mask[:, :, 0], color=(0, 255, 0), alpha=0.3
            )
            # Mask sum normalised by the pixel count (height * width).
            # NOTE(review): whether the mask is 0/1 or 0/255, and therefore
            # whether a further scale to a true percentage is wanted, is not
            # visible from this file - confirm against the inference engine.
            self.area_percent = float(np.sum(self.mask)) / (height * width)
        else:
            self.area_percent = 0

        self.send_result()
        return {"message": f"Image {self.cctv_name} uploaded successfully!"}

    def _build_fields(self):
        """Return the multipart fields: the image, plus mask/overlay if detected."""
        parts = {
            'image': (
                f'frame_{self.cctv_name}.{self.image_type}',
                self.image_bytes,
                f'image/{self.image_type}',
            ),
        }
        if self.flag_detected:
            encoded_ok, seg_png = cv2.imencode('.png', self.seg_image)
            if not encoded_ok:
                raise RuntimeError('Failed to encode the segmentation overlay')
            # Both derived images are PNG-encoded, so label them as PNG.
            parts['mask'] = (
                f'frame_mask_{self.cctv_name}.png',
                self.mask_blob,
                'image/png',
            )
            parts['seg_mask'] = (
                f'frame_seg_{self.cctv_name}.png',
                seg_png.tobytes(),
                'image/png',
            )
        return parts

    def send_result(self):
        """Post the frame and any detection output to the post-processing server.

        Delivery is best-effort: failures are printed and swallowed so the
        upload response is not affected.
        """
        # NOTE(review): the time is taken in Asia/Seoul but formatted with a
        # literal "Z" (UTC) suffix - confirm what the receiver expects before
        # changing the format.
        time_sent = datetime.now(self.time_zone).strftime("%Y-%m-%dT%H:%M:%SZ")
        header = {
            'X-Time-Sent': time_sent,
            'X-CCTV-Name': base64.b64encode(
                str(self.cctv_name).encode('utf-8')
            ).decode('ascii'),
            'X-CCTV-Latitude': str(self.cctv_latitude),
            'X-CCTV-Longitude': str(self.cctv_longitude),
            'X-Area-Percentage': str(self.area_percent),
            'X-Flag-Detected': str(self.flag_detected),  # "True" or "False"
        }

        try:
            multipart_data = MultipartEncoder(fields=self._build_fields())
            header["Content-Type"] = multipart_data.content_type
            # The context manager closes the session's pooled connections.
            with requests.Session() as session:
                response = session.post(
                    self.endpoint, headers=header, data=multipart_data
                )
            print(response)
        except requests.RequestException as e:
            print(e)
            print("Can not connect to the postprocessing server. Check the endpoint address or connection.\n"
                  f"Can not connect to : {self.endpoint}")
        except Exception as e:
            # Still best-effort, but do not misreport encoding/payload errors
            # as a connection problem.
            print(e)
            print("Failed to build or send the detection result.")
# Start the Flask development server on port 12345 when run as a script.
if __name__ == '__main__':
    app.run(port=12345, debug=False)