From f574369d0dcd2fb787c0518cad8d602ddac0fc39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Rudowicz?= Date: Fri, 9 Feb 2024 23:32:32 +0100 Subject: [PATCH] Message throttling, OMIT_TG --- README.md | 6 ++++- main.go | 77 +++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 65 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 136a250..b0b9e89 100644 --- a/README.md +++ b/README.md @@ -11,4 +11,8 @@ I didn't even test it yet so no idea if it works ``` $ TELEGRAM_APITOKEN=YOUR_API_TOKEN ./alarm_bot --satel_addr=127.0.0.1 --satel_port=31337 --tg_chat_id=YOUR_CHAT_ID_FROM_BOTFATHER -``` \ No newline at end of file +``` + +## Debugging + +Set the `OMIT_TG` environment variable to, well, omit sending anything over to Telegram and just see the logs instead. diff --git a/main.go b/main.go index 374239b..34313c2 100644 --- a/main.go +++ b/main.go @@ -1,18 +1,59 @@ package main import ( - "os" - "fmt" - "net" "flag" - "strings" + "fmt" + "math" + "net" + "os" "strconv" + "strings" + "sync" + "time" - "github.com/probakowski/go-satel" tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" + "github.com/probakowski/go-satel" ) +const ( + MaximumMessagesPerMinute = 2 + NanosecondsPerMinute = 6e10 +) + +type TgEvent struct { + msg tgbotapi.MessageConfig +} + +func tg_sender_worker(tg_events <-chan TgEvent, bot *tgbotapi.BotAPI, wg *sync.WaitGroup) { + wg.Add(1) + defer wg.Done() + lastExecutionTime := time.Now() + minimumTimeBetweenEachExecution := time.Duration(math.Ceil(NanosecondsPerMinute / MaximumMessagesPerMinute)) + _, shouldOmitTg := os.LookupEnv("OMIT_TG") + + for ev := range tg_events { + defer func() { lastExecutionTime = time.Now() }() + + time.Sleep(minimumTimeBetweenEachExecution - time.Since(lastExecutionTime)) + fmt.Println("Will send to TG: ", ev.msg.Text) + if shouldOmitTg { + continue + } + _, err := bot.Send(ev.msg) + if err != nil { + // TODO: handle it better + panic(err) + } + } +} + +func send_tg_message(tg_events chan TgEvent, msg tgbotapi.MessageConfig) { + tg_events <- TgEvent{msg} +} + func main() { + var wg sync.WaitGroup + tg_events := make(chan TgEvent) satel_api_addr := flag.String("satel-addr", "", "Address that should be used to connect to the SATEL device") satel_api_port := flag.String("satel-port", "7094", "Port that should be used to connect to the SATEL device") chat_id_raw := flag.String("tg-chat-id", "", "Telegram Chat ID where to send updates. Use \",\" to specify multiple IDs.") @@ -40,20 +81,22 @@ func main() { } s := satel.NewConfig(satel_conn, satel.Config{EventsQueueSize: 1000}) bot, err := tgbotapi.NewBotAPI(os.Getenv("TELEGRAM_APITOKEN")) - if err != nil { - panic(err) - } + if err != nil { + panic(err) + } + + go tg_sender_worker(tg_events, bot, &wg) for e, ok := <-s.Events; ok; e, ok = <-s.Events { - fmt.Println("Change from SATEL: ", "type", e.Type, "index", e.Index, "value", e.Value) - for _, chat_id := range chat_ids { - msg := tgbotapi.NewMessage(chat_id, fmt.Sprintf("Change from SATEL: Zone: %d Type: %s Value: %t", - e.Index, e.Type, e.Value)) + fmt.Println("Change from SATEL: ", "type", e.Type, "index", e.Index, "value", e.Value) + for _, chat_id := range chat_ids { + msg := tgbotapi.NewMessage(chat_id, fmt.Sprintf("Change from SATEL: Zone: %d Type: %s Value: %t", + e.Index, e.Type, e.Value)) - if _, err := bot.Send(msg); err != nil { - // TODO: retry sending later in case of problems - panic(err) - } - } + send_tg_message(tg_events, msg) + } } + + close(tg_events) + wg.Wait() }