1
0
Fork 0

Split sending into chained goroutines

This commit is contained in:
Michał Rudowicz 2024-02-18 01:44:10 +01:00
parent 8c8b7c4f9b
commit e776813230
3 changed files with 69 additions and 42 deletions

View File

@ -97,7 +97,8 @@ func main() {
logger.Print("Created Telegram Bot API client") logger.Print("Created Telegram Bot API client")
tgSender := TgSender{bot} tgSender := TgSender{bot}
go tgSenderWorker(tgEvents, tgSender, &wg, sleeper, log.New(os.Stderr, "TgSender", log.Lmicroseconds)) Consume(SendToTg(tgSenderWorker(tgEvents, &wg, sleeper, log.New(os.Stderr, "TgSender", log.Lmicroseconds)),
tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds)))
for e := range FilterByLastSeen(s.Events, "hs_wro_last_seen.bin", log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)) { for e := range FilterByLastSeen(s.Events, "hs_wro_last_seen.bin", log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)) {
logger.Print("Received change from SATEL: ", e) logger.Print("Received change from SATEL: ", e)
sendTgMessage(tgEvents, fmt.Sprint("Change from SATEL: type:", e.Type, ", index:", e.Index, ", value:", e.Value), chatIds) sendTgMessage(tgEvents, fmt.Sprint("Change from SATEL: type:", e.Type, ", index:", e.Index, ", value:", e.Value), chatIds)

View File

@ -19,54 +19,79 @@ type Sleeper interface {
Sleep(ch chan<- interface{}) Sleep(ch chan<- interface{})
} }
func tgSenderWorker(tgEvents <-chan GenericMessage, s Sender, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) { func Consume(events <-chan GenericMessage) {
go func() {
for range events {
}
}()
}
func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage {
returnEvents := make(chan GenericMessage)
go func() {
wg.Add(1)
defer wg.Done()
for e := range events {
returnEvents <- e
err := s.Send(e)
if err != nil {
// TODO: handle it better
panic(err)
}
}
close(returnEvents)
}()
return returnEvents
}
func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage {
logger.Print("Starting") logger.Print("Starting")
wg.Add(1)
defer wg.Done()
messagesToSend := make(map[int64]*strings.Builder) messagesToSend := make(map[int64]*strings.Builder)
waitingStarted := false waitingStarted := false
timeoutEvents := make(chan interface{}) timeoutEvents := make(chan interface{})
returnEvents := make(chan GenericMessage)
loop: go func() {
for { wg.Add(1)
select { defer wg.Done()
case ev, ok := <-tgEvents: defer close(returnEvents)
if !ok { loop:
break loop for {
} select {
// Collect all messages to send them at once case ev, ok := <-tgEvents:
_, messageBuilderExists := messagesToSend[ev.chatId] if !ok {
if !messageBuilderExists { break loop
messagesToSend[ev.chatId] = &strings.Builder{} }
} // Collect all messages to send them at once
messagesToSend[ev.chatId].WriteString(ev.msg) _, messageBuilderExists := messagesToSend[ev.chatId]
messagesToSend[ev.chatId].WriteRune('\n') if !messageBuilderExists {
if !waitingStarted { messagesToSend[ev.chatId] = &strings.Builder{}
logger.Print("Waiting for more messages to arrive before sending...") }
waitingStarted = true messagesToSend[ev.chatId].WriteString(ev.msg)
sleeper.Sleep(timeoutEvents) messagesToSend[ev.chatId].WriteRune('\n')
} if !waitingStarted {
case <-timeoutEvents: logger.Print("Waiting for more messages to arrive before sending...")
logger.Print("Time's up, sending all messages we've got for now.") waitingStarted = true
waitingStarted = false sleeper.Sleep(timeoutEvents)
for chatId, msgBuilder := range messagesToSend { }
err := s.Send(GenericMessage{chatId, msgBuilder.String()}) case <-timeoutEvents:
if err != nil { logger.Print("Time's up, sending all messages we've got for now.")
// TODO: handle it better waitingStarted = false
panic(err) for chatId, msgBuilder := range messagesToSend {
returnEvents <- GenericMessage{chatId, msgBuilder.String()}
delete(messagesToSend, chatId)
} }
delete(messagesToSend, chatId)
} }
} }
}
logger.Print("Exiting") logger.Print("Exiting")
// If anything is left to be sent, send it now // If anything is left to be sent, send it now
for chatId, msgBuilder := range messagesToSend { for chatId, msgBuilder := range messagesToSend {
err := s.Send(GenericMessage{chatId, msgBuilder.String()}) returnEvents <- GenericMessage{chatId, msgBuilder.String()}
if err != nil {
// TODO: handle it better
panic(err)
} }
} }()
return returnEvents
} }

View File

@ -35,7 +35,8 @@ func TestMessageThrottling(t *testing.T) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
mockSender := MockSender{make([]GenericMessage, 0)} mockSender := MockSender{make([]GenericMessage, 0)}
mockSleeper := MockSleeper{nil, 0} mockSleeper := MockSleeper{nil, 0}
go tgSenderWorker(testEvents, &mockSender, &wg, &mockSleeper, log.New(io.Discard, "", log.Ltime)) Consume(SendToTg(tgSenderWorker(testEvents, &wg, &mockSleeper, log.New(io.Discard, "", log.Ltime)),
&mockSender, &wg, log.New(io.Discard, "", log.Ltime)))
testEvents <- GenericMessage{123, "test1"} testEvents <- GenericMessage{123, "test1"}
testEvents <- GenericMessage{124, "test3"} testEvents <- GenericMessage{124, "test3"}
testEvents <- GenericMessage{123, "test2"} testEvents <- GenericMessage{123, "test2"}