From 6f0f78907cd31717fe0991bd6c8c6236e3aac807 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Rudowicz?= Date: Sun, 11 Feb 2024 00:13:31 +0100 Subject: [PATCH] More reorganizing, new throttling --- .build.yml | 2 +- .pre-commit-config.yaml | 2 +- go.mod | 7 ++++ go.sum | 10 ++++++ main.go | 71 ++++++++--------------------------------- sender_worker.go | 64 +++++++++++++++++++++++++++++++++++++ sender_worker_test.go | 38 ++++++++++++++++++++++ 7 files changed, 135 insertions(+), 59 deletions(-) create mode 100644 sender_worker.go create mode 100644 sender_worker_test.go diff --git a/.build.yml b/.build.yml index ef3d228..487c5c5 100644 --- a/.build.yml +++ b/.build.yml @@ -4,7 +4,7 @@ packages: tasks: - go-get: | cd hswro-alarm-bot - go get + go get -t - build-x86_64: | cd hswro-alarm-bot env GOOS=linux GOARCH=amd64 go build -o alarm_bot.x86-64 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0f4c955..f1d7437 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -12,4 +12,4 @@ repos: hooks: - id: go-fmt args: [ -w ] - - id: go-vet + - id: go-vet-mod diff --git a/go.mod b/go.mod index 66d99c1..0d057ac 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,11 @@ go 1.19 require ( github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 github.com/probakowski/go-satel v0.0.0-20211120120346-bed9818777ce + github.com/stretchr/testify v1.8.4 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 1985067..9022e64 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,20 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 h1:wG8n/XJQ07TmjbITcGiUaOtXxdrINDz1b0J1w0SzqDc= github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1/go.mod h1:A2S0CWkNylc2phvKXWBBdD3K0iGnDBGbzRpISP2zBl8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/probakowski/go-satel v0.0.0-20211120120346-bed9818777ce h1:V21PmRMDowz+5pA7zn0YTVLnoGEEFqj14UN6/f3zRiY= github.com/probakowski/go-satel v0.0.0-20211120120346-bed9818777ce/go.mod h1:q3DquDWRcoFWZ61dGZFg3snucolljixMoAzJIiCjWoY= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 2fcf0de..1e7efe0 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,6 @@ package main import ( "flag" "fmt" - "math" "net" "os" "strconv" @@ -16,68 +15,29 @@ import ( ) const ( - MaximumMessagesPerMinute = 2 - NanosecondsPerMinute = 6e10 + MessageNotMoreOftenThanSeconds = 15 ) -type TgEvent struct { - msg tgbotapi.MessageConfig +type TgSender struct { + bot *tgbotapi.BotAPI } -type SecurityEvent interface { - Execute(chat_ids []int64, tg_events chan TgEvent) +func (self TgSender) Send(msg GenericMessage) error { + to_send := tgbotapi.NewMessage(msg.chat_id, msg.msg) + _, err := self.bot.Send(to_send) + return err } -type SatelEvent struct { - ev satel.Event -} - -func (s SatelEvent) Execute(chat_ids []int64, tg_events chan TgEvent) { - fmt.Println("Change from SATEL: ", "type", s.ev.Type, "index", s.ev.Index, "value", s.ev.Value) +func send_tg_message(tg_events chan GenericMessage, msg string, chat_ids []int64) { for _, chat_id := range chat_ids { - msg := tgbotapi.NewMessage(chat_id, fmt.Sprintf("Change from SATEL: Zone: %d Type: %s Value: %t", - s.ev.Index, s.ev.Type, s.ev.Value)) - - send_tg_message(tg_events, msg) + tg_events <- GenericMessage{chat_id, msg} } } -type EmptyEvent struct{} - -func (s EmptyEvent) Execute(chat_ids []int64, tg_events chan TgEvent) {} - -func tg_sender_worker(tg_events <-chan TgEvent, bot *tgbotapi.BotAPI, wg *sync.WaitGroup) { - wg.Add(1) - defer wg.Done() - lastExecutionTime := time.Now() - minimumTimeBetweenEachExecution := time.Duration(math.Ceil(NanosecondsPerMinute / MaximumMessagesPerMinute)) - _, shouldOmitTg := os.LookupEnv("OMIT_TG") - - for ev := range tg_events { - defer func() { lastExecutionTime = time.Now() }() - - time.Sleep(minimumTimeBetweenEachExecution - time.Since(lastExecutionTime)) - fmt.Println("Will send to TG: ", ev.msg.Text) - if shouldOmitTg { - continue - } - _, err := bot.Send(ev.msg) - if err != nil { - // TODO: handle it better - panic(err) - } - } -} - -func send_tg_message(tg_events chan TgEvent, msg tgbotapi.MessageConfig) { - tg_events <- TgEvent{msg} -} - func main() { var ( - wg sync.WaitGroup - tg_events = make(chan TgEvent) - sec_events = make(chan SecurityEvent) + wg sync.WaitGroup + tg_events = make(chan GenericMessage) ) satel_api_addr := flag.String("satel-addr", "", "Address that should be used to connect to the SATEL device") satel_api_port := flag.String("satel-port", "7094", "Port that should be used to connect to the SATEL device") @@ -110,13 +70,10 @@ func main() { panic(err) } - go tg_sender_worker(tg_events, bot, &wg) + tgSender := TgSender{bot} + go tg_sender_worker(tg_events, tgSender, &wg, time.Second*15) for e, ok := <-s.Events; ok; e, ok = <-s.Events { - sec_events <- SatelEvent{e} - } - - for ev := range sec_events { - ev.Execute(chat_ids, tg_events) + send_tg_message(tg_events, fmt.Sprintln("Change from SATEL: ", "type", e.Type, "index", e.Index, "value", e.Value), chat_ids) } close(tg_events) diff --git a/sender_worker.go b/sender_worker.go new file mode 100644 index 0000000..1eca48d --- /dev/null +++ b/sender_worker.go @@ -0,0 +1,64 @@ +package main + +import ( + "strings" + "sync" + "time" +) + +type GenericMessage struct { + chat_id int64 + msg string +} + +type Sender interface { + Send(msg GenericMessage) error +} + +func tg_sender_worker(tg_events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, messageNotMoreOftenThan time.Duration) { + wg.Add(1) + defer wg.Done() + messagesToSend := make(map[int64]*strings.Builder) + waitingStarted := false + timeoutEvents := make(chan interface{}) + + for { + select { + case ev, ok := <-tg_events: + if !ok { + return + } + // Collect all messages to send them at once + _, messageBuilderExists := messagesToSend[ev.chat_id] + if !messageBuilderExists { + messagesToSend[ev.chat_id] = &strings.Builder{} + } + messagesToSend[ev.chat_id].WriteString(ev.msg) + messagesToSend[ev.chat_id].WriteRune('\n') + if !waitingStarted { + waitingStarted = true + go func() { + time.Sleep(messageNotMoreOftenThan) + timeoutEvents <- nil + }() + } + case <-timeoutEvents: + waitingStarted = false + for chat_id, msgBuilder := range messagesToSend { + err := s.Send(GenericMessage{chat_id, msgBuilder.String()}) + if err != nil { + // TODO: handle it better + panic(err) + } + delete(messagesToSend, chat_id) + } + if tg_events == nil { + close(timeoutEvents) + } + } + + if tg_events == nil && timeoutEvents == nil { + return + } + } +} diff --git a/sender_worker_test.go b/sender_worker_test.go new file mode 100644 index 0000000..6e3d0d9 --- /dev/null +++ b/sender_worker_test.go @@ -0,0 +1,38 @@ +package main + +import ( + "github.com/stretchr/testify/assert" + "sync" + "testing" + "time" +) + +type MockSender struct { + messages []GenericMessage +} + +func (self *MockSender) Send(msg GenericMessage) error { + self.messages = append(self.messages, msg) + return nil +} + +func TestMessageThrottling(t *testing.T) { + testEvents := make(chan GenericMessage) + wg := sync.WaitGroup{} + mockSender := MockSender{make([]GenericMessage, 0)} + go tg_sender_worker(testEvents, &mockSender, &wg, time.Millisecond) + testEvents <- GenericMessage{123, "test1"} + testEvents <- GenericMessage{124, "test3"} + testEvents <- GenericMessage{123, "test2"} + testEvents <- GenericMessage{124, "test4"} + time.Sleep(time.Millisecond * 10) + testEvents <- GenericMessage{123, "test5"} + time.Sleep(time.Millisecond * 10) + + close(testEvents) + + assert.Len(t, mockSender.messages, 3) + assert.Contains(t, mockSender.messages, GenericMessage{123, "test1\ntest2\n"}) + + wg.Wait() +}