2024-02-10 23:13:31 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
2024-02-11 21:48:05 +00:00
|
|
|
"log"
|
2024-02-10 23:13:31 +00:00
|
|
|
"sync"
|
|
|
|
)
|
|
|
|
|
|
|
|
type Sender interface {
|
|
|
|
Send(msg GenericMessage) error
|
|
|
|
}
|
|
|
|
|
2024-02-11 10:51:41 +00:00
|
|
|
type Sleeper interface {
|
|
|
|
Sleep(ch chan<- interface{})
|
|
|
|
}
|
|
|
|
|
2024-02-18 08:42:27 +00:00
|
|
|
type ChatId interface {
|
|
|
|
GetTgIds() *[]int64
|
|
|
|
}
|
|
|
|
|
|
|
|
type TgChatId struct {
|
|
|
|
tgChatId int64
|
|
|
|
}
|
|
|
|
|
|
|
|
func (self TgChatId) GetTgIds() *[]int64 {
|
|
|
|
return &[]int64{self.tgChatId}
|
|
|
|
}
|
|
|
|
|
2024-02-18 00:44:10 +00:00
|
|
|
func Consume(events <-chan GenericMessage) {
|
|
|
|
go func() {
|
|
|
|
for range events {
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
2024-02-18 16:28:32 +00:00
|
|
|
type MsgContent interface {
|
|
|
|
TgString() string
|
|
|
|
}
|
|
|
|
|
|
|
|
type StringMsgContent struct {
|
2024-02-18 08:42:27 +00:00
|
|
|
msg string
|
|
|
|
}
|
|
|
|
|
2024-02-18 16:28:32 +00:00
|
|
|
func (self StringMsgContent) TgString() string { return self.msg }
|
2024-02-18 08:42:27 +00:00
|
|
|
|
|
|
|
type GenericMessage struct {
|
|
|
|
chatIds ChatId
|
|
|
|
msgs []MsgContent
|
|
|
|
}
|
|
|
|
|
2024-02-18 00:44:10 +00:00
|
|
|
func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage {
|
|
|
|
returnEvents := make(chan GenericMessage)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
wg.Add(1)
|
|
|
|
defer wg.Done()
|
|
|
|
for e := range events {
|
|
|
|
returnEvents <- e
|
|
|
|
err := s.Send(e)
|
|
|
|
if err != nil {
|
|
|
|
// TODO: handle it better
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
close(returnEvents)
|
|
|
|
}()
|
|
|
|
|
|
|
|
return returnEvents
|
|
|
|
}
|
|
|
|
|
|
|
|
func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage {
|
2024-02-11 21:48:05 +00:00
|
|
|
logger.Print("Starting")
|
2024-02-18 08:42:27 +00:00
|
|
|
messagesToSend := make(map[ChatId][]MsgContent)
|
2024-02-10 23:13:31 +00:00
|
|
|
waitingStarted := false
|
|
|
|
timeoutEvents := make(chan interface{})
|
2024-02-18 00:44:10 +00:00
|
|
|
returnEvents := make(chan GenericMessage)
|
2024-02-10 23:13:31 +00:00
|
|
|
|
2024-02-18 00:44:10 +00:00
|
|
|
go func() {
|
|
|
|
wg.Add(1)
|
|
|
|
defer wg.Done()
|
|
|
|
defer close(returnEvents)
|
|
|
|
loop:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case ev, ok := <-tgEvents:
|
|
|
|
if !ok {
|
|
|
|
break loop
|
|
|
|
}
|
|
|
|
// Collect all messages to send them at once
|
2024-02-18 08:42:27 +00:00
|
|
|
_, messageBuilderExists := messagesToSend[ev.chatIds]
|
2024-02-18 00:44:10 +00:00
|
|
|
if !messageBuilderExists {
|
2024-02-18 08:42:27 +00:00
|
|
|
messagesToSend[ev.chatIds] = make([]MsgContent, 0)
|
2024-02-18 00:44:10 +00:00
|
|
|
}
|
2024-02-18 08:42:27 +00:00
|
|
|
messagesToSend[ev.chatIds] = append(messagesToSend[ev.chatIds], ev.msgs...)
|
2024-02-18 00:44:10 +00:00
|
|
|
if !waitingStarted {
|
|
|
|
logger.Print("Waiting for more messages to arrive before sending...")
|
|
|
|
waitingStarted = true
|
|
|
|
sleeper.Sleep(timeoutEvents)
|
|
|
|
}
|
|
|
|
case <-timeoutEvents:
|
|
|
|
logger.Print("Time's up, sending all messages we've got for now.")
|
|
|
|
waitingStarted = false
|
2024-02-18 08:42:27 +00:00
|
|
|
for chatId, msgs := range messagesToSend {
|
|
|
|
returnEvents <- GenericMessage{chatId, msgs}
|
2024-02-18 00:44:10 +00:00
|
|
|
delete(messagesToSend, chatId)
|
2024-02-10 23:13:31 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-02-18 00:44:10 +00:00
|
|
|
logger.Print("Exiting")
|
|
|
|
// If anything is left to be sent, send it now
|
2024-02-18 08:42:27 +00:00
|
|
|
for chatId, msgs := range messagesToSend {
|
|
|
|
returnEvents <- GenericMessage{chatId, msgs}
|
2024-02-10 23:13:31 +00:00
|
|
|
}
|
2024-02-18 00:44:10 +00:00
|
|
|
}()
|
|
|
|
|
|
|
|
return returnEvents
|
2024-02-10 23:13:31 +00:00
|
|
|
}
|