1
0
Fork 0

Message throttling, OMIT_TG

This commit is contained in:
Michał Rudowicz 2024-02-09 23:32:32 +01:00
parent 3937112ef9
commit f574369d0d
2 changed files with 65 additions and 18 deletions

View File

@ -12,3 +12,7 @@ I didn't even test it yet so no idea if it works
``` ```
$ TELEGRAM_APITOKEN=YOUR_API_TOKEN ./alarm_bot --satel_addr=127.0.0.1 --satel_port=31337 --tg_chat_id=YOUR_CHAT_ID_FROM_BOTFATHER $ TELEGRAM_APITOKEN=YOUR_API_TOKEN ./alarm_bot --satel_addr=127.0.0.1 --satel_port=31337 --tg_chat_id=YOUR_CHAT_ID_FROM_BOTFATHER
``` ```
## Debugging
Set the `OMIT_TG` environment variable to, well, omit sending anything over to Telegram and just see the logs instead.

77
main.go
View File

@ -1,18 +1,59 @@
package main package main
import ( import (
"os"
"fmt"
"net"
"flag" "flag"
"strings" "fmt"
"math"
"net"
"os"
"strconv" "strconv"
"strings"
"sync"
"time"
"github.com/probakowski/go-satel"
tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5" tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
"github.com/probakowski/go-satel"
) )
const (
MaximumMessagesPerMinute = 2
NanosecondsPerMinute = 6e10
)
type TgEvent struct {
msg tgbotapi.MessageConfig
}
func tg_sender_worker(tg_events <-chan TgEvent, bot *tgbotapi.BotAPI, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
lastExecutionTime := time.Now()
minimumTimeBetweenEachExecution := time.Duration(math.Ceil(NanosecondsPerMinute / MaximumMessagesPerMinute))
_, shouldOmitTg := os.LookupEnv("OMIT_TG")
for ev := range tg_events {
defer func() { lastExecutionTime = time.Now() }()
time.Sleep(minimumTimeBetweenEachExecution - time.Since(lastExecutionTime))
fmt.Println("Will send to TG: ", ev.msg.Text)
if shouldOmitTg {
continue
}
_, err := bot.Send(ev.msg)
if err != nil {
// TODO: handle it better
panic(err)
}
}
}
func send_tg_message(tg_events chan TgEvent, msg tgbotapi.MessageConfig) {
tg_events <- TgEvent{msg}
}
func main() { func main() {
var wg sync.WaitGroup
tg_events := make(chan TgEvent)
satel_api_addr := flag.String("satel-addr", "", "Address that should be used to connect to the SATEL device") satel_api_addr := flag.String("satel-addr", "", "Address that should be used to connect to the SATEL device")
satel_api_port := flag.String("satel-port", "7094", "Port that should be used to connect to the SATEL device") satel_api_port := flag.String("satel-port", "7094", "Port that should be used to connect to the SATEL device")
chat_id_raw := flag.String("tg-chat-id", "", "Telegram Chat ID where to send updates. Use \",\" to specify multiple IDs.") chat_id_raw := flag.String("tg-chat-id", "", "Telegram Chat ID where to send updates. Use \",\" to specify multiple IDs.")
@ -40,20 +81,22 @@ func main() {
} }
s := satel.NewConfig(satel_conn, satel.Config{EventsQueueSize: 1000}) s := satel.NewConfig(satel_conn, satel.Config{EventsQueueSize: 1000})
bot, err := tgbotapi.NewBotAPI(os.Getenv("TELEGRAM_APITOKEN")) bot, err := tgbotapi.NewBotAPI(os.Getenv("TELEGRAM_APITOKEN"))
if err != nil { if err != nil {
panic(err) panic(err)
} }
go tg_sender_worker(tg_events, bot, &wg)
for e, ok := <-s.Events; ok; e, ok = <-s.Events { for e, ok := <-s.Events; ok; e, ok = <-s.Events {
fmt.Println("Change from SATEL: ", "type", e.Type, "index", e.Index, "value", e.Value) fmt.Println("Change from SATEL: ", "type", e.Type, "index", e.Index, "value", e.Value)
for _, chat_id := range chat_ids { for _, chat_id := range chat_ids {
msg := tgbotapi.NewMessage(chat_id, fmt.Sprintf("Change from SATEL: Zone: %d Type: %s Value: %t", msg := tgbotapi.NewMessage(chat_id, fmt.Sprintf("Change from SATEL: Zone: %d Type: %s Value: %t",
e.Index, e.Type, e.Value)) e.Index, e.Type, e.Value))
if _, err := bot.Send(msg); err != nil { send_tg_message(tg_events, msg)
// TODO: retry sending later in case of problems }
panic(err)
}
}
} }
close(tg_events)
wg.Wait()
} }