File name
Commit message
Commit date
File name
Commit message
Commit date
File name
Commit message
Commit date
2023-09-26
import os
import pandas as pd
import requests
from datetime import datetime, timedelta
from io import StringIO
from tools.weather_agency_api.duplicate_check import duplicate_check
from tools.weather_agency_api.check_missing import check_missing
from sqlalchemy import create_engine
# SQLAlchemy connection URL for the PostgreSQL database that holds the
# ``weather_data`` table.  The WEATHER_DATABASE_URL environment variable takes
# precedence so that real credentials do not have to live in source control;
# the literal below is only the fallback used when that variable is unset.
# NOTE(review): the fallback password appears to contain "@".  SQLAlchemy URLs
# expect special characters in the password to be percent-encoded ("%40") --
# confirm the intended password and encode it if so.
DATABASE_URL = os.environ.get(
    "WEATHER_DATABASE_URL",
    "postgresql+psycopg2://username:ts4430!@@localhost:5432/welding",
)
# Column names applied, in order, to the parsed API response.
# See https://apihub.kma.go.kr/ for reference.
# The strings are used verbatim as DataFrame column names; the trailing
# English glosses are translations of the Korean names only.
weather_api_columns = [
    "관측시각",  # observation time
    "지점번호",  # station number
    "풍향",  # wind direction
    "풍속",  # wind speed
    "돌풍향",  # gust direction
    "돌풍속",  # gust speed
    "돌풍속_관측_시각",  # time the gust speed was observed
    "현지기압",  # local (station) pressure
    "해면기압",  # sea-level pressure
    "기압변화경향",  # pressure change tendency
    "기압변화량",  # pressure change amount
    "기온",  # air temperature
    "이슬점_온도",  # dew point temperature
    "상대습도",  # relative humidity
    "수증기압",  # water vapour pressure
    "강수량",  # precipitation amount
    "위_관측시간까지의_일강수량",  # daily precipitation up to the observation time
    "위_관측시간까지의_일강수량(전문)",  # same, "(전문)" variant (telegram/report value)
    "강수강도",  # precipitation intensity
    "3시간_신적설",  # 3-hour fresh snowfall
    "일_신적설",  # daily fresh snowfall
    "적설",  # snow depth
    "GTS_현재일기",  # GTS present weather
    "GTS_과거일기",  # GTS past weather
    "국내식_일기코드",  # domestic-format weather code
    "전운량",  # total cloud amount
    "중하층운량",  # mid/low-level cloud amount
    "최저운고",  # lowest cloud height
    "운형",  # cloud type
    "GTS_상층운행",  # GTS upper-level cloud (NOTE(review): "운행" may be a typo for "운형", cloud type -- confirm)
    "GTS_중층운행",  # GTS mid-level cloud (same spelling note)
    "GTS_하층운행",  # GTS low-level cloud (same spelling note)
    "시정",  # visibility
    "일조",  # sunshine
    "일사",  # solar radiation
    "지면상태_코드",  # ground state code
    "지면온도",  # ground surface temperature
    "5cm지중온도",  # soil temperature at 5 cm
    "10cm지중온도",  # soil temperature at 10 cm
    "20cm지중온도",  # soil temperature at 20 cm
    "30cm지중온도",  # soil temperature at 30 cm
    "해면상태_코드",  # sea state code
    "파고",  # wave height
    "Beaufart_최대풍력",  # maximum wind force (presumably the Beaufort scale -- key spelled as in source)
    "강수자료",  # precipitation data
    "유인관측/무인관측"  # manned / unmanned observation
]
def generate_dates(start_date, end_date):
    """Return timestamps spaced one day apart from start_date through end_date.

    Both arguments are strings in ``%Y%m%d%H%M`` form.  The first element is
    ``start_date`` itself and each following element is exactly one day later;
    generation stops once the next timestamp would be later than ``end_date``.
    An empty list is returned when ``end_date`` precedes ``start_date``.

    :param start_date: first timestamp, e.g. ``'202301010000'``.
    :param end_date: inclusive upper bound in the same format.
    :return: list of timestamp strings in ``%Y%m%d%H%M`` form.
    :raises ValueError: if either argument does not match the format.
    """
    stamp_format = '%Y%m%d%H%M'
    first = datetime.strptime(start_date, stamp_format)
    last = datetime.strptime(end_date, stamp_format)
    one_day = timedelta(days=1)
    # Floor division counts the whole days that fit between the two bounds; it
    # is negative when ``last`` is before ``first``, which yields an empty range.
    whole_days = (last - first) // one_day
    return [
        (first + offset * one_day).strftime(stamp_format)
        for offset in range(whole_days + 1)
    ]
