1
0
Fork 0

Shuffling things around in hope that it'll be better in the end

This commit is contained in:
Michał Rudowicz 2024-02-18 09:42:27 +01:00
parent e776813230
commit 7a9a973904
3 changed files with 58 additions and 29 deletions

25
main.go
View File

@ -24,15 +24,28 @@ type TgSender struct {
} }
func (self TgSender) Send(msg GenericMessage) error { func (self TgSender) Send(msg GenericMessage) error {
toSend := tgbotapi.NewMessage(msg.chatId, msg.msg) chatIds := msg.chatIds.GetTgIds()
if chatIds == nil {
return nil
}
b := strings.Builder{}
for _, msg := range msg.msgs {
b.WriteString(msg.String())
b.WriteRune('\n')
}
message := b.String()
for _, chatId := range *chatIds {
toSend := tgbotapi.NewMessage(chatId, message)
_, err := self.bot.Send(toSend) _, err := self.bot.Send(toSend)
if err != nil {
return err return err
}
}
return nil
} }
func sendTgMessage(tgEvents chan GenericMessage, msg string, chatIds []int64) { func sendTgMessage(tgEvents chan GenericMessage, msg string, chatId int64) {
for _, chatId := range chatIds { tgEvents <- GenericMessage{TgChatId{chatId}, []MsgContent{MsgContent{msg}}}
tgEvents <- GenericMessage{chatId, msg}
}
} }
type RealSleeper struct { type RealSleeper struct {
@ -101,7 +114,7 @@ func main() {
tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds))) tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds)))
for e := range FilterByLastSeen(s.Events, "hs_wro_last_seen.bin", log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)) { for e := range FilterByLastSeen(s.Events, "hs_wro_last_seen.bin", log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)) {
logger.Print("Received change from SATEL: ", e) logger.Print("Received change from SATEL: ", e)
sendTgMessage(tgEvents, fmt.Sprint("Change from SATEL: type:", e.Type, ", index:", e.Index, ", value:", e.Value), chatIds) sendTgMessage(tgEvents, fmt.Sprint("Change from SATEL: type:", e.Type, ", index:", e.Index, ", value:", e.Value), chatIds[0])
} }
close(tgEvents) close(tgEvents)

View File

