1
0
Fork 0
hswro-alarm-bot/filters_sync.go

96 lines
2.0 KiB
Go

package main
import (
"log"
"sync"
)
type SyncFilter[MsgType any] interface {
Then(what SyncFilter[MsgType]) SyncFilter[MsgType]
Call(msg MsgType)
}
type SyncFilterImpl[MsgType any] struct {
next SyncFilter[MsgType]
}
func (impl *SyncFilterImpl[MsgType]) Then(what SyncFilter[MsgType]) SyncFilter[MsgType] {
impl.next = what
return what
}
func (impl *SyncFilterImpl[MsgType]) CallNext(msg MsgType) {
if impl.next != nil {
impl.next.Call(msg)
}
}
type CollectFromChannel[MsgType any] struct{ SyncFilterImpl[MsgType] }
func (collect *CollectFromChannel[MsgType]) Call(msg MsgType) {
collect.CallNext(msg)
}
func (collect CollectFromChannel[MsgType]) Collect(events <-chan MsgType, wg *sync.WaitGroup, onClose func()) {
wg.Add(1)
go func() {
defer wg.Done()
defer onClose()
for e := range events {
collect.Call(e)
}
}()
}
type ThrottleSync struct {
SyncFilterImpl[GenericMessage]
events chan GenericMessage
}
func MakeThrottleSync(sleeper Sleeper, logger *log.Logger, wg *sync.WaitGroup) *ThrottleSync {
events := make(chan GenericMessage)
throttle := ThrottleSync{SyncFilterImpl[GenericMessage]{}, events}
wg.Add(1)
go func() {
timeoutEvents := make(chan interface{})
var currentEvent *GenericMessage = nil
loop:
for {
select {
case ev, ok := <-events:
if !ok {
break loop
}
if currentEvent == nil {
logger.Print("Waiting for more messages to arrive before sending...")
sleeper.Sleep(timeoutEvents)
}
currentEvent = appendToGenericMessage(currentEvent, &ev)
case <-timeoutEvents:
logger.Print("Time's up, sending all messages we've got for now.")
throttle.CallNext(*currentEvent)
currentEvent = nil
}
}
// If anything is left to be sent, send it now
if currentEvent != nil {
throttle.CallNext(*currentEvent)
}
wg.Done()
logger.Print("Throttling goroutine finishing")
}()
return &throttle
}
func (throttle *ThrottleSync) Close() { close(throttle.events) }
func (throttle *ThrottleSync) Call(msg GenericMessage) {
throttle.events <- msg
}