diff --git a/sender_worker.go b/sender_worker.go index 1eca48d..7cb1977 100644 --- a/sender_worker.go +++ b/sender_worker.go @@ -22,11 +22,12 @@ func tg_sender_worker(tg_events <-chan GenericMessage, s Sender, wg *sync.WaitGr waitingStarted := false timeoutEvents := make(chan interface{}) +loop: for { select { case ev, ok := <-tg_events: if !ok { - return + break loop } // Collect all messages to send them at once _, messageBuilderExists := messagesToSend[ev.chat_id] @@ -52,13 +53,15 @@ func tg_sender_worker(tg_events <-chan GenericMessage, s Sender, wg *sync.WaitGr } delete(messagesToSend, chat_id) } - if tg_events == nil { - close(timeoutEvents) - } } + } - if tg_events == nil && timeoutEvents == nil { - return + // If anything is left to be sent, send it now + for chat_id, msgBuilder := range messagesToSend { + err := s.Send(GenericMessage{chat_id, msgBuilder.String()}) + if err != nil { + // TODO: handle it better + panic(err) } } } diff --git a/sender_worker_test.go b/sender_worker_test.go index 6e3d0d9..dad67dd 100644 --- a/sender_worker_test.go +++ b/sender_worker_test.go @@ -27,12 +27,9 @@ func TestMessageThrottling(t *testing.T) { testEvents <- GenericMessage{124, "test4"} time.Sleep(time.Millisecond * 10) testEvents <- GenericMessage{123, "test5"} - time.Sleep(time.Millisecond * 10) - close(testEvents) + wg.Wait() assert.Len(t, mockSender.messages, 3) assert.Contains(t, mockSender.messages, GenericMessage{123, "test1\ntest2\n"}) - - wg.Wait() }