1
0
Fork 0

More reorganizing, new throttling

This commit is contained in:
Michał Rudowicz 2024-02-11 00:13:31 +01:00
parent 7f3b5a4abe
commit 6f0f78907c
7 changed files with 135 additions and 59 deletions

View File

@ -4,7 +4,7 @@ packages:
tasks: tasks:
- go-get: | - go-get: |
cd hswro-alarm-bot cd hswro-alarm-bot
go get go get -t
- build-x86_64: | - build-x86_64: |
cd hswro-alarm-bot cd hswro-alarm-bot
env GOOS=linux GOARCH=amd64 go build -o alarm_bot.x86-64 env GOOS=linux GOARCH=amd64 go build -o alarm_bot.x86-64

View File

@ -12,4 +12,4 @@ repos:
hooks: hooks:
- id: go-fmt - id: go-fmt
args: [ -w ] args: [ -w ]
- id: go-vet - id: go-vet-mod

7
go.mod
View File

@ -5,4 +5,11 @@ go 1.19
require ( require (
github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1
github.com/probakowski/go-satel v0.0.0-20211120120346-bed9818777ce github.com/probakowski/go-satel v0.0.0-20211120120346-bed9818777ce
github.com/stretchr/testify v1.8.4
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
) )

10
go.sum
View File

@ -1,10 +1,20 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 h1:wG8n/XJQ07TmjbITcGiUaOtXxdrINDz1b0J1w0SzqDc= github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 h1:wG8n/XJQ07TmjbITcGiUaOtXxdrINDz1b0J1w0SzqDc=
github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1/go.mod h1:A2S0CWkNylc2phvKXWBBdD3K0iGnDBGbzRpISP2zBl8= github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1/go.mod h1:A2S0CWkNylc2phvKXWBBdD3K0iGnDBGbzRpISP2zBl8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/probakowski/go-satel v0.0.0-20211120120346-bed9818777ce h1:V21PmRMDowz+5pA7zn0YTVLnoGEEFqj14UN6/f3zRiY= github.com/probakowski/go-satel v0.0.0-20211120120346-bed9818777ce h1:V21PmRMDowz+5pA7zn0YTVLnoGEEFqj14UN6/f3zRiY=
github.com/probakowski/go-satel v0.0.0-20211120120346-bed9818777ce/go.mod h1:q3DquDWRcoFWZ61dGZFg3snucolljixMoAzJIiCjWoY= github.com/probakowski/go-satel v0.0.0-20211120120346-bed9818777ce/go.mod h1:q3DquDWRcoFWZ61dGZFg3snucolljixMoAzJIiCjWoY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

69
main.go
View File