def call_administration_of_weather_api(start_date, end_date, stn=281, timeout=30):  # 281 == Yeongcheon
    """Request surface observations from the KMA API hub and return the raw text.

    :param start_date: start of the query window, sent as ``tm1``.  Callers in
        this module pass strings formatted as ``%Y%m%d%H%M``.
    :param end_date: end of the query window, sent as ``tm2`` (same format).
    :param stn: station number sent as ``stn``; defaults to 281.
    :param timeout: seconds to wait for the server before giving up.  Without
        a timeout ``requests`` can block indefinitely on a stalled connection.
    :return: the response body as text (``response.text``).  The HTTP status
        is not checked here; callers validate the parsed content instead.
    :raises requests.exceptions.RequestException: on connection failure or
        when the timeout expires.
    """
    headers = {
        'Content-Type': 'application/json'
    }
    # The KMA API is a bit odd: this header has no actual effect and the
    # response comes back as plain text rather than JSON.

    # Prefer a key supplied through the environment so the secret does not
    # have to be committed; the literal is the fallback when it is unset.
    auth = os.environ.get("KMA_API_AUTH_KEY", "umD2O5RRRSOg9juUUcUjvw")
    url = 'https://apihub.kma.go.kr/api/typ01/url/kma_sfctm3.php'
    # Let requests build (and escape) the query string.
    params = {
        'tm1': start_date,
        'tm2': end_date,
        'stn': stn,
        'authKey': auth,
    }
    response = requests.get(url, headers=headers, params=params, timeout=timeout)  # GET request
    return response.text
def update_weather_info_to_today():
    """
    Updates the ``weather_data`` table with observations up to today at 00:00.

    The newest ``관측시각`` value already stored is taken as the starting
    point.  Everything from one hour after it through today 00:00 is requested
    from the weather API in consecutive, non-overlapping windows of at most
    24 days, parsed, validated with ``check_missing`` and appended to the
    table in one write.

    :return: ``{"responses": 500}`` when ``check_missing`` rejects a fetched
        window (nothing is written in that case); otherwise ``None``.
    :raises IndexError: if the ``weather_data`` table has no rows, so there is
        no starting point.
    :raises ValueError: if the stored ``관측시각`` value is not in
        ``%Y%m%d%H%M`` form.
    """
    stamp_format = '%Y%m%d%H%M'
    # Set up a connection using SQLAlchemy (for Pandas operations)
    engine = create_engine(DATABASE_URL)
    try:
        # Load existing data
        query = "SELECT * FROM weather_data"
        df_weather_existing = pd.read_sql(query, engine)
        if df_weather_existing.empty:
            raise IndexError("weather_data table is empty; cannot determine the last observation time")

        # A SELECT without ORDER BY has no guaranteed row order, so take the
        # largest stored timestamp rather than whatever row happens to be last.
        last_date_str = f'{df_weather_existing["관측시각"].max()}'
        last_date = datetime.strptime(last_date_str, stamp_format)

        # Get today's date at 00:00
        today_00 = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

        # Build the request windows.  Each spans at most 24 days (the chunk
        # size used for the API calls) and the next window starts one hour
        # after the previous one ends, so no timestamp is requested twice and
        # the final window always reaches today 00:00.
        # NOTE(review): assumes the API treats tm1/tm2 as inclusive bounds on
        # hourly data -- confirm.
        max_span = timedelta(days=24)
        one_hour = timedelta(hours=1)
        windows = []
        window_start = last_date + one_hour
        while window_start <= today_00:
            window_end = min(window_start + max_span, today_00)
            windows.append((window_start, window_end))
            window_start = window_end + one_hour

        if not windows:
            print("Weather data is already up-to-date!")
            return

        frames = []
        last_index = len(windows) - 1
        for i, (window_start, window_end) in enumerate(windows):
            text = call_administration_of_weather_api(
                window_start.strftime(stamp_format),
                window_end.strftime(stamp_format),
            )
            buffer = StringIO(text)
            df = pd.read_csv(buffer, skiprows=2, skipfooter=1, sep=r"\s+", header=None, index_col=False,
                             engine="python").iloc[2:, :-1]
            # ``inplace`` was removed from set_axis in pandas 2.0; the default
            # already returns a new frame.
            df = df.set_axis(weather_api_columns, axis=1)
            if not check_missing(df):  # a window failed validation
                print("API is not working!")
                return {
                    "responses": 500
                }
            frames.append(df)
            print(f"{i}/{last_index}")

        # Concatenate once instead of growing a frame inside the loop, then
        # append the new data to the existing data in the database.
        df_weather_new = pd.concat(frames, ignore_index=True)
        df_weather_new.to_sql('weather_data', engine, if_exists='append', index=False)
    finally:
        # Release pooled connections held by this short-lived engine.
        engine.dispose()
