diff --git a/main.go b/main.go index d868c7c..b27e41b 100644 --- a/main.go +++ b/main.go @@ -24,15 +24,28 @@ type TgSender struct { } func (self TgSender) Send(msg GenericMessage) error { - toSend := tgbotapi.NewMessage(msg.chatId, msg.msg) - _, err := self.bot.Send(toSend) - return err + chatIds := msg.chatIds.GetTgIds() + if chatIds == nil { + return nil + } + b := strings.Builder{} + for _, msg := range msg.msgs { + b.WriteString(msg.String()) + b.WriteRune('\n') + } + message := b.String() + for _, chatId := range *chatIds { + toSend := tgbotapi.NewMessage(chatId, message) + _, err := self.bot.Send(toSend) + if err != nil { + return err + } + } + return nil } -func sendTgMessage(tgEvents chan GenericMessage, msg string, chatIds []int64) { - for _, chatId := range chatIds { - tgEvents <- GenericMessage{chatId, msg} - } +func sendTgMessage(tgEvents chan GenericMessage, msg string, chatId int64) { + tgEvents <- GenericMessage{TgChatId{chatId}, []MsgContent{MsgContent{msg}}} } type RealSleeper struct { @@ -101,7 +114,7 @@ func main() { tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds))) for e := range FilterByLastSeen(s.Events, "hs_wro_last_seen.bin", log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)) { logger.Print("Received change from SATEL: ", e) - sendTgMessage(tgEvents, fmt.Sprint("Change from SATEL: type:", e.Type, ", index:", e.Index, ", value:", e.Value), chatIds) + sendTgMessage(tgEvents, fmt.Sprint("Change from SATEL: type:", e.Type, ", index:", e.Index, ", value:", e.Value), chatIds[0]) } close(tgEvents) diff --git a/sender_worker.go b/sender_worker.go index b563502..1f16586 100644 --- a/sender_worker.go +++ b/sender_worker.go @@ -2,15 +2,9 @@ package main import ( "log" - "strings" "sync" ) -type GenericMessage struct { - chatId int64 - msg string -} - type Sender interface { Send(msg GenericMessage) error } @@ -19,6 +13,18 @@ type Sleeper interface { Sleep(ch chan<- interface{}) } +type ChatId interface { + GetTgIds() *[]int64 +} + +type TgChatId struct { + tgChatId int64 +} + +func (self TgChatId) GetTgIds() *[]int64 { + return &[]int64{self.tgChatId} +} + func Consume(events <-chan GenericMessage) { go func() { for range events { @@ -26,6 +32,17 @@ func Consume(events <-chan GenericMessage) { }() } +type MsgContent struct { + msg string +} + +func (self MsgContent) String() string { return self.msg } + +type GenericMessage struct { + chatIds ChatId + msgs []MsgContent +} + func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage { returnEvents := make(chan GenericMessage) @@ -48,7 +65,7 @@ func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage { logger.Print("Starting") - messagesToSend := make(map[int64]*strings.Builder) + messagesToSend := make(map[ChatId][]MsgContent) waitingStarted := false timeoutEvents := make(chan interface{}) returnEvents := make(chan GenericMessage) @@ -65,12 +82,11 @@ func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper break loop } // Collect all messages to send them at once - _, messageBuilderExists := messagesToSend[ev.chatId] + _, messageBuilderExists := messagesToSend[ev.chatIds] if !messageBuilderExists { - messagesToSend[ev.chatId] = &strings.Builder{} + messagesToSend[ev.chatIds] = make([]MsgContent, 0) } - messagesToSend[ev.chatId].WriteString(ev.msg) - messagesToSend[ev.chatId].WriteRune('\n') + messagesToSend[ev.chatIds] = append(messagesToSend[ev.chatIds], ev.msgs...) if !waitingStarted { logger.Print("Waiting for more messages to arrive before sending...") waitingStarted = true @@ -79,8 +95,8 @@ func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper case <-timeoutEvents: logger.Print("Time's up, sending all messages we've got for now.") waitingStarted = false - for chatId, msgBuilder := range messagesToSend { - returnEvents <- GenericMessage{chatId, msgBuilder.String()} + for chatId, msgs := range messagesToSend { + returnEvents <- GenericMessage{chatId, msgs} delete(messagesToSend, chatId) } } @@ -88,8 +104,8 @@ func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper logger.Print("Exiting") // If anything is left to be sent, send it now - for chatId, msgBuilder := range messagesToSend { - returnEvents <- GenericMessage{chatId, msgBuilder.String()} + for chatId, msgs := range messagesToSend { + returnEvents <- GenericMessage{chatId, msgs} } }() diff --git a/sender_worker_test.go b/sender_worker_test.go index 658ad7f..3c87e5b 100644 --- a/sender_worker_test.go +++ b/sender_worker_test.go @@ -37,18 +37,18 @@ func TestMessageThrottling(t *testing.T) { mockSleeper := MockSleeper{nil, 0} Consume(SendToTg(tgSenderWorker(testEvents, &wg, &mockSleeper, log.New(io.Discard, "", log.Ltime)), &mockSender, &wg, log.New(io.Discard, "", log.Ltime))) - testEvents <- GenericMessage{123, "test1"} - testEvents <- GenericMessage{124, "test3"} - testEvents <- GenericMessage{123, "test2"} - testEvents <- GenericMessage{124, "test4"} + testEvents <- GenericMessage{TgChatId{123}, []MsgContent{MsgContent{"test1"}}} + testEvents <- GenericMessage{TgChatId{124}, []MsgContent{MsgContent{"test3"}}} + testEvents <- GenericMessage{TgChatId{123}, []MsgContent{MsgContent{"test2"}}} + testEvents <- GenericMessage{TgChatId{124}, []MsgContent{MsgContent{"test4"}}} assert.Equal(t, 1, mockSleeper.callCount) *mockSleeper.ch <- nil assert.Equal(t, 1, mockSleeper.callCount) - testEvents <- GenericMessage{123, "test5"} + testEvents <- GenericMessage{TgChatId{123}, []MsgContent{MsgContent{"test5"}}} close(testEvents) wg.Wait() assert.Equal(t, 2, mockSleeper.callCount) assert.Len(t, mockSender.messages, 3) - assert.Contains(t, mockSender.messages, GenericMessage{123, "test1\ntest2\n"}) + assert.Contains(t, mockSender.messages, GenericMessage{TgChatId{123}, []MsgContent{MsgContent{"test1"}, MsgContent{"test2"}}}) }