1
0
Fork 0

Message throttling

This commit is contained in:
Michał Rudowicz 2024-03-10 10:52:27 +01:00
parent 6b69fed5b8
commit 3e3eb3bc5d
4 changed files with 97 additions and 5 deletions

View File

@ -86,3 +86,40 @@ func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *Data
return returnChan return returnChan
} }
func Throttle[EvType any](inputEvents <-chan EvType, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan EvType {
returnChan := make(chan EvType)
timeoutEvents := make(chan interface{})
go func() {
wg.Add(1)
defer wg.Done()
defer close(returnChan)
var currentEvent *EvType = nil
loop:
for {
select {
case ev, ok := <-inputEvents:
if !ok {
break loop
}
if currentEvent == nil {
logger.Print("Waiting for more messages to arrive before sending...")
sleeper.Sleep(timeoutEvents)
}
currentEvent = &ev
case <-timeoutEvents:
logger.Print("Time's up, sending all messages we've got for now.")
returnChan <- *currentEvent
currentEvent = nil
}
}
// If anything is left to be sent, send it now
if currentEvent != nil {
returnChan <- *currentEvent
}
}()
return returnChan
}

View File

@ -216,3 +216,52 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
assert.Len(t, receivedEvents, 1) assert.Len(t, receivedEvents, 1)
assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, true)) assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, true))
} }
type MockSleeper struct {
ch *chan<- interface{}
callCount int
}
func (self *MockSleeper) Sleep(ch chan<- interface{}) {
if self.ch == nil {
self.ch = &ch
}
self.callCount += 1
}
func TestThrottle(t *testing.T) {
testEvents := make(chan int)
receivedEvents := make([]int, 0)
wg := sync.WaitGroup{}
fakeLog := log.New(io.Discard, "", log.Ltime)
mockSleeper := MockSleeper{nil, 0}
go func() {
wg.Add(1)
for e := range Throttle(testEvents, &wg, &mockSleeper, fakeLog) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
testEvents <- 1
testEvents <- 2
testEvents <- 3
*mockSleeper.ch <- nil
testEvents <- 4
testEvents <- 5
testEvents <- 6
*mockSleeper.ch <- nil
testEvents <- 7
close(testEvents)
wg.Wait()
assert.Equal(t, 3, mockSleeper.callCount)
assert.Len(t, receivedEvents, 3)
assert.Contains(t, receivedEvents, 3)
assert.Contains(t, receivedEvents, 6)
assert.Contains(t, receivedEvents, 7)
}

View File

@ -130,6 +130,7 @@ func main() {
wg sync.WaitGroup wg sync.WaitGroup
tgEvents = make(chan GenericMessage, 5) tgEvents = make(chan GenericMessage, 5)
logger = log.New(os.Stderr, "Main", log.Lmicroseconds) logger = log.New(os.Stderr, "Main", log.Lmicroseconds)
sleeper = RealSleeper{time.Second * 60}
) )
satelAddr, chatIds, allowedTypes, allowedIndexes, poolInterval := getCmdLineParams(logger) satelAddr, chatIds, allowedTypes, allowedIndexes, poolInterval := getCmdLineParams(logger)
@ -149,10 +150,10 @@ func main() {
dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), getPersistenceFilePath()) dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), getPersistenceFilePath())
NotifyViaHTTP( Consume(
SendToTg(tgEvents, tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tpl), SendToTg(Throttle(NotifyViaHTTP(tgEvents, &wg, log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds)),
&wg, &wg, sleeper, log.New(os.Stderr, "MessageThrottle", log.Lmicroseconds)),
log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds), tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tpl),
) )
go CloseSatelOnCtrlC(s) go CloseSatelOnCtrlC(s)

View File

@ -77,16 +77,19 @@ func doHttpNotification(url string, logger *log.Logger, wg *sync.WaitGroup) {
logger.Print("Notified via HTTP with result ", res.StatusCode) logger.Print("Notified via HTTP with result ", res.StatusCode)
} }
func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log.Logger) { func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage {
returnEvents := make(chan GenericMessage)
armCallbackUrl := os.Getenv("NOTIFY_URL_ARM") armCallbackUrl := os.Getenv("NOTIFY_URL_ARM")
disarmCallbackUrl := os.Getenv("NOTIFY_URL_DISARM") disarmCallbackUrl := os.Getenv("NOTIFY_URL_DISARM")
alarmCallbackUrl := os.Getenv("ALARM_URL_ARM") alarmCallbackUrl := os.Getenv("ALARM_URL_ARM")
armDisarmCallbackEnabled := (len(armCallbackUrl) != 0) && (len(disarmCallbackUrl) != 0) armDisarmCallbackEnabled := (len(armCallbackUrl) != 0) && (len(disarmCallbackUrl) != 0)
alarmCallbackEnabled := (len(alarmCallbackUrl) != 0) alarmCallbackEnabled := (len(alarmCallbackUrl) != 0)
go func() { go func() {
wg.Add(1) wg.Add(1)
defer wg.Done() defer wg.Done()
for e := range events { for e := range events {
returnEvents <- e
if armDisarmCallbackEnabled { if armDisarmCallbackEnabled {
inner_arm: inner_arm:
for _, basicElement := range e.Messages { for _, basicElement := range e.Messages {
@ -113,4 +116,6 @@ func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log
} }
} }
}() }()
return returnEvents
} }