import time
import cv2
import numpy as np
import random
import onnxruntime as ort
from config_files.yolo_config import CLASS_NUM
from typing import List, Tuple
class Inference:
def __init__(self, onnx_model_path, model_input_shape, classes_txt_file, run_with_cuda):
self.model_path = onnx_model_path
self.model_shape = model_input_shape
self.classes_path = classes_txt_file
self.cuda_enabled = run_with_cuda
self.letter_box_for_square = True
self.model_score_threshold = 0.3
self.model_nms_threshold = 0.6
self.classes = []
self.session = None
self.load_onnx_network()
self.load_classes_from_file()
def sigmoid(self, x):
return 1 / (1 + np.exp(-x))
def run_inference(self, input_image):
model_input = input_image
# print(input_image)
if self.letter_box_for_square and self.model_shape[0] == self.model_shape[1]:
model_input = self.format_to_square(model_input)
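        # blobFromImage scales pixels to [0, 1], resizes to model_shape,
        # subtracts a zero mean, swaps BGR to RGB (swapRB=True) and does not crop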
blob = cv2.dnn.blobFromImage(model_input, 1.0 / 255.0, self.model_shape, (0, 0, 0), True, False)
# Prepare input data as a dictionary
inputs = {self.session.get_inputs()[0].name: blob}
# Run model
outputs = self.session.run(None, inputs)
outputs_bbox = outputs[0]
outputs_mask = outputs[1]
detections = self.process_detections(outputs_bbox, model_input)
mask_maps = self.process_mask_output(detections, outputs_mask, model_input.shape)
return detections, mask_maps
def load_onnx_network(self):
# Set up the ONNX Runtime session with appropriate device settings
try:
if self.cuda_enabled:
providers = [('CUDAExecutionProvider', {'device_id': 0})]
else:
providers = ['CPUExecutionProvider']
self.session = ort.InferenceSession(self.model_path, providers=providers)
print(f"Running on {'CUDA' if self.cuda_enabled else 'CPU'}")
print(f"Model loaded successfully. Input name: {self.session.get_inputs()[0].name}")
except Exception as e:
print(f"Failed to load the ONNX model: {e}")
self.session = None
def load_classes_from_file(self):
with open(self.classes_path, 'r') as f:
self.classes = f.read().strip().split('\n')
def format_to_square(self, source):
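        # Pads the image with black pixels to a square, anchoring the original
        # at the top-left (no centering); this keeps the x/y scale factors in
        # process_detections straightforward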
col, row = source.shape[1], source.shape[0]
max_side = max(col, row)
result = np.zeros((max_side, max_side, 3), dtype=np.uint8)
result[0:row, 0:col] = source
return result
def process_detections(self, outputs_bbox, model_input):
        # outputs_bbox layout: (batch_size, 4 + CLASS_NUM + 32, num_anchors),
        # e.g. (1, 4 + CLASS_NUM + 32, 8400); each anchor column holds
        # (x, y, w, h, per-class scores..., 32 mask coefficients)
        x_factor = model_input.shape[1] / self.model_shape[0]
        y_factor = model_input.shape[0] / self.model_shape[1]
        # Extract box centres, sizes and class scores
x, y, w, h = outputs_bbox[:, 0], outputs_bbox[:, 1], outputs_bbox[:, 2], outputs_bbox[:, 3]
scores = outputs_bbox[:, 4:4 + CLASS_NUM]
# Calculate confidences and class IDs
confidences = np.max(scores, axis=1)
class_ids = np.argmax(scores, axis=1)
# Filter out small boxes
min_width, min_height = 20, 20
valid_size = (w >= min_width) & (h >= min_height)
# Apply confidence threshold
valid_confidence = (confidences > self.model_score_threshold)
# Combine all conditions
valid_detections = valid_size & valid_confidence
        # Mask coefficients: 32 values per anchor, combined with the proto masks later
        scores_segmentation = outputs_bbox[:, 4 + CLASS_NUM:]
# Filter arrays based on valid detections
filtered_x = x[valid_detections]
filtered_y = y[valid_detections]
filtered_w = w[valid_detections]
filtered_h = h[valid_detections]
filtered_confidences = confidences[valid_detections]
filtered_class_ids = class_ids[valid_detections]
filtered_mask_coefficient = np.transpose(scores_segmentation, (2,0,1))[valid_detections.T]
# Calculate adjusted box coordinates
left = (filtered_x - 0.5 * filtered_w) * x_factor
top = (filtered_y - 0.5 * filtered_h) * y_factor
width = filtered_w * x_factor
height = filtered_h * y_factor
# Prepare final arrays
boxes = np.vstack([left, top, width, height]).T
# Change it into int for mask operation
boxes = boxes.astype(int)
boxes = boxes.tolist()
filtered_confidences = filtered_confidences.tolist()
filtered_class_ids = filtered_class_ids.tolist()
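        # cv2.dnn.NMSBoxes takes [x, y, w, h] boxes with a parallel list of
        # scores and returns the indices of the detections that survive
        # non-maximum suppression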
        if len(boxes) > 0:
            indices = cv2.dnn.NMSBoxes(boxes, filtered_confidences, self.model_score_threshold, self.model_nms_threshold)
        else:
            indices = []
        detections = []
        for i in indices:
            result = {
                'class_id': filtered_class_ids[i],
                'confidence': filtered_confidences[i],
                'mask_coefficients': np.array(filtered_mask_coefficient[i]),
                'box': boxes[i],
'class_name': self.classes[filtered_class_ids[i]],
'color': (random.randint(100, 255), random.randint(100, 255), random.randint(100, 255))
}
detections.append(result)
return detections
def process_mask_output(self, detections, proto_masks, image_shape):
if not detections:
return []
batch_size, num_protos, proto_height, proto_width = proto_masks.shape
full_masks = np.zeros((len(detections), image_shape[0], image_shape[1]), dtype=np.float32)
for idx, det in enumerate(detections):
box = det['box']
x1, y1, w, h = self.adjust_box_coordinates(box, (image_shape[0], image_shape[1]))
            if w <= 1 or h <= 1:
continue
# Get the corresponding mask coefficients for this detection
coeffs = det["mask_coefficients"]
# Compute the linear combination of proto masks
# for now, plural batch operation is not supported, and this is the point where you should start.
# instead of hardcoded proto_masks[0], do some iterative/vectorize operation
mask = np.tensordot(coeffs, proto_masks[0], axes=[0, 0]) # Dot product along the number of prototypes
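            # A hedged sketch for true batch support (assumes detections would
            # carry a hypothetical 'batch_index' field, which this code does not
            # set yet):
            #   b = det.get('batch_index', 0)
            #   mask = np.tensordot(coeffs, proto_masks[b], axes=[0, 0])
            # or, vectorised over all detections of one image at once:
            #   masks = np.einsum('nc,chw->nhw', all_coeffs, proto_masks[0])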
            # cv2.resize expects (width, height); image_shape is (height, width, ...)
            resized_mask = cv2.resize(mask, (image_shape[1], image_shape[0]))
            # Crop the mask to the bounding box, then squash the logits into
            # [0, 1] with a sigmoid
            cropped_mask = resized_mask[y1:y1+h, x1:x1+w]
            resized_mask = self.sigmoid(cropped_mask)
# Threshold to create a binary mask
final_mask = (resized_mask > 0.5).astype(np.uint8)
# Place the mask in the corresponding location on a full-sized mask image_binary
full_mask = np.zeros((image_shape[0], image_shape[1]), dtype=np.uint8)
full_mask[y1:y1+h, x1:x1+w] = final_mask
# Combine the mask with the masks of other detections
full_masks[idx] = full_mask
all_mask = full_masks.sum(axis=0)
all_mask = np.clip(all_mask, 0, 1)
        # Add a channel dimension so that cv2 treats ``all_mask`` as an image.
        # Collapsing every detection into one channel is fine here because there
        # is only a single class (``water_body``); for multiple classes this
        # part must be modified.
        all_mask = all_mask.reshape((image_shape[0], image_shape[1], 1))
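        # A hypothetical multi-class variant (one output channel per class,
        # using the 'class_id' from process_detections) might look like:
        #   per_class = np.zeros((image_shape[0], image_shape[1], len(self.classes)), dtype=np.uint8)
        #   for det, m in zip(detections, full_masks):
        #       cid = det['class_id']
        #       per_class[:, :, cid] = np.maximum(per_class[:, :, cid], m.astype(np.uint8))
        #   return per_class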
return all_mask.astype(np.uint8)
def adjust_box_coordinates(self, box: List[int], image_shape: Tuple[int, int]) -> Tuple[int, int, int, int]:
"""
Adjusts bounding box coordinates to ensure they lie within image boundaries.
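
        Example: a box overhanging the right edge of a 480x480 image is clipped:
            adjust_box_coordinates([450, 10, 60, 60], (480, 480)) -> (450, 10, 30, 60)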
"""
x1, y1, w, h = box
x2, y2 = x1 + w, y1 + h
# Clamp coordinates to image boundaries
x1 = max(0, x1)
y1 = max(0, y1)
x2 = min(image_shape[1], x2)
y2 = min(image_shape[0], y2)
# Recalculate width and height
w = x2 - x1
h = y2 - y1
return x1, y1, w, h
def overlay_mask(image, mask, color=(0, 255, 0), alpha=0.5):
"""
Overlays a mask onto an image_binary using a specified color and transparency level.
Parameters:
image (np.ndarray): The original image_binary.
mask (np.ndarray): The mask to overlay. Must be the same size as the image_binary.
color (tuple): The color for the mask overlay in BGR format (default is green).
alpha (float): Transparency factor for the mask; 0 is fully transparent, 1 is opaque.
Returns:
np.ndarray: The image_binary with the overlay.
"""
assert alpha <= 1 and 0 <= alpha, (f"Error! invalid alpha value, it must be float, inbetween including 0 to 1, "
f"\n given alpha : {alpha}")
# Ensure the mask is a binary mask
mask = (mask > 0).astype(np.uint8) # Convert mask to binary if not already
    # Create an overlay the same size as the image, colored only where the mask is set
overlay = np.zeros_like(image, dtype=np.uint8)
overlay[mask == 1] = color
    # Blend the overlay with the image using the alpha factor
return cv2.addWeighted(src1=overlay, alpha=alpha, src2=image, beta=1 - alpha, gamma=0)
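# Minimal usage sketch for overlay_mask (file names are hypothetical):
#   img = cv2.imread('frame.jpg')
#   mask = np.zeros(img.shape[:2], dtype=np.uint8)
#   mask[100:200, 100:200] = 1  # placeholder region
#   cv2.imwrite('blended.jpg', overlay_mask(img, mask, color=(0, 0, 255), alpha=0.4))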
def test():
    import glob
# Path to your ONNX model and classes text file
model_path = 'yoloseg/weight/best.onnx'
classes_txt_file = 'config_files/yolo_config.txt'
# image_path = 'yoloseg/img3.jpg'
image_path = 'yoloseg/img.jpg'
model_input_shape = (480, 480)
inference_engine = Inference(
onnx_model_path=model_path,
model_input_shape=model_input_shape,
classes_txt_file=classes_txt_file,
run_with_cuda=True
)
    # Load an image
    img = cv2.imread(image_path)
    if img is None:
        print("Error loading image")
return
img = cv2.resize(img, model_input_shape)
# Run inference
# for i in range(10):
# t1 = time.time()
# detections, mask_maps = inference_engine.run_inference(img)
# t2 = time.time()
# print(t2 - t1)
images = glob.glob("/home/juni/사진/flood/out-/*.jpg")
images = sorted(images)
for k, image in enumerate(images):
image = cv2.imread(image)
t1 = time.time()
image = cv2.resize(image, model_input_shape)
detections, mask_maps = inference_engine.run_inference(image)
# Display results
for detection in detections:
x, y, w, h = detection['box']
class_name = detection['class_name']
confidence = detection['confidence']
cv2.rectangle(image, (x, y), (x+w, y+h), detection['color'], 2)
label = f"{class_name}: {confidence:.2f}"
cv2.putText(image, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, detection['color'], 2)
        seg_image = image  # fall back to the raw frame if no masks were produced
        if len(mask_maps) != 0:
            for i in range(mask_maps.shape[2]):  # Iterate over each mask channel
                seg_image = overlay_mask(seg_image, mask_maps[:, :, i], color=(0, 255, 0), alpha=0.3)
                # cv2.imshow(f"Segmentation {i + 1}", seg_image)
                # cv2.waitKey(0)  # Wait for a key press before showing the next mask
                # cv2.destroyAllWindows()
        t2 = time.time()
        print(t2 - t1)
        cv2.imwrite(f"/home/juni/사진/flood/infer/{k}.jpg", seg_image)
    # Show the image
# cv2.imshow('Detections', img)
# cv2.waitKey(0)
# cv2.destroyAllWindows()
def process_video(video_path, output_dir, model_input_shape=(480,480)):
import os
import av
import csv
    model_path = 'yoloseg/weight/best.onnx'
    classes_txt_file = 'config_files/yolo_config.txt'
inference_engine = Inference(
onnx_model_path=model_path,
model_input_shape=model_input_shape,
classes_txt_file=classes_txt_file,
run_with_cuda=True
)
# Open video using PyAV
container = av.open(video_path)
frame_times = [] # List to store inference time per frame
frame_count = 0
# Create output directory if it doesn't exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Get frame rate from video to control display speed
video_fps = container.streams.video[0].average_rate
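    # average_rate is a fractions.Fraction in PyAV (or None for some streams)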
if video_fps is None:
video_fps = 25 # Default to 25 FPS if unavailable
# Decode video frame by frame
for frame in container.decode(video=0):
frame_count += 1
# Convert PyAV frame to numpy array (OpenCV format)
img = frame.to_ndarray(format='bgr24')
t1 = time.time()
# Resize frame to match model input shape
resized_frame = cv2.resize(img, model_input_shape)
# Run inference
detections, mask_maps = inference_engine.run_inference(resized_frame)
# Display detections
for detection in detections:
x, y, w, h = detection['box']
class_name = detection['class_name']
confidence = detection['confidence']
cv2.rectangle(resized_frame, (x, y), (x+w, y+h), detection['color'], 2)
label = f"{class_name}: {confidence:.2f}"
cv2.putText(resized_frame, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, detection['color'], 2)
if len(mask_maps) != 0:
for i in range(mask_maps.shape[2]): # Iterate over each mask
resized_frame = overlay_mask(resized_frame, mask_maps[:, :, i], color=(0, 255, 0), alpha=0.3)
        # Show the processed frame
        # cv2.imshow('Processed Frame', resized_frame)
        # Save the processed frame
output_frame_path = os.path.join(output_dir, f"frame_{frame_count}.jpg")
cv2.imwrite(output_frame_path, resized_frame)
t2 = time.time()
frame_time = t2 - t1
frame_times.append(frame_time)
print(f"Frame {frame_count} inference time: {frame_time:.4f} seconds")
        # Throttle the loop to the source FPS; pressing 'q' quits (only
        # effective while a cv2 window is open, e.g. if the imshow above is
        # re-enabled)
        if cv2.waitKey(int(1000 / video_fps)) & 0xFF == ord('q'):
            break
# Close OpenCV windows
cv2.destroyAllWindows()
# Calculate and save inference times to CSV
avg_inference_time = sum(frame_times) / len(frame_times) if frame_times else 0
output_csv_path = os.path.join(output_dir, "inference.csv")
with open(output_csv_path, mode='w', newline='') as csv_file:
writer = csv.writer(csv_file)
writer.writerow(["Frame", "Inference Time (seconds)"])
for i, time_val in enumerate(frame_times):
writer.writerow([i + 1, time_val])
writer.writerow(["Average", avg_inference_time])
print(f"Average inference time: {avg_inference_time:.4f} seconds")
print(f"Inference times saved to {output_csv_path}")
if __name__ == "__main__":
# test()
process_video("/home/juni/사진/flood/test_video.mp4", "/home/juni/사진/flood/infer/")