@ -2,15 +2,9 @@ package main
import ( import (
"log" "log"
"strings"
"sync" "sync"
) )
type GenericMessage struct {
chatId int64
msg string
}
type Sender interface { type Sender interface {
Send(msg GenericMessage) error Send(msg GenericMessage) error
} }
@ -19,6 +13,18 @@ type Sleeper interface {
Sleep(ch chan<- interface{}) Sleep(ch chan<- interface{})
} }
type ChatId interface {
GetTgIds() *[]int64
}
type TgChatId struct {
tgChatId int64
}
func (self TgChatId) GetTgIds() *[]int64 {
return &[]int64{self.tgChatId}
}
func Consume(events <-chan GenericMessage) { func Consume(events <-chan GenericMessage) {
go func() { go func() {
for range events { for range events {
@ -26,6 +32,17 @@ func Consume(events <-chan GenericMessage) {
}() }()
} }
type MsgContent struct {
msg string
}
func (self MsgContent) String() string { return self.msg }
type GenericMessage struct {
chatIds ChatId
msgs []MsgContent
}
func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage { func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage {
returnEvents := make(chan GenericMessage) returnEvents := make(chan GenericMessage)
@ -48,7 +65,7 @@ func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger
func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage { func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage {
logger.Print("Starting") logger.Print("Starting")
messagesToSend := make(map[int64]*strings.Builder) messagesToSend := make(map[ChatId][]MsgContent)
waitingStarted := false waitingStarted := false
timeoutEvents := make(chan interface{}) timeoutEvents := make(chan interface{})
returnEvents := make(chan GenericMessage) returnEvents := make(chan GenericMessage)
@ -65,12 +82,11 @@ func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper
break loop break loop
} }
// Collect all messages to send them at once // Collect all messages to send them at once
_, messageBuilderExists := messagesToSend[ev.chatId] _, messageBuilderExists := messagesToSend[ev.chatIds]
if !messageBuilderExists { if !messageBuilderExists {
messagesToSend[ev.chatId] = &strings.Builder{} messagesToSend[ev.chatIds] = make([]MsgContent, 0)
} }
messagesToSend[ev.chatId].WriteString(ev.msg) messagesToSend[ev.chatIds] = append(messagesToSend[ev.chatIds], ev.msgs...)
messagesToSend[ev.chatId].WriteRune('\n')
if !waitingStarted { if !waitingStarted {
logger.Print("Waiting for more messages to arrive before sending...") logger.Print("Waiting for more messages to arrive before sending...")
waitingStarted = true waitingStarted = true
@ -79,8 +95,8 @@ func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper
case <-timeoutEvents: case <-timeoutEvents:
logger.Print("Time's up, sending all messages we've got for now.") logger.Print("Time's up, sending all messages we've got for now.")
waitingStarted = false waitingStarted = false
for chatId, msgBuilder := range messagesToSend { for chatId, msgs := range messagesToSend {
returnEvents <- GenericMessage{chatId, msgBuilder.String()} returnEvents <- GenericMessage{chatId, msgs}
delete(messagesToSend, chatId) delete(messagesToSend, chatId)
} }
} }
@ -88,8 +104,8 @@ func tgSenderWorker(tgEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper
logger.Print("Exiting") logger.Print("Exiting")
// If anything is left to be sent, send it now // If anything is left to be sent, send it now
for chatId, msgBuilder := range messagesToSend { for chatId, msgs := range messagesToSend {
returnEvents <- GenericMessage{chatId, msgBuilder.String()} returnEvents <- GenericMessage{chatId, msgs}
} }
}() }()

View File

@ -37,18 +37,18 @@ func TestMessageThrottling(t *testing.T) {
mockSleeper := MockSleeper{nil, 0} mockSleeper := MockSleeper{nil, 0}
Consume(SendToTg(tgSenderWorker(testEvents, &wg, &mockSleeper, log.New(io.Discard, "", log.Ltime)), Consume(SendToTg(tgSenderWorker(testEvents, &wg, &mockSleeper, log.New(io.Discard, "", log.Ltime)),
&mockSender, &wg, log.New(io.Discard, "", log.Ltime))) &mockSender, &wg, log.New(io.Discard, "", log.Ltime)))
testEvents <- GenericMessage{123, "test1"} testEvents <- GenericMessage{TgChatId{123}, []MsgContent{MsgContent{"test1"}}}
testEvents <- GenericMessage{124, "test3"} testEvents <- GenericMessage{TgChatId{124}, []MsgContent{MsgContent{"test3"}}}
testEvents <- GenericMessage{123, "test2"} testEvents <- GenericMessage{TgChatId{123}, []MsgContent{MsgContent{"test2"}}}
testEvents <- GenericMessage{124, "test4"} testEvents <- GenericMessage{TgChatId{124}, []MsgContent{MsgContent{"test4"}}}
assert.Equal(t, 1, mockSleeper.callCount) assert.Equal(t, 1, mockSleeper.callCount)
*mockSleeper.ch <- nil *mockSleeper.ch <- nil
assert.Equal(t, 1, mockSleeper.callCount) assert.Equal(t, 1, mockSleeper.callCount)
testEvents <- GenericMessage{123, "test5"} testEvents <- GenericMessage{TgChatId{123}, []MsgContent{MsgContent{"test5"}}}
close(testEvents) close(testEvents)
wg.Wait() wg.Wait()
assert.Equal(t, 2, mockSleeper.callCount) assert.Equal(t, 2, mockSleeper.callCount)
assert.Len(t, mockSender.messages, 3) assert.Len(t, mockSender.messages, 3)
assert.Contains(t, mockSender.messages, GenericMessage{123, "test1\ntest2\n"}) assert.Contains(t, mockSender.messages, GenericMessage{TgChatId{123}, []MsgContent{MsgContent{"test1"}, MsgContent{"test2"}}})
} }