@ -3,7 +3,6 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"math"
"net" "net"
"os" "os"
"strconv" "strconv"
@ -16,68 +15,29 @@ import (
) )
const ( const (
MaximumMessagesPerMinute = 2 MessageNotMoreOftenThanSeconds = 15
NanosecondsPerMinute = 6e10
) )
type TgEvent struct { type TgSender struct {
msg tgbotapi.MessageConfig bot *tgbotapi.BotAPI
} }
type SecurityEvent interface { func (self TgSender) Send(msg GenericMessage) error {
Execute(chat_ids []int64, tg_events chan TgEvent) to_send := tgbotapi.NewMessage(msg.chat_id, msg.msg)
_, err := self.bot.Send(to_send)
return err
} }
type SatelEvent struct { func send_tg_message(tg_events chan GenericMessage, msg string, chat_ids []int64) {
ev satel.Event
}
func (s SatelEvent) Execute(chat_ids []int64, tg_events chan TgEvent) {
fmt.Println("Change from SATEL: ", "type", s.ev.Type, "index", s.ev.Index, "value", s.ev.Value)
for _, chat_id := range chat_ids { for _, chat_id := range chat_ids {
msg := tgbotapi.NewMessage(chat_id, fmt.Sprintf("Change from SATEL: Zone: %d Type: %s Value: %t", tg_events <- GenericMessage{chat_id, msg}
s.ev.Index, s.ev.Type, s.ev.Value))
send_tg_message(tg_events, msg)
} }
} }
type EmptyEvent struct{}
func (s EmptyEvent) Execute(chat_ids []int64, tg_events chan TgEvent) {}
func tg_sender_worker(tg_events <-chan TgEvent, bot *tgbotapi.BotAPI, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
lastExecutionTime := time.Now()
minimumTimeBetweenEachExecution := time.Duration(math.Ceil(NanosecondsPerMinute / MaximumMessagesPerMinute))
_, shouldOmitTg := os.LookupEnv("OMIT_TG")
for ev := range tg_events {
defer func() { lastExecutionTime = time.Now() }()
time.Sleep(minimumTimeBetweenEachExecution - time.Since(lastExecutionTime))
fmt.Println("Will send to TG: ", ev.msg.Text)
if shouldOmitTg {
continue
}
_, err := bot.Send(ev.msg)
if err != nil {
// TODO: handle it better
panic(err)
}
}
}
func send_tg_message(tg_events chan TgEvent, msg tgbotapi.MessageConfig) {
tg_events <- TgEvent{msg}
}
func main() { func main() {
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
tg_events = make(chan TgEvent) tg_events = make(chan GenericMessage)
sec_events = make(chan SecurityEvent)
) )
satel_api_addr := flag.String("satel-addr", "", "Address that should be used to connect to the SATEL device") satel_api_addr := flag.String("satel-addr", "", "Address that should be used to connect to the SATEL device")
satel_api_port := flag.String("satel-port", "7094", "Port that should be used to connect to the SATEL device") satel_api_port := flag.String("satel-port", "7094", "Port that should be used to connect to the SATEL device")
@ -110,13 +70,10 @@ func main() {
panic(err) panic(err)
} }
go tg_sender_worker(tg_events, bot, &wg) tgSender := TgSender{bot}
go tg_sender_worker(tg_events, tgSender, &wg, time.Second*15)
for e, ok := <-s.Events; ok; e, ok = <-s.Events { for e, ok := <-s.Events; ok; e, ok = <-s.Events {
sec_events <- SatelEvent{e} send_tg_message(tg_events, fmt.Sprintln("Change from SATEL: ", "type", e.Type, "index", e.Index, "value", e.Value), chat_ids)
}
for ev := range sec_events {
ev.Execute(chat_ids, tg_events)
} }
close(tg_events) close(tg_events)

64
sender_worker.go Normal file
View File

@ -0,0 +1,64 @@
package main
import (
"strings"
"sync"
"time"
)
type GenericMessage struct {
chat_id int64
msg string
}
type Sender interface {
Send(msg GenericMessage) error
}
func tg_sender_worker(tg_events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, messageNotMoreOftenThan time.Duration) {
wg.Add(1)
defer wg.Done()
messagesToSend := make(map[int64]*strings.Builder)
waitingStarted := false
timeoutEvents := make(chan interface{})
for {
select {
case ev, ok := <-tg_events:
if !ok {
return
}
// Collect all messages to send them at once
_, messageBuilderExists := messagesToSend[ev.chat_id]
if !messageBuilderExists {
messagesToSend[ev.chat_id] = &strings.Builder{}
}
messagesToSend[ev.chat_id].WriteString(ev.msg)
messagesToSend[ev.chat_id].WriteRune('\n')
if !waitingStarted {
waitingStarted = true
go func() {
time.Sleep(messageNotMoreOftenThan)
timeoutEvents <- nil
}()
}
case <-timeoutEvents:
waitingStarted = false
for chat_id, msgBuilder := range messagesToSend {
err := s.Send(GenericMessage{chat_id, msgBuilder.String()})
if err != nil {
// TODO: handle it better
panic(err)
}
delete(messagesToSend, chat_id)
}
if tg_events == nil {
close(timeoutEvents)
}
}
if tg_events == nil && timeoutEvents == nil {
return
}
}
}

38
sender_worker_test.go Normal file
View File

@ -0,0 +1,38 @@
package main
import (
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"
)
type MockSender struct {
messages []GenericMessage
}
func (self *MockSender) Send(msg GenericMessage) error {
self.messages = append(self.messages, msg)
return nil
}
func TestMessageThrottling(t *testing.T) {
testEvents := make(chan GenericMessage)
wg := sync.WaitGroup{}
mockSender := MockSender{make([]GenericMessage, 0)}
go tg_sender_worker(testEvents, &mockSender, &wg, time.Millisecond)
testEvents <- GenericMessage{123, "test1"}
testEvents <- GenericMessage{124, "test3"}
testEvents <- GenericMessage{123, "test2"}
testEvents <- GenericMessage{124, "test4"}
time.Sleep(time.Millisecond * 10)
testEvents <- GenericMessage{123, "test5"}
time.Sleep(time.Millisecond * 10)
close(testEvents)
assert.Len(t, mockSender.messages, 3)
assert.Contains(t, mockSender.messages, GenericMessage{123, "test1\ntest2\n"})
wg.Wait()
}