diff --git a/filters.go b/filters.go index 23b4768..ff95fe8 100644 --- a/filters.go +++ b/filters.go @@ -86,3 +86,40 @@ func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *Data return returnChan } + +func Throttle[EvType any](inputEvents <-chan EvType, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan EvType { + returnChan := make(chan EvType) + timeoutEvents := make(chan interface{}) + + go func() { + wg.Add(1) + defer wg.Done() + defer close(returnChan) + var currentEvent *EvType = nil + loop: + for { + select { + case ev, ok := <-inputEvents: + if !ok { + break loop + } + if currentEvent == nil { + logger.Print("Waiting for more messages to arrive before sending...") + sleeper.Sleep(timeoutEvents) + } + currentEvent = &ev + case <-timeoutEvents: + logger.Print("Time's up, sending all messages we've got for now.") + returnChan <- *currentEvent + currentEvent = nil + } + } + + // If anything is left to be sent, send it now + if currentEvent != nil { + returnChan <- *currentEvent + } + }() + + return returnChan +} diff --git a/filters_test.go b/filters_test.go index f6f52df..e869746 100644 --- a/filters_test.go +++ b/filters_test.go @@ -216,3 +216,52 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) { assert.Len(t, receivedEvents, 1) assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, true)) } + +type MockSleeper struct { + ch *chan<- interface{} + callCount int +} + +func (self *MockSleeper) Sleep(ch chan<- interface{}) { + if self.ch == nil { + self.ch = &ch + } + self.callCount += 1 +} + +func TestThrottle(t *testing.T) { + testEvents := make(chan int) + receivedEvents := make([]int, 0) + wg := sync.WaitGroup{} + fakeLog := log.New(io.Discard, "", log.Ltime) + mockSleeper := MockSleeper{nil, 0} + + go func() { + wg.Add(1) + for e := range Throttle(testEvents, &wg, &mockSleeper, fakeLog) { + receivedEvents = append(receivedEvents, e) + } + wg.Done() + }() + + testEvents <- 1 + testEvents <- 2 + testEvents <- 3 + *mockSleeper.ch <- nil + + testEvents <- 4 + testEvents <- 5 + testEvents <- 6 + *mockSleeper.ch <- nil + + testEvents <- 7 + + close(testEvents) + wg.Wait() + + assert.Equal(t, 3, mockSleeper.callCount) + assert.Len(t, receivedEvents, 3) + assert.Contains(t, receivedEvents, 3) + assert.Contains(t, receivedEvents, 6) + assert.Contains(t, receivedEvents, 7) +} diff --git a/main.go b/main.go index 78c427a..f0da7a7 100644 --- a/main.go +++ b/main.go @@ -130,6 +130,7 @@ func main() { wg sync.WaitGroup tgEvents = make(chan GenericMessage, 5) logger = log.New(os.Stderr, "Main", log.Lmicroseconds) + sleeper = RealSleeper{time.Second * 60} ) satelAddr, chatIds, allowedTypes, allowedIndexes, poolInterval := getCmdLineParams(logger) @@ -149,10 +150,10 @@ func main() { dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), getPersistenceFilePath()) - NotifyViaHTTP( - SendToTg(tgEvents, tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tpl), - &wg, - log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds), + Consume( + SendToTg(Throttle(NotifyViaHTTP(tgEvents, &wg, log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds)), + &wg, sleeper, log.New(os.Stderr, "MessageThrottle", log.Lmicroseconds)), + tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tpl), ) go CloseSatelOnCtrlC(s) diff --git a/sender_worker.go b/sender_worker.go index b6b9c7d..589a457 100644 --- a/sender_worker.go +++ b/sender_worker.go @@ -77,16 +77,19 @@ func doHttpNotification(url string, logger *log.Logger, wg *sync.WaitGroup) { logger.Print("Notified via HTTP with result ", res.StatusCode) } -func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log.Logger) { +func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage { + returnEvents := make(chan GenericMessage) armCallbackUrl := os.Getenv("NOTIFY_URL_ARM") disarmCallbackUrl := os.Getenv("NOTIFY_URL_DISARM") alarmCallbackUrl := os.Getenv("ALARM_URL_ARM") armDisarmCallbackEnabled := (len(armCallbackUrl) != 0) && (len(disarmCallbackUrl) != 0) alarmCallbackEnabled := (len(alarmCallbackUrl) != 0) + go func() { wg.Add(1) defer wg.Done() for e := range events { + returnEvents <- e if armDisarmCallbackEnabled { inner_arm: for _, basicElement := range e.Messages { @@ -113,4 +116,6 @@ func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log } } }() + + return returnEvents }