1
0
Fork 0

Collect message contents during throttling

References https://todo.sr.ht/~michalr/hswro-alarm-bot/9
This commit is contained in:
Michał Rudowicz 2024-03-10 11:25:36 +01:00
parent 3e3eb3bc5d
commit 57c66c4690
2 changed files with 47 additions and 22 deletions

View File

@ -87,15 +87,33 @@ func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *Data
return returnChan
}
func Throttle[EvType any](inputEvents <-chan EvType, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan EvType {
returnChan := make(chan EvType)
func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage {
if msg == nil {
return new
}
throughNewMessages:
for _, newEv := range new.Messages {
for i, oldEv := range msg.Messages {
if oldEv.Index == newEv.Index && oldEv.Type == newEv.Type {
// this message was seen - update its value
msg.Messages[i].Value = newEv.Value
continue throughNewMessages
}
// apparently this type of message was not yet seen, save it
msg.Messages = append(msg.Messages, newEv)
}
}
return msg
}
func Throttle(inputEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage {
returnChan := make(chan GenericMessage)
timeoutEvents := make(chan interface{})
go func() {
wg.Add(1)
defer wg.Done()
defer close(returnChan)
var currentEvent *EvType = nil
var currentEvent *GenericMessage = nil
loop:
for {
select {
@ -107,7 +125,7 @@ func Throttle[EvType any](inputEvents <-chan EvType, wg *sync.WaitGroup, sleeper
logger.Print("Waiting for more messages to arrive before sending...")
sleeper.Sleep(timeoutEvents)
}
currentEvent = &ev
currentEvent = appendToGenericMessage(currentEvent, &ev)
case <-timeoutEvents:
logger.Print("Time's up, sending all messages we've got for now.")
returnChan <- *currentEvent
@ -119,6 +137,8 @@ func Throttle[EvType any](inputEvents <-chan EvType, wg *sync.WaitGroup, sleeper
if currentEvent != nil {
returnChan <- *currentEvent
}
close(returnChan)
wg.Done()
}()
return returnChan

View File

@ -230,12 +230,19 @@ func (self *MockSleeper) Sleep(ch chan<- interface{}) {
}
func TestThrottle(t *testing.T) {
testEvents := make(chan int)
receivedEvents := make([]int, 0)
testEvents := make(chan GenericMessage)
receivedEvents := make([]GenericMessage, 0)
wg := sync.WaitGroup{}
fakeLog := log.New(io.Discard, "", log.Ltime)
mockSleeper := MockSleeper{nil, 0}
var (
tplMessageTest1 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: true}
tplMessageTest2 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: true}
tplMessageTest3 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: false}
tplMessageTest4 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: false}
)
go func() {
wg.Add(1)
for e := range Throttle(testEvents, &wg, &mockSleeper, fakeLog) {
@ -244,24 +251,22 @@ func TestThrottle(t *testing.T) {
wg.Done()
}()
testEvents <- 1
testEvents <- 2
testEvents <- 3
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest1}}
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest2}}
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest3}}
*mockSleeper.ch <- nil
testEvents <- 4
testEvents <- 5
testEvents <- 6
*mockSleeper.ch <- nil
testEvents <- 7
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest4}}
close(testEvents)
wg.Wait()
assert.Equal(t, 3, mockSleeper.callCount)
assert.Len(t, receivedEvents, 3)
assert.Contains(t, receivedEvents, 3)
assert.Contains(t, receivedEvents, 6)
assert.Contains(t, receivedEvents, 7)
assert.Equal(t, 2, mockSleeper.callCount)
assert.Len(t, receivedEvents, 2)
assert.Contains(t, receivedEvents[0].Messages, tplMessageTest2)
assert.Contains(t, receivedEvents[0].Messages, tplMessageTest3)
assert.Len(t, receivedEvents[0].Messages, 2)
assert.Contains(t, receivedEvents[1].Messages, tplMessageTest4)
assert.Len(t, receivedEvents[1].Messages, 1)
}