"""Hourly flooding-event aggregation service.

At import time this module loads the database settings, creates the Flask
app and REST API wrapper, and starts a background scheduler that runs
``fetch_and_update`` at minute 5 of every hour.
"""

import json
from datetime import datetime, timedelta

import pandas as pd  # noqa: F401  (kept from the original file; unused here)
import psycopg2
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
from flask_restx import Api

with open('config_files/MAIN_DB_ENDPOINT.json', 'r') as file:
    db_config = json.load(file)

app = Flask(__name__)
print("ITS API Updater START")

api = Api(
    app,
    version='0.1',
    title="monitoring",
    description="API Server",
    terms_url="/",
    contact="",
)

SET_SCHEMA_SQL = "SET search_path TO ai_camera_v0_1;"

FETCH_SQL = """
    SELECT eqpmn_id, COUNT(*) AS flooding_cnt
    FROM flooding_detect_event
    WHERE ocrn_dt >= %s AND ocrn_dt < %s
      AND norm_to_alert_flag = 'True'
    GROUP BY eqpmn_id;
"""

INSERT_SQL = """
    INSERT INTO flooding_anals_event_data_hr (clct_dt, eqpmn_id, flooding_cnt)
    VALUES (%s, %s, %s);
"""


def get_hourly_datetime_range() -> tuple:
    """Return a ``(start, end)`` pair of hour-aligned local datetimes.

    When the current minute is greater than 5 the pair is the current hour
    and the next hour; otherwise it is the previous hour and the current
    hour.

    NOTE(review): nothing in this file calls this helper, and its window
    differs from the one ``fetch_and_update`` computes; confirm whether it
    is still needed.
    """
    current_time = datetime.now()
    current_hour_start = current_time.replace(minute=0, second=0, microsecond=0)

    if current_time.minute > 5:
        return current_hour_start, current_hour_start + timedelta(hours=1)
    return current_hour_start - timedelta(hours=1), current_hour_start


def fetch_and_update() -> None:
    """Aggregate the previous full hour of flooding events into the hourly table.

    Counts rows of ``flooding_detect_event`` per ``eqpmn_id`` whose
    ``ocrn_dt`` falls in ``[previous_hour, current_hour)`` and whose
    ``norm_to_alert_flag`` equals ``'True'``, then inserts one row per
    equipment into ``flooding_anals_event_data_hr`` stamped with the start
    of that hour.

    Raises:
        psycopg2.Error: if connecting or any statement fails. The
            transaction is rolled back and the connection is still closed.
    """
    # Window is the last completed hour, based on naive local time.
    now = datetime.now()
    current_hour = now.replace(minute=0, second=0, microsecond=0)
    previous_hour = current_hour - timedelta(hours=1)

    conn = psycopg2.connect(**db_config)
    try:
        # `with conn` commits on success and rolls back on error, but it does
        # NOT close the connection, hence the explicit close() in `finally`.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(SET_SCHEMA_SQL)

                cursor.execute(FETCH_SQL, (previous_hour, current_hour))
                counts = cursor.fetchall()

                # Insert the fetched tuples directly; going through a
                # DataFrame added nothing and can yield numpy scalar values
                # that psycopg2 does not know how to adapt.
                # NOTE(review): a rerun for the same hour (e.g. a restart,
                # since the job also runs once at startup) inserts the same
                # rows again unless the table enforces uniqueness; verify.
                cursor.executemany(
                    INSERT_SQL,
                    [
                        (previous_hour, eqpmn_id, flooding_cnt)
                        for eqpmn_id, flooding_cnt in counts
                    ],
                )
    finally:
        conn.close()


scheduler = BackgroundScheduler()
scheduler.add_job(func=fetch_and_update, trigger='cron', minute=5)
scheduler.start()


@app.route('/')
def home() -> str:
    """Return a plain-text message indicating the service is up."""
    return "Flooding analysis service running."


if __name__ == '__main__':
    # Run one aggregation immediately, then serve. The reloader stays off so
    # the module (and with it the scheduler) is not started a second time.
    fetch_and_update()
    # NOTE(review): debug=True enables the interactive debugger; confirm this
    # entry point is not used for production deployments.
    app.run(debug=True, use_reloader=False)