+++ Hourly_db_schedular.py
... | ... | @@ -0,0 +1,84 @@ |
1 | +import pandas as pd | |
2 | +from datetime import datetime, timedelta | |
3 | +import psycopg2 | |
4 | +import json | |
5 | + | |
6 | +from flask import Flask | |
7 | +from flask_restx import Api | |
8 | +from apscheduler.schedulers.background import BackgroundScheduler | |
9 | + | |
# Load DB connection settings once at import time.  Explicit encoding keeps
# the read independent of the platform's locale default.
# NOTE(review): these keys are later splatted into psycopg2.connect(); the
# config also carries keys such as "schema_name" and "id" -- verify they are
# valid libpq connection options before forwarding verbatim.
with open('config_files/MAIN_DB_ENDPOINT.json', 'r', encoding='utf-8') as file:
    db_config = json.load(file)

app = Flask(__name__)
print("ITS API Updater START")

# flask_restx API metadata.  No resources are registered in this module;
# the real work happens in the background scheduler job below.
api = Api(app,
          version='0.1',
          title="monitoring",
          description="API Server",
          terms_url="/",
          contact="",
          )
23 | + | |
def get_hourly_datetime_range():
    """Return a (start, end) pair of datetimes spanning one full clock hour.

    More than five minutes into the current hour, the window is the current
    hour (this hour's start -> next hour's start); otherwise it is the
    previous full hour.

    NOTE(review): the first branch returns a window whose end lies in the
    future, so its data is necessarily incomplete -- confirm this is the
    intended behaviour.  This helper is currently unused by the module.
    """
    now = datetime.now()
    hour_start = now.replace(minute=0, second=0, microsecond=0)
    if now.minute > 5:
        return hour_start, hour_start + timedelta(hours=1)
    return hour_start - timedelta(hours=1), hour_start
35 | + | |
36 | + | |
def _connection_kwargs():
    """Build psycopg2.connect() kwargs from the JSON config.

    The config file carries keys libpq rejects as connection options
    (e.g. "schema_name") and uses "id" where psycopg2 expects "user",
    so the dict cannot be splatted into connect() verbatim.
    """
    valid = {"host", "hostaddr", "port", "dbname", "database", "user",
             "password", "connect_timeout", "sslmode"}
    kwargs = {k: v for k, v in db_config.items() if k in valid}
    # Map the config's "id" field onto libpq's "user" option.
    if "user" not in kwargs and "id" in db_config:
        kwargs["user"] = db_config["id"]
    return kwargs


def fetch_and_update():
    """Aggregate the previous hour's flooding alerts into hourly counts.

    Counts rows of flooding_detect_event with norm_to_alert_flag = 'True'
    whose ocrn_dt falls in the previous full clock hour, then inserts one
    (clct_dt, eqpmn_id, flooding_cnt) row per equipment into
    flooding_anals_event_data_hr.  Scheduled to run shortly after each
    hour boundary.
    """
    conn = psycopg2.connect(**_connection_kwargs())
    try:
        # `with conn` commits on success / rolls back on error, so no
        # explicit commit is needed; the cursor closes via its own
        # context manager.  (The original leaked both the connection and
        # a first, shadowed cursor.)
        with conn:
            with conn.cursor() as cursor:
                # search_path is session-scoped, so one SET covers all
                # statements on this connection.
                cursor.execute("SET search_path TO ai_camera_v0_1;")

                # Time window: the previous full hour [prev, current).
                now = datetime.now()
                current_hour = now.replace(minute=0, second=0, microsecond=0)
                previous_hour = current_hour - timedelta(hours=1)

                fetch_sql = """
                    SELECT eqpmn_id, COUNT(*) AS flooding_cnt
                    FROM flooding_detect_event
                    WHERE ocrn_dt >= %s AND ocrn_dt < %s
                      AND norm_to_alert_flag = 'True'
                    GROUP BY eqpmn_id;
                """
                cursor.execute(fetch_sql, (previous_hour, current_hour))
                rows = cursor.fetchall()

                insert_sql = """
                    INSERT INTO flooding_anals_event_data_hr
                        (clct_dt, eqpmn_id, flooding_cnt)
                    VALUES (%s, %s, %s);
                """
                # executemany replaces the original pandas DataFrame +
                # iterrows() round-trip, which added only overhead.
                cursor.executemany(
                    insert_sql,
                    [(previous_hour, eqpmn_id, cnt) for eqpmn_id, cnt in rows],
                )
    finally:
        conn.close()  # the original never closed the connection
72 | + | |
# Scheduler configuration: run the hourly aggregation at minute 5 of every
# hour, giving late-arriving events a small grace period after the boundary.
scheduler = BackgroundScheduler()
scheduler.add_job(func=fetch_and_update, trigger='cron', minute=5)
scheduler.start()

