HSWroFlipdotWeather/app.py

188 lines
5.9 KiB
Python

import requests
from datetime import datetime
import os
import logging
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s',level=logging.DEBUG)
logger = logging.getLogger(__name__)
# OpenWeather API settings
OPENWEATHER_API_KEY = os.getenv('OPENWEATHER_API_KEY')
LOCATION_ID = os.getenv('LOCATION_ID', '3081368')
BASE_URL = "https://api.openweathermap.org/data/2.5/"
FLIPDOT_API = os.getenv('FLIPDOT_API', 'http://127.0.0.1/')
FLIPDOT_SLOTS = int(os.getenv('FLIPDOT_SLOTS', 15))
LUFTDATEN_CLOSEST_SENSOR = os.getenv('LUFTDATEN_CLOSEST_SENSOR', '84121')
DAYS = os.getenv('DAYS', 3)
# Icon mapping from API's "icon" field
ICON_MAPPING = {
"01d": "sun",
"01n": "moon",
"02d": "cloud_sun",
"02n": "cloud_moon",
"03d": "cloud",
"03n": "cloud",
"04d": "clouds",
"04n": "clouds",
"09d": "rain1",
"09n": "rain1",
"10d": "rain2",
"10n": "rain2",
"11d": "lightning",
"11n": "lightning",
"13d": "snow",
"13n": "snow",
"50d": "mist",
"50n": "mist",
"unknown": "wtf"
}
def evenly_distribute(data, target_slots): ## TODO - move to utils
num_elements = len(data)
if num_elements == 0:
raise ValueError("Data array cannot be empty.")
base_repeats = target_slots // num_elements
remainder = target_slots % num_elements
if remainder > 0:
max_repeats = base_repeats
else:
max_repeats = base_repeats
result = []
for item in data:
result.extend([item] * max_repeats)
return result
def clean():
send_post('actions/clear_pages')
def send_post(endpoint,payload = {}, data = {}):
headers = {
'accept': 'application/json',
}
uri = (FLIPDOT_API+endpoint).replace('\r', '').replace('\n', '')
print(uri)
requests.post( uri, headers=headers, json = payload, params=data )
# Helper function to map API's icon to your icon set
def get_icon(api_icon):
return ICON_MAPPING.get(api_icon, ICON_MAPPING["unknown"])
def fetch_luftdaten_data():
url = "https://data.sensor.community/airrohr/v1/sensor/{}/".format(LUFTDATEN_CLOSEST_SENSOR).replace('\r', '').replace('\n', '')
response = requests.get(url)
return response.json()
# Fetch current weather
def fetch_current_weather():
url = "{}weather?id={}&appid={}&units=metric".format(BASE_URL, LOCATION_ID, OPENWEATHER_API_KEY).replace('\r', '').replace('\n', '')
response = requests.get(url)
return response.json()
# Fetch hourly forecast
def fetch_hourly_forecast():
url = "{}forecast?id={}&appid={}&units=metric".format(BASE_URL, LOCATION_ID, OPENWEATHER_API_KEY).replace('\r', '').replace('\n', '')
response = requests.get(url)
return response.json()
def create_payload(icon, text1="", text2=None, font="(5) LITERY 112X17", invert=False, auto_break=False, align="center"):
result = {
"addition": {
"addition_type": "icon",
"invert": invert,
"icon": icon
},
"lines": []
}
if text1:
result["lines"].append({
"text": text1,
"font": font,
"invert": invert,
"auto_break": auto_break,
"align": "center"
})
if text2:
result["lines"].append({
"text": text2,
"font": font,
"invert": invert,
"auto_break": auto_break,
"align": "center"
})
return result
# Generate payload for the flipdot display
def generate_payload(current_weather, forecast):
payloads = []
# Current weather payload
temp = round(current_weather["main"]["temp"])
weather_icon = current_weather["weather"][0]["icon"]
weather_main = current_weather["weather"][0]["main"]
wind_speed = round(current_weather["wind"]["speed"] * 3.6) # Convert m/s to km/h
wind_dir = current_weather["wind"]["deg"]
pm25 = fetch_luftdaten_data()[0]['sensordatavalues'][0]['value']
pm10 = fetch_luftdaten_data()[0]['sensordatavalues'][1]['value']
if "rain" in current_weather.keys():
rain = current_weather["rain"]["1h"]
else:
rain = 0
if "snow" in current_weather.keys():
snow = current_weather["snow"]["1h"]
else:
snow = 0
payloads.append(create_payload(get_icon(weather_icon), f"NOW: {temp} st.C - {weather_main}", f"WIND: {wind_speed}KM/H {wind_dir}°"))
payloads.append(create_payload("mist", f"PM2.5: {pm25} ug/m3", f"PM10: {pm10} ug/m3"))
if rain > 0:
payloads.append(create_payload("rain2", f"NOW: RAIN: {rain} mm", "IN 3H"))
if snow > 0:
payloads.append(create_payload("snow", f"NOW: SNOW: {snow} mm", "IN 3H"))
payloads.append(create_payload('mist', f"PM2.5: {pm25} ug/m3", f"PM10: {pm10} ug/m3"))
# Hourly forecast payloads
for entry in forecast["list"][:DAYS]:
dt = datetime.fromtimestamp(entry["dt"]).strftime("%H:%M")
temp = round(entry["main"]["temp"])
weather_icon = entry["weather"][0]["icon"]
weather_main = entry["weather"][0]["main"]
wind_speed = round(entry["wind"]["speed"] * 3.6)
wind_dir = entry["wind"]["deg"]
rain = entry['rain']['3h'] if 'rain' in entry.keys() else 0
snow = entry['snow']['3h'] if 'snow' in entry.keys() else 0
payloads.append(create_payload(get_icon(weather_icon), f"{dt}: {temp} st.C - {weather_main}", f"WIND: {wind_speed}KM/H {wind_dir}°"))
if rain > 0:
payloads.append(create_payload("rain2", f"{dt}: RAIN: {rain} mm", "IN 3H"))
if snow > 0:
payloads.append(create_payload("snow", f"{dt}: SNOW: {snow} mm", "IN 3H"))
return evenly_distribute(payloads, FLIPDOT_SLOTS)
# Main function
def main():
current_weather = fetch_current_weather()
forecast = fetch_hourly_forecast()
payloads = generate_payload(current_weather, forecast)
clean()
for i, payload in enumerate(payloads):
logger.debug(payload)
logger.info(f"Sending frame {i+1} of {len(payloads)} - text: {payload['lines'][0]['text']}")
send_post('display/complex', payload=payload,data={'page':i})
if __name__ == "__main__":
main()