https://github.com/drone-skynet/server
이전에 진행했던 다중 무인 드론 중고거래 배송 시스템에서 기상청 API 활용 방법에 대해 소개하고자 한다!
우리는 기상청 API를 경로 탐색 서버(Flask)의 경로 탐색 모듈에 탑재하였다. 이 모듈은 기상청 Open API를 활용하여 실시간 기상 데이터를 수집하고, 드론의 경로 탐색 알고리즘에 반영하여 동적 경로 조정을 수행한다. API에서 제공받은 강수 여부, 풍향, 풍속 등의 데이터를 기반으로 드론 비행 가능 여부를 판단하고 최적의 경로를 결정한다.
만약, API 키에 대한 개념이 없다면, 아래의 포스팅을 먼저 보는 것을 추천한다!
https://claremont.tistory.com/entry/%EC%9B%B9-%EC%A7%80%EC%8B%9D-API-%ED%82%A4Key%EB%9E%80
1. 기상 데이터 수집 코드 (weather_api.py)
get_station_weather 함수는 기상청 Open API를 호출하여 지정된 지역의 실시간 기상 정보를 가져온다
주요 데이터는 비 유무(is_raining), 풍향(wind_direction), 풍속(wind_speed)
import requests
from datetime import datetime, timedelta
import os
from dotenv import load_dotenv
load_dotenv()
def get_station_weather(station):
url = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getUltraSrtNcst"
service_key = os.getenv('WEATHER_API_KEY')
now = datetime.now()
# 자정일 경우, 전날 데이터 사용
if now.hour == 0:
base_date = (now - timedelta(days=1)).strftime('%Y%m%d')
base_time = '2300'
else:
base_date = now.strftime('%Y%m%d')
base_time = f'{now.hour:02d}00'
params = {
'serviceKey': service_key,
'numOfRows': 10,
'pageNo': 1,
'dataType': 'JSON',
'base_date': base_date,
'base_time': base_time,
'nx': station.grid_x,
'ny': station.grid_y
}
try:
res = requests.get(url=url, params=params)
data = res.json()['response']['body']['items']['item']
weather_info = {
'is_raining': False,
'wind_direction': 0,
'wind_speed': 0
}
for item in data:
if item['category'] == 'PTY': # 강수 형태
weather_info['is_raining'] = int(item['obsrValue']) > 0
elif item['category'] == 'VEC': # 풍향
weather_info['wind_direction'] = float(item['obsrValue'])
elif item['category'] == 'WSD': # 풍속
weather_info['wind_speed'] = float(item['obsrValue'])
return weather_info
except Exception as e:
print(f"날씨 정보 조회 실패: {e}")
return {'is_raining': False, 'wind_direction': 0, 'wind_speed': 0}
주요 포인트:
- API 호출 시 자정에는 전날 데이터를 사용하는 로직이 포함됨
- PTY(강수 형태), VEC(풍향), WSD(풍속) 데이터를 파싱하여 JSON 객체로 반환
- 환경 변수(.env) 파일에서 API 키를 가져옴
2. 기상 데이터를 활용한 경로 탐색 조정 (path_planning.py)
check_weather_thread는 기상 데이터를 정기적으로 업데이트하며, 역(station)과 연결된 간선(edge)의 비행 가능성을 판단한다
def check_weather_thread():
while True:
try:
for station in stations:
station_weather = get_station_weather(station)
station.is_flyable = not station_weather['is_raining']
for edge in edges:
if edge.origin.is_flyable and edge.destination.is_flyable:
edge.weight = edge.distance
else:
edge.weight = float('inf') # 비행 불가 간선은 가중치를 무한대로 설정
make_graph()
time.sleep(3600) # 매 1시간 간격으로 업데이트
except Exception as e:
print(f"기상 데이터 갱신 실패: {e}")
time.sleep(60) # 오류 발생 시 1분 후 재시도
주요 포인트:
- 각 역(station)의 비행 가능 여부는 is_flyable 속성에 저장
- 역 또는 간선이 비행 불가 상태일 경우, 해당 간선의 가중치를 무한대로 설정하여 경로 탐색에서 제외
- 1시간마다 데이터를 업데이트하며, API 오류 발생 시 재시도 로직 포함
3. 기상 데이터와 경로 탐색 알고리즘 통합
휴리스틱 함수 수정 - 기상 데이터를 반영하여 강수량 및 풍속에 따라 가중치를 추가하거나 경로를 우회함
def heuristic(n1, n2):
distance = haversine([n1.latitude, n1.longitude], [n2.latitude, n2.longitude])
weather_penalty = 0.5 if not n1.is_flyable or not n2.is_flyable else 0
return distance + weather_penalty
'웹 > 웹 개발' 카테고리의 다른 글
[웹 개발] 캐싱 문제와 시크릿 탭 해결 방법의 이유 (1) | 2025.01.10 |
---|---|
[웹 개발] IPv4 0.0.0.0란?(feat. 로컬 환경에서의 웹 모바일 접속) (1) | 2024.12.22 |
[웹 개발] API 명세서 작성 tip (4) | 2024.10.06 |
[웹 개발] 데이터 사전의 테이블 정의(ERD 설계) (1) | 2024.10.05 |