diff --git a/filters.go b/filters.go index ff95fe8..8ad47f5 100644 --- a/filters.go +++ b/filters.go @@ -87,15 +87,33 @@ func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *Data return returnChan } -func Throttle[EvType any](inputEvents <-chan EvType, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan EvType { - returnChan := make(chan EvType) +func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage { + if msg == nil { + return new + } + +throughNewMessages: + for _, newEv := range new.Messages { + for i, oldEv := range msg.Messages { + if oldEv.Index == newEv.Index && oldEv.Type == newEv.Type { + // this message was seen - update its value + msg.Messages[i].Value = newEv.Value + continue throughNewMessages + } + // apparently this type of message was not yet seen, save it + msg.Messages = append(msg.Messages, newEv) + } + } + return msg +} + +func Throttle(inputEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage { + returnChan := make(chan GenericMessage) timeoutEvents := make(chan interface{}) go func() { wg.Add(1) - defer wg.Done() - defer close(returnChan) - var currentEvent *EvType = nil + var currentEvent *GenericMessage = nil loop: for { select { @@ -107,7 +125,7 @@ func Throttle[EvType any](inputEvents <-chan EvType, wg *sync.WaitGroup, sleeper logger.Print("Waiting for more messages to arrive before sending...") sleeper.Sleep(timeoutEvents) } - currentEvent = &ev + currentEvent = appendToGenericMessage(currentEvent, &ev) case <-timeoutEvents: logger.Print("Time's up, sending all messages we've got for now.") returnChan <- *currentEvent @@ -119,6 +137,8 @@ func Throttle[EvType any](inputEvents <-chan EvType, wg *sync.WaitGroup, sleeper if currentEvent != nil { returnChan <- *currentEvent } + close(returnChan) + wg.Done() }() return returnChan diff --git a/filters_test.go b/filters_test.go index e869746..6e6d78e 100644 --- a/filters_test.go +++ b/filters_test.go @@ -230,12 +230,19 @@ func (self *MockSleeper) Sleep(ch chan<- interface{}) { } func TestThrottle(t *testing.T) { - testEvents := make(chan int) - receivedEvents := make([]int, 0) + testEvents := make(chan GenericMessage) + receivedEvents := make([]GenericMessage, 0) wg := sync.WaitGroup{} fakeLog := log.New(io.Discard, "", log.Ltime) mockSleeper := MockSleeper{nil, 0} + var ( + tplMessageTest1 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: true} + tplMessageTest2 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: true} + tplMessageTest3 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: false} + tplMessageTest4 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: false} + ) + go func() { wg.Add(1) for e := range Throttle(testEvents, &wg, &mockSleeper, fakeLog) { @@ -244,24 +251,22 @@ func TestThrottle(t *testing.T) { wg.Done() }() - testEvents <- 1 - testEvents <- 2 - testEvents <- 3 + testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest1}} + testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest2}} + testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest3}} *mockSleeper.ch <- nil - testEvents <- 4 - testEvents <- 5 - testEvents <- 6 - *mockSleeper.ch <- nil - - testEvents <- 7 + testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest4}} close(testEvents) wg.Wait() - assert.Equal(t, 3, mockSleeper.callCount) - assert.Len(t, receivedEvents, 3) - assert.Contains(t, receivedEvents, 3) - assert.Contains(t, receivedEvents, 6) - assert.Contains(t, receivedEvents, 7) + assert.Equal(t, 2, mockSleeper.callCount) + assert.Len(t, receivedEvents, 2) + assert.Contains(t, receivedEvents[0].Messages, tplMessageTest2) + assert.Contains(t, receivedEvents[0].Messages, tplMessageTest3) + assert.Len(t, receivedEvents[0].Messages, 2) + + assert.Contains(t, receivedEvents[1].Messages, tplMessageTest4) + assert.Len(t, receivedEvents[1].Messages, 1) }