diff --git a/main.go b/main.go index 7176f84..d868c7c 100644 --- a/main.go +++ b/main.go @@ -97,7 +97,8 @@ func main() { logger.Print("Created Telegram Bot API client") tgSender := TgSender{bot} - go tgSenderWorker(tgEvents, tgSender, &wg, sleeper, log.New(os.Stderr, "TgSender", log.Lmicroseconds)) + Consume(SendToTg(tgSenderWorker(tgEvents, &wg, sleeper, log.New(os.Stderr, "TgSender", log.Lmicroseconds)), + tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds))) for e := range FilterByLastSeen(s.Events, "hs_wro_last_seen.bin", log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)) { logger.Print("Received change from SATEL: ", e) sendTgMessage(tgEvents, fmt.Sprint("Change from SATEL: type:", e.Type, ", index:", e.Index, ", value:", e.Value), chatIds) diff --git a/sender_worker.go b/sender_worker.go index c0a7dc1..b563502 100644 --- a/sender_worker.go +++ b/sender_worker.go @@ -19,54 +19,79 @@ type Sleeper interface { Sleep(ch chan<- interface{}) } -func tgSenderWorker(tgEvents <-chan GenericMessage, s Sender, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) { +func Consume(events <-chan GenericMessage) { + go func() { + for range events { + } + }() +} + +func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage { + returnEvents := make(chan GenericMessage) + + go func() { + wg.Add(1) + defer wg.Done() + for e := range events { + returnEvents <- e + err := s.Send(e) + if err != nil { + // TODO: handle it better + panic(err) + } + } + close(returnEvents) + }() + + return returnEvents +} + +func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage { logger.Print("Starting") - wg.Add(1) - defer wg.Done() messagesToSend := make(map[int64]*strings.Builder) waitingStarted := false timeoutEvents := make(chan interface{}) + returnEvents := make(chan GenericMessage) -loop: - for { - select { - case ev, ok := <-tgEvents: - if !ok { - break loop - } - // Collect all messages to send them at once - _, messageBuilderExists := messagesToSend[ev.chatId] - if !messageBuilderExists { - messagesToSend[ev.chatId] = &strings.Builder{} - } - messagesToSend[ev.chatId].WriteString(ev.msg) - messagesToSend[ev.chatId].WriteRune('\n') - if !waitingStarted { - logger.Print("Waiting for more messages to arrive before sending...") - waitingStarted = true - sleeper.Sleep(timeoutEvents) - } - case <-timeoutEvents: - logger.Print("Time's up, sending all messages we've got for now.") - waitingStarted = false - for chatId, msgBuilder := range messagesToSend { - err := s.Send(GenericMessage{chatId, msgBuilder.String()}) - if err != nil { - // TODO: handle it better - panic(err) + go func() { + wg.Add(1) + defer wg.Done() + defer close(returnEvents) + loop: + for { + select { + case ev, ok := <-tgEvents: + if !ok { + break loop + } + // Collect all messages to send them at once + _, messageBuilderExists := messagesToSend[ev.chatId] + if !messageBuilderExists { + messagesToSend[ev.chatId] = &strings.Builder{} + } + messagesToSend[ev.chatId].WriteString(ev.msg) + messagesToSend[ev.chatId].WriteRune('\n') + if !waitingStarted { + logger.Print("Waiting for more messages to arrive before sending...") + waitingStarted = true + sleeper.Sleep(timeoutEvents) + } + case <-timeoutEvents: + logger.Print("Time's up, sending all messages we've got for now.") + waitingStarted = false + for chatId, msgBuilder := range messagesToSend { + returnEvents <- GenericMessage{chatId, msgBuilder.String()} + delete(messagesToSend, chatId) } - delete(messagesToSend, chatId) } } - } - logger.Print("Exiting") - // If anything is left to be sent, send it now - for chatId, msgBuilder := range messagesToSend { - err := s.Send(GenericMessage{chatId, msgBuilder.String()}) - if err != nil { - // TODO: handle it better - panic(err) + logger.Print("Exiting") + // If anything is left to be sent, send it now + for chatId, msgBuilder := range messagesToSend { + returnEvents <- GenericMessage{chatId, msgBuilder.String()} } - } + }() + + return returnEvents } diff --git a/sender_worker_test.go b/sender_worker_test.go index f04e2d7..658ad7f 100644 --- a/sender_worker_test.go +++ b/sender_worker_test.go @@ -35,7 +35,8 @@ func TestMessageThrottling(t *testing.T) { wg := sync.WaitGroup{} mockSender := MockSender{make([]GenericMessage, 0)} mockSleeper := MockSleeper{nil, 0} - go tgSenderWorker(testEvents, &mockSender, &wg, &mockSleeper, log.New(io.Discard, "", log.Ltime)) + Consume(SendToTg(tgSenderWorker(testEvents, &wg, &mockSleeper, log.New(io.Discard, "", log.Ltime)), + &mockSender, &wg, log.New(io.Discard, "", log.Ltime))) testEvents <- GenericMessage{123, "test1"} testEvents <- GenericMessage{124, "test3"} testEvents <- GenericMessage{123, "test2"}