File name
Commit message
Commit date
import numpy as np
from flask import Flask, request
from flask_restx import Api, Resource, fields
import os
from datetime import datetime
from yoloseg.inference_ import Inference, overlay_mask
import cv2
import time
import base64
app = Flask(__name__)
api = Api(app, version='1.0', title='CCTV Image Upload API',
description='A simple API for receiving CCTV images')
# Namespace definition
ns = api.namespace('cctv', description='CCTV operations')
model_path = 'yoloseg/weight/best.onnx'
classes_txt_file = 'config_files/yolo_config.txt'
image_path = 'yoloseg/img3.jpg'
model_input_shape = (640, 640)
inference_engine = Inference(
onnx_model_path=model_path,
model_input_shape=model_input_shape,
classes_txt_file=classes_txt_file,
run_with_cuda=True
)
# Define the expected model for incoming data
image_upload_model = api.model('ImageUpload', {
'image': fields.String(required=True, description='Image file', dt='File'),
'x-cctv-info': fields.String(required=False, description='CCTV identifier'),
'x-time-sent': fields.String(required=False, description='Time image was sent'),
'x-cctv-latitude': fields.String(required=False, description='Latitude of CCTV'),
'x-cctv-longitude': fields.String(required=False, description='Longitude of CCTV')
})
# Define the directory where images will be saved
IMAGE_DIR = "network_test"
if not os.path.exists(IMAGE_DIR):
os.makedirs(IMAGE_DIR)
@ns.route('/infer', )
class ImageUpload(Resource):
# @ns.expect(image_upload_model, validate=True)
@ns.response(200, 'Success')
@ns.response(400, 'Validation Error')
def post(self):
if 'file' not in request.files:
ns.abort(400, 'No image part in the request')
image = request.files['file']
cctv_info = base64.b64decode(request.headers.get('x-cctv-name', '')).decode('UTF-8')
time_sent = request.headers.get('x-time-sent', '')
cctv_latitude = request.headers.get('x-cctv-latitude', 'Not provided')
cctv_longitude = request.headers.get('x-cctv-longitude', 'Not provided')
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
image = image.read()
image = np.frombuffer(image, np.uint8)
image = cv2.imdecode(image, cv2.IMREAD_COLOR)
filename = f"{timestamp}_{cctv_info}.png"
t1 = time.time()
detections, mask_maps = inference_engine.run_inference(image)
t2 = time.time()
print(t2 - t1)
if len(mask_maps) != 0:
seg_image = overlay_mask(image, mask_maps[0], color=(0, 255, 0), alpha=0.3)
area_percent = 0
else :
area_percent = np.sum(mask_maps) / image.shape[0] * image.shape[1]
# write another post request for pushing a detection result
return {"message": f"Image {filename} uploaded successfully!"}
def send_result(self):
pass
if __name__ == '__main__':
app.run(debug=True, port=12345)