--- postprocess_draft.py
+++ postprocess_draft.py
... | ... | @@ -33,9 +33,6 @@ |
33 | 33 |
# Namespace definition |
34 | 34 |
ns = api.namespace('postprocess', description='Postprocessing of inference results') |
35 | 35 |
|
36 |
-with open('config_files/MAIN_DB_ENDPOINT.json', 'r') as file: |
|
37 |
- db_config = json.load(file) |
|
38 |
- |
|
39 | 36 |
class StreamSources(): |
40 | 37 |
def __init__(self, buffer_size, normal_send_interval, failure_mode_thres, failure_mode_check_past_n, normal_mode_thres, normal_mode_check_past_n): |
41 | 38 |
assert failure_mode_thres <= failure_mode_check_past_n,\ |
... | ... | @@ -89,6 +86,7 @@ |
89 | 86 |
"normal_to_failure_mode_change_alert" : False, |
90 | 87 |
"failure_to_normal_mode_change_alert" : False |
91 | 88 |
} |
89 |
+ self.cache_cctv_info = None |
|
92 | 90 |
else : |
93 | 91 |
raise KeyError(f"Error! Source {key} already initialized.") |
94 | 92 |
# Update logic here if needed |
... | ... | @@ -99,7 +97,7 @@ |
99 | 97 |
def __call__(self): |
100 | 98 |
return self.sources |
101 | 99 |
|
102 |
- def add_status(self, source, status, image, seg_image): |
|
100 |
+ def add_status(self, source, status, cctv_info, image, seg_image): |
|
103 | 101 |
assert status in ["OK", "FAIL"],\ |
104 | 102 |
f"Invalid status was given!, status must be one of 'OK' or 'FAIL', but given '{status}'!" |
105 | 103 |
|
... | ... | @@ -109,6 +107,8 @@ |
109 | 107 |
flag_send_event = False |
110 | 108 |
|
111 | 109 |
status_value = 1 if status == "OK" else 0 |
110 |
+ |
|
111 |
+ self.cache_cctv_info = cctv_info |
|
112 | 112 |
|
113 | 113 |
self.sources[source]["status_counts"].append(status_value) |
114 | 114 |
if len(self.sources[source]["status_counts"]) > self.buffer_size: |
... | ... | @@ -161,25 +161,63 @@ |
161 | 161 |
self.sources[source]["normal_to_failure_mode_change_alert"] = False |
162 | 162 |
|
163 | 163 |
def send_event(self, source): |
164 |
- try : |
|
164 |
+ source_data = self.sources[source] |
|
165 |
+ try: |
|
166 |
+ # Connect to the database |
|
165 | 167 |
conn = psycopg2.connect(**db_config) |
166 | 168 |
cursor = conn.cursor() |
167 | 169 |
|
168 |
- upload_data_sql_query = """ |
|
169 |
- INSERT INTO |
|
170 |
+ # Set the search path for the schema |
|
171 |
+ cursor.execute("SET search_path TO ai_camera_v0_1;") |
|
172 |
+ |
|
173 |
+ # Prepare the SQL query |
|
174 |
+ insert_sql = """ |
|
175 |
+ INSERT INTO flooding_detect_event ( |
|
176 |
+ ocrn_dt, |
|
177 |
+ eqpmn_nm, |
|
178 |
+ flooding_result, |
|
179 |
+ flooding_per, |
|
180 |
+ image, |
|
181 |
+ image_seg, |
|
182 |
+ eqpmn_lat, |
|
183 |
+ eqpmn_lon, |
|
184 |
+ norm_to_alert_flag, |
|
185 |
+ alert_to_norm_flag |
|
186 |
+ ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s); |
|
170 | 187 |
""" |
171 | 188 |
|
172 |
- cursor.close() |
|
173 |
- conn.close() |
|
189 |
+ # Prepare data to insert |
|
190 |
+ data_tuple = ( |
|
191 |
+ time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(source_data["last_send_before"])), |
|
192 |
+ source_data["cctv_info"]["cctv_name"], |
|
193 |
+ "FAIL" if source_data["failure_counts"] >= self.failure_mode_thres else "OK", |
|
194 |
+ source_data["cctv_info"]["area_percent"], |
|
195 |
+ source_data["most_recent_image"], |
|
196 |
+ source_data["most_recent_seg_image"], |
|
197 |
+ source_data["cctv_info"]["cctv_latitude"], |
|
198 |
+ source_data["cctv_info"]["cctv_longitude"], |
|
199 |
+ source_data["normal_to_failure_mode_change_alert"], |
|
200 |
+ source_data["failure_to_normal_mode_change_alert"] |
|
201 |
+ ) |
|
174 | 202 |
|
175 |
- except ValueError as e: |
|
176 |
- print(e) |
|
203 |
+ # Execute the query |
|
204 |
+ cursor.execute(insert_sql, data_tuple) |
|
205 |
+ conn.commit() |
|
206 |
+ |
|
207 |
+ print(f"EVENT: Sent for {source} - Data inserted successfully.") |
|
208 |
+ |
|
177 | 209 |
except Exception as e: |
178 |
- print(e) |
|
210 |
+ print(f"Database operation failed: {e}") |
|
211 |
+ finally: |
|
212 |
+ if cursor: |
|
213 |
+ cursor.close() |
|
214 |
+ if conn: |
|
215 |
+ conn.close() |
|
179 | 216 |
|
180 |
- self.sources[source]["last_send_before"] = 0 |
|
181 |
- print(f"EVENT : SENDING {source}!!") |
|
182 |
- pass |
|
217 |
+ # Reset the image data after sending to avoid re-sending the same image |
|
218 |
+ source_data["most_recent_image"] = None |
|
219 |
+ source_data["most_recent_seg_image"] = None |
|
220 |
+ source_data["last_send_before"] = 0 |
|
183 | 221 |
|
184 | 222 |
|
185 | 223 |
memory = StreamSources( |
... | ... | @@ -276,7 +314,7 @@ |
276 | 314 |
pass |
277 | 315 |
pass_fail = self.pass_fail() |
278 | 316 |
|
279 |
- memory.add_status(self.cctv_name, pass_fail, image_b64, seg_image_b64) |
|
317 |
+ memory.add_status(self.cctv_name, pass_fail, self.cctv_info, image_b64, seg_image_b64) |
|
280 | 318 |
|
281 | 319 |
if debug: |
282 | 320 |
print(memory()) |
Add a comment
Delete comment
Once you delete this comment, you won't be able to recover it. Are you sure you want to delete this comment?