def update_weather_info_to_today_csv(file_name):
    """
    Updates the weather information in a CSV file up to today at 00:00.

    The ``관측시각`` value of the last row in the file is taken as the
    starting point.  Everything from one hour after it through today 00:00 is
    requested from the weather API in consecutive, non-overlapping windows of
    at most 24 days, parsed, validated with ``check_missing``, merged with the
    existing rows, passed through ``duplicate_check`` and written back to the
    same file.

    :param file_name: CSV file name containing weather data.
    :return: ``{"responses": 500}`` when ``check_missing`` rejects a fetched
        window (the file is left untouched in that case); otherwise ``None``.
    :raises IndexError: if the CSV contains no data rows, so there is no
        starting point.
    :raises ValueError: if the last ``관측시각`` value is not in
        ``%Y%m%d%H%M`` form.
    """
    stamp_format = '%Y%m%d%H%M'
    # Load existing data
    df_weather_existing = pd.read_csv(file_name)
    if df_weather_existing.empty:
        raise IndexError(f"{file_name} has no data rows; cannot determine the last observation time")

    # Find the last date available in the CSV
    last_date_str = f'{df_weather_existing.iloc[-1]["관측시각"]}'
    last_date = datetime.strptime(last_date_str, stamp_format)

    # Get today's date at 00:00
    today_00 = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    print(today_00)
    print(last_date)

    # Build the request windows.  Each spans at most 24 days (the chunk size
    # used for the API calls) and the next window starts one hour after the
    # previous one ends, so the final window always reaches today 00:00.
    # NOTE(review): assumes the API treats tm1/tm2 as inclusive bounds on
    # hourly data -- confirm.
    max_span = timedelta(days=24)
    one_hour = timedelta(hours=1)
    windows = []
    window_start = last_date + one_hour
    while window_start <= today_00:
        window_end = min(window_start + max_span, today_00)
        windows.append((window_start, window_end))
        window_start = window_end + one_hour

    if not windows:
        print("Weather data is already up-to-date!")
        return

    frames = []
    last_index = len(windows) - 1
    for i, (window_start, window_end) in enumerate(windows):
        text = call_administration_of_weather_api(
            window_start.strftime(stamp_format),
            window_end.strftime(stamp_format),
        )
        buffer = StringIO(text)
        df = pd.read_csv(buffer, skiprows=2, skipfooter=1, sep=r"\s+", header=None, index_col=False,
                         engine="python").iloc[2:, :-1]
        # ``inplace`` was removed from set_axis in pandas 2.0; the default
        # already returns a new frame.
        df = df.set_axis(weather_api_columns, axis=1)
        if not check_missing(df):  # some information is missing
            print("api is not working !")
            return {
                "responses" : 500
            }
        frames.append(df)
        print(f"{i}/{last_index}")

    # Append the new data to the existing data (single concat instead of
    # growing a frame inside the loop).
    df_weather_updated = pd.concat([df_weather_existing, *frames], ignore_index=True)
    df_weather_updated = duplicate_check(df_weather_updated)
    df_weather_updated.to_csv(file_name, index=False)
if __name__ == "__main__":
    # CSV file to bring up to date when the module is run as a script.
    file = "202007010000_202308310000.csv"
    # update_weather_info_to_today() takes no arguments (it works on the
    # database); the CSV variant is the one that accepts a file name.
    update_weather_info_to_today_csv(file)