# Shut the scheduler's worker thread down cleanly at interpreter exit;
# otherwise the daemon thread is killed mid-job without cleanup.
import atexit
atexit.register(scheduler.shutdown)
77 | + | |
@app.route('/')
def home():
    """Health-check endpoint: confirms the service is running."""
    status_message = "Flooding analysis service running."
    return status_message
81 | + | |
if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug interactive debugger --
    # unsafe if this service is reachable from outside; confirm before
    # deploying.  use_reloader=False is required so the reloader process
    # does not start the BackgroundScheduler a second time.
    app.run(debug=True, use_reloader=False)
84 | + |
--- config_files/MAIN_DB_ENDPOINT.json
+++ config_files/MAIN_DB_ENDPOINT.json
... | ... | @@ -3,17 +3,5 @@ |
3 | 3 |
"port" : "5423", |
4 | 4 |
"id" : "takensoft", |
5 | 5 |
"password" : "tts96314728!@", |
6 |
- "table_name" : "flooding_detect_event", |
|
7 |
- "columns" : [ |
|
8 |
- "ocrn_dt", |
|
9 |
- "eqpmn_id", |
|
10 |
- "flooding_result", |
|
11 |
- "flooding_per", |
|
12 |
- "image", |
|
13 |
- "image_seg", |
|
14 |
- "eqpmn_lat", |
|
15 |
- "eqpmn_lon", |
|
16 |
- "flooding_y", |
|
17 |
- "flooding_x" |
|
18 |
- ] |
|
6 |
+ "schema_name" : "ai_camera_v0_1", |
|
19 | 7 |
}(파일 끝에 줄바꿈 문자 없음) |
--- postprocess_draft.py
+++ postprocess_draft.py
... | ... | @@ -2,11 +2,13 @@ |
2 | 2 |
from flask import Flask, request, jsonify |
3 | 3 |
from flask_restx import Api, Resource, fields |
4 | 4 |
import os |
5 |
+import psycopg2 |
|
5 | 6 |
from datetime import datetime |
6 | 7 |
from yoloseg.inference_ import Inference, overlay_mask |
7 | 8 |
import cv2 |
8 | 9 |
import time |
9 | 10 |
import base64 |
11 |
+import json |
|
10 | 12 |
import requests |
11 | 13 |
import typing |
12 | 14 |
from requests_toolbelt import MultipartEncoder |
... | ... | @@ -30,6 +32,9 @@ |
30 | 32 |
|
31 | 33 |
# Namespace definition |
32 | 34 |
ns = api.namespace('postprocess', description='Postprocessing of inference results') |
35 |
+ |
|
36 |
+with open('config_files/MAIN_DB_ENDPOINT.json', 'r') as file: |
|
37 |
+ db_config = json.load(file) |
|
33 | 38 |
|
34 | 39 |
class StreamSources(): |
35 | 40 |
def __init__(self, buffer_size, normal_send_interval, failure_mode_thres, failure_mode_check_past_n, normal_mode_thres, normal_mode_check_past_n): |
... | ... | @@ -147,7 +152,7 @@ |
147 | 152 |
if flag_send_event: |
148 | 153 |
self.send_event(source) |
149 | 154 |
|
150 |
- # alert only alarms once |
|
155 |
+ # alert alarms only once |
|
151 | 156 |
if self.sources[source]["failure_to_normal_mode_change_alert"]: |
152 | 157 |
self.sources[source]["failure_to_normal_mode_change_alert"] = False |
153 | 158 |
|
... | ... | @@ -155,6 +160,22 @@ |
155 | 160 |
self.sources[source]["normal_to_failure_mode_change_alert"] = False |
156 | 161 |
|
def send_event(self, source):
    """Announce an alert event for `source` and reset its send timer.

    The database write is still a stub: the INSERT statement was never
    finished in the original and is never executed, so the try block
    currently only opens and closes a connection (a connectivity check
    at best).  Failures are logged and swallowed so event signalling
    stays best-effort.
    """
    try:
        conn = psycopg2.connect(**db_config)
        try:
            cursor = conn.cursor()

            # TODO(review): this INSERT was left unfinished ("INSERT INTO"
            # with no table/columns/values) and is never executed.  Kept
            # as a stub until the target table is decided.
            upload_data_sql_query = """
            INSERT INTO
            """

            cursor.close()
        finally:
            # Close even if cursor creation fails (original leaked here).
            conn.close()
    except Exception as e:
        # The original's separate `except ValueError` did exactly the same
        # print, so the handlers are merged; best-effort logging preserved.
        print(e)

    self.sources[source]["last_send_before"] = 0
    print(f"EVENT : SENDING {source}!!")
Add a comment
Delete comment
Once you delete this comment, you won't be able to recover it. Are you sure you want to delete this comment?