1
0
Fork 0
hswro-alarm-bot/sender_worker.go

114 lines
2.3 KiB
Go
Raw Normal View History

2024-02-10 23:13:31 +00:00
package main
import (
2024-02-11 21:48:05 +00:00
"log"
2024-02-10 23:13:31 +00:00
"sync"
)
// Sender is implemented by anything that can be handed a GenericMessage for
// sending. Failure is reported through the returned error; SendToTg treats
// any non-nil error as fatal.
type Sender interface {
	Send(msg GenericMessage) error
}
2024-02-11 10:51:41 +00:00
// Sleeper lets tgSenderWorker delegate the length of its batching window.
//
// Sleep receives a send-only channel. tgSenderWorker calls Sleep when it
// starts waiting for more messages and flushes its batches once a value
// arrives on that channel.
//
// NOTE(review): tgSenderWorker calls Sleep from its own event loop and is the
// only receiver of the channel, so Sleep presumably has to return without
// waiting and deliver the value asynchronously - confirm against the
// implementations.
type Sleeper interface {
	Sleep(ch chan<- interface{})
}
// ChatId describes the destination of a message.
//
// GetTgIds returns a pointer to a slice of int64 ids (presumably Telegram
// chat ids, going by the name - confirm).
//
// tgSenderWorker uses ChatId values as map keys, so the dynamic type of an
// implementation has to be comparable; an uncomparable one panics at run time
// when it is used as a key.
type ChatId interface {
	GetTgIds() *[]int64
}
// TgChatId is a ChatId backed by a single int64 id.
type TgChatId struct {
	tgChatId int64
}

// GetTgIds returns a pointer to a freshly allocated one-element slice that
// holds the wrapped id.
func (c TgChatId) GetTgIds() *[]int64 {
	ids := []int64{c.tgChatId}
	return &ids
}
2024-02-18 00:44:10 +00:00
// Consume starts a goroutine that receives from events and throws every value
// away until the channel is closed. Consume itself returns immediately.
func Consume(events <-chan GenericMessage) {
	drain := func() {
		for {
			if _, open := <-events; !open {
				return
			}
		}
	}
	go drain()
}
// MsgContent wraps the text of one message.
type MsgContent struct {
	msg string
}

// String returns the wrapped text unchanged.
func (m MsgContent) String() string {
	return m.msg
}
// GenericMessage couples a destination with the content to deliver there.
//
// tgSenderWorker builds values with an unkeyed literal, so the field order
// (chatIds first, msgs second) must not change.
type GenericMessage struct {
	// chatIds is the destination of the message.
	chatIds ChatId
	// msgs holds the individual pieces of content, in order.
	msgs []MsgContent
}
2024-02-18 00:44:10 +00:00
// SendToTg starts a goroutine that, for every message received from events,
// first forwards the message on the returned channel and then passes it to
// s.Send.
//
// The goroutine is registered with wg before SendToTg returns and signs off
// once events has been closed and drained; the returned channel is closed at
// that point. The returned channel is unbuffered, so it has to be read (for
// example by Consume) or the goroutine blocks before reaching s.Send.
//
// A non-nil error from s.Send currently panics. logger is accepted but not
// used yet.
func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage {
	returnEvents := make(chan GenericMessage)

	// Register before starting the goroutine: calling Add from inside it
	// races with a wg.Wait() issued right after SendToTg returns, which could
	// then return before the goroutine has even been counted.
	wg.Add(1)
	go func() {
		// Deferred calls run last-in first-out: the channel is closed
		// before the WaitGroup is released.
		defer wg.Done()
		defer close(returnEvents)

		for e := range events {
			returnEvents <- e
			if err := s.Send(e); err != nil {
				// TODO: handle it better
				panic(err)
			}
		}
	}()
	return returnEvents
}
func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage {
2024-02-11 21:48:05 +00:00
logger.Print("Starting")
messagesToSend := make(map[ChatId][]MsgContent)
2024-02-10 23:13:31 +00:00
waitingStarted := false
timeoutEvents := make(chan interface{})
2024-02-18 00:44:10 +00:00
returnEvents := make(chan GenericMessage)
2024-02-10 23:13:31 +00:00
2024-02-18 00:44:10 +00:00
go func() {
wg.Add(1)
defer wg.Done()
defer close(returnEvents)
loop:
for {
select {
case ev, ok := <-tgEvents:
if !ok {
break loop
}
// Collect all messages to send them at once
_, messageBuilderExists := messagesToSend[ev.chatIds]
2024-02-18 00:44:10 +00:00
if !messageBuilderExists {
messagesToSend[ev.chatIds] = make([]MsgContent, 0)
2024-02-18 00:44:10 +00:00
}
messagesToSend[ev.chatIds] = append(messagesToSend[ev.chatIds], ev.msgs...)
2024-02-18 00:44:10 +00:00
if !waitingStarted {
logger.Print("Waiting for more messages to arrive before sending...")
waitingStarted = true
sleeper.Sleep(timeoutEvents)
}
case <-timeoutEvents:
logger.Print("Time's up, sending all messages we've got for now.")
waitingStarted = false
for chatId, msgs := range messagesToSend {
returnEvents <- GenericMessage{chatId, msgs}
2024-02-18 00:44:10 +00:00
delete(messagesToSend, chatId)
2024-02-10 23:13:31 +00:00
}
}
}
2024-02-18 00:44:10 +00:00
logger.Print("Exiting")
// If anything is left to be sent, send it now
for chatId, msgs := range messagesToSend {
returnEvents <- GenericMessage{chatId, msgs}
2024-02-10 23:13:31 +00:00
}
2024-02-18 00:44:10 +00:00
}()
return returnEvents
2024-02-10 23:13:31 +00:00
}