"""Push weather, forecast and air-quality frames to a flipdot display.

The script reads the current weather and the forecast from the OpenWeather
API and particulate-matter readings from a Sensor.community (Luftdaten)
sensor, converts them into display frames and POSTs them to the flipdot HTTP
API, one frame per page slot.
"""
import logging
import os
from datetime import datetime

import requests

logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG)
logger = logging.getLogger(__name__)

# OpenWeather API settings
OPENWEATHER_API_KEY = os.getenv('OPENWEATHER_API_KEY')
LOCATION_ID = os.getenv('LOCATION_ID', '3081368')
BASE_URL = "https://api.openweathermap.org/data/2.5/"

# Flipdot display settings
FLIPDOT_API = os.getenv('FLIPDOT_API', 'http://127.0.0.1/')
FLIPDOT_SLOTS = int(os.getenv('FLIPDOT_SLOTS', 15))

# Air-quality sensor settings
LUFTDATEN_CLOSEST_SENSOR = os.getenv('LUFTDATEN_CLOSEST_SENSOR', '84121')

# Number of entries taken from the start of the forecast list. Environment
# values are strings, so coerce to int: a str cannot be used as a slice bound.
DAYS = int(os.getenv('DAYS', 3))

# Seconds to wait for any HTTP request; without a timeout `requests` can
# block forever on an unresponsive host.
REQUEST_TIMEOUT = float(os.getenv('REQUEST_TIMEOUT', 10))

# Icon mapping from API's "icon" field
ICON_MAPPING = {
    "01d": "sun",
    "01n": "moon",
    "02d": "cloud_sun",
    "02n": "cloud_moon",
    "03d": "cloud",
    "03n": "cloud",
    "04d": "clouds",
    "04n": "clouds",
    "09d": "rain1",
    "09n": "rain1",
    "10d": "rain2",
    "10n": "rain2",
    "11d": "lightning",
    "11n": "lightning",
    "13d": "snow",
    "13n": "snow",
    "50d": "mist",
    "50n": "mist",
    "unknown": "wtf",
}

# Placeholder shown when the sensor did not report a value.
_NO_VALUE = "n/a"


def _strip_newlines(value):
    """Return *value* without CR/LF characters.

    Settings read from the environment may carry a trailing line break,
    which would otherwise end up inside the request URL.
    """
    return value.replace('\r', '').replace('\n', '')


def evenly_distribute(data, target_slots):
    """Repeat the items of *data* so that they fill *target_slots* slots.

    Every item is repeated ``target_slots // len(data)`` times and the first
    ``target_slots % len(data)`` items get one extra repeat, so the result
    has exactly ``target_slots`` entries. When there are more items than
    slots, only the first ``target_slots`` items are kept (once each)
    instead of returning nothing.

    Args:
        data: Non-empty sequence of items, in display order.
        target_slots: Number of slots to fill.

    Returns:
        A list in which equal items are adjacent and the original order is
        preserved. Empty when ``target_slots`` is not positive.

    Raises:
        ValueError: If *data* is empty.
    """
    # TODO: move to utils
    if not data:
        raise ValueError("Data array cannot be empty.")
    if target_slots <= 0:
        return []

    base_repeats, remainder = divmod(target_slots, len(data))
    result = []
    for index, item in enumerate(data):
        repeats = base_repeats + (1 if index < remainder else 0)
        result.extend([item] * repeats)
    return result


def clean():
    """Ask the flipdot API to clear all pages."""
    send_post('actions/clear_pages')


def send_post(endpoint, payload=None, data=None):
    """POST a JSON payload to a flipdot API endpoint.

    Args:
        endpoint: Path appended to ``FLIPDOT_API``.
        payload: JSON-serialisable request body; an empty object is sent
            when omitted.
        data: Mapping sent as URL query parameters.

    Raises:
        requests.RequestException: On connection errors or timeouts.
    """
    headers = {'accept': 'application/json'}
    uri = _strip_newlines(FLIPDOT_API + endpoint)
    logger.debug("POST %s", uri)
    response = requests.post(
        uri,
        headers=headers,
        # Fresh dicts per call; never share a mutable default between calls.
        json={} if payload is None else payload,
        params={} if data is None else data,
        timeout=REQUEST_TIMEOUT,
    )
    if not response.ok:
        # Sending stays best-effort: report the failure but keep going.
        logger.warning("POST %s returned HTTP %s", uri, response.status_code)


# Helper function to map API's icon to your icon set
def get_icon(api_icon):
    """Return the display icon name for an OpenWeather icon code.

    Codes missing from ``ICON_MAPPING`` map to its "unknown" entry.
    """
    return ICON_MAPPING.get(api_icon, ICON_MAPPING["unknown"])


def _get_json(url):
    """GET *url* and return the decoded JSON body.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection errors or timeouts.
    """
    response = requests.get(_strip_newlines(url), timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


def fetch_luftdaten_data():
    """Fetch the readings of the configured Sensor.community sensor."""
    return _get_json(
        "https://data.sensor.community/airrohr/v1/sensor/{}/".format(LUFTDATEN_CLOSEST_SENSOR)
    )


# Fetch current weather
def fetch_current_weather():
    """Fetch the current weather for ``LOCATION_ID`` in metric units."""
    return _get_json(
        "{}weather?id={}&appid={}&units=metric".format(BASE_URL, LOCATION_ID, OPENWEATHER_API_KEY)
    )


# Fetch hourly forecast
def fetch_hourly_forecast():
    """Fetch the forecast for ``LOCATION_ID`` in metric units."""
    return _get_json(
        "{}forecast?id={}&appid={}&units=metric".format(BASE_URL, LOCATION_ID, OPENWEATHER_API_KEY)
    )


def create_payload(icon, text1="", text2=None, font="(5) LITERY 112X17",
                   invert=False, auto_break=False, align="center"):
    """Build one display frame: an icon plus up to two lines of text.

    Args:
        icon: Name of the icon shown next to the text.
        text1: First line; skipped when empty.
        text2: Second line; skipped when empty or None.
        font: Font name applied to every line.
        invert: Invert flag applied to the icon and to every line.
        auto_break: Auto-break flag applied to every line.
        align: Alignment applied to every line.

    Returns:
        A dict with the keys "addition" and "lines".
    """
    lines = [
        {
            "text": text,
            "font": font,
            "invert": invert,
            "auto_break": auto_break,
            "align": align,
        }
        for text in (text1, text2)
        if text
    ]
    return {
        "addition": {
            "addition_type": "icon",
            "invert": invert,
            "icon": icon,
        },
        "lines": lines,
    }


def _extract_pm_values(readings):
    """Return ``(pm25, pm10)`` from a Sensor.community response.

    Values are looked up by their ``value_type`` in the first record rather
    than by list position, so the result does not depend on the order in
    which the sensor reports them. Per the Sensor.community API, "P2" is
    PM2.5 and "P1" is PM10. A missing value yields "n/a".
    """
    if not readings:
        return _NO_VALUE, _NO_VALUE
    values = {
        item.get("value_type"): item.get("value")
        for item in readings[0].get("sensordatavalues", [])
    }
    return values.get("P2", _NO_VALUE), values.get("P1", _NO_VALUE)


def _conditions_frame(label, entry):
    """Build the temperature / wind frame for one weather record."""
    temp = round(entry["main"]["temp"])
    weather_icon = entry["weather"][0]["icon"]
    weather_main = entry["weather"][0]["main"]
    wind_speed = round(entry["wind"]["speed"] * 3.6)  # Convert m/s to km/h
    wind_dir = entry["wind"]["deg"]
    return create_payload(
        get_icon(weather_icon),
        f"{label}: {temp} st.C - {weather_main}",
        f"WIND: {wind_speed}KM/H {wind_dir}°",
    )


def _precipitation_frames(label, entry, period):
    """Build rain/snow frames for the amounts reported under *period*.

    *period* is the key used inside the "rain" / "snow" objects ("1h" or
    "3h"); it is also shown on the frame so the label matches the data.
    """
    frames = []
    for key, icon, title in (("rain", "rain2", "RAIN"), ("snow", "snow", "SNOW")):
        amount = entry.get(key, {}).get(period, 0)
        if amount > 0:
            frames.append(
                create_payload(icon, f"{label}: {title}: {amount} mm", f"IN {period.upper()}")
            )
    return frames


# Generate payload for the flipdot display
def generate_payload(current_weather, forecast):
    """Build the complete frame list for the display.

    Args:
        current_weather: Decoded current-weather response.
        forecast: Decoded forecast response; the first ``DAYS`` entries of
            its "list" are used.

    Returns:
        Frames for the current weather, air quality and forecast, repeated
        to fill ``FLIPDOT_SLOTS`` slots.
    """
    # Fetch the sensor once; both PM values come from the same response.
    pm25, pm10 = _extract_pm_values(fetch_luftdaten_data())

    # Current weather payload
    payloads = [
        _conditions_frame("NOW", current_weather),
        create_payload("mist", f"PM2.5: {pm25} ug/m3", f"PM10: {pm10} ug/m3"),
    ]
    payloads.extend(_precipitation_frames("NOW", current_weather, "1h"))
    # NOTE(review): the air-quality frame is added a second time here, as in
    # the original; this gives it extra slots. Confirm that this is intended.
    payloads.append(create_payload('mist', f"PM2.5: {pm25} ug/m3", f"PM10: {pm10} ug/m3"))

    # Hourly forecast payloads
    for entry in forecast["list"][:DAYS]:
        dt = datetime.fromtimestamp(entry["dt"]).strftime("%H:%M")
        payloads.append(_conditions_frame(dt, entry))
        payloads.extend(_precipitation_frames(dt, entry, "3h"))

    return evenly_distribute(payloads, FLIPDOT_SLOTS)


# Main function
def main():
    """Fetch all data, clear the display and send one frame per page."""
    current_weather = fetch_current_weather()
    forecast = fetch_hourly_forecast()
    payloads = generate_payload(current_weather, forecast)
    clean()
    for i, payload in enumerate(payloads):
        logger.debug(payload)
        logger.info(
            "Sending frame %d of %d - text: %s",
            i + 1, len(payloads), payload['lines'][0]['text'],
        )
        send_post('display/complex', payload=payload, data={'page': i})


if __name__ == "__main__":
    main()