package main import ( "log" "sync" "git.sr.ht/~michalr/go-satel" ) type SyncFilter[MsgType any] interface { Then(what SyncFilter[MsgType]) SyncFilter[MsgType] Call(msg MsgType) } type SyncFilterImpl[MsgType any] struct { next SyncFilter[MsgType] } func (impl *SyncFilterImpl[MsgType]) Then(what SyncFilter[MsgType]) SyncFilter[MsgType] { impl.next = what return what } func (impl *SyncFilterImpl[MsgType]) CallNext(msg MsgType) { if impl.next != nil { impl.next.Call(msg) } } type CollectFromChannel[MsgType any] struct{ SyncFilterImpl[MsgType] } func (collect *CollectFromChannel[MsgType]) Call(msg MsgType) { collect.CallNext(msg) } func (collect CollectFromChannel[MsgType]) Collect(events <-chan MsgType, wg *sync.WaitGroup, onClose func()) { wg.Add(1) go func() { defer wg.Done() defer onClose() for e := range events { collect.Call(e) } }() } type ThrottleSync struct { SyncFilterImpl[GenericMessage] events chan GenericMessage } func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage { if msg == nil { msg = &GenericMessage{make([]satel.BasicEventElement, 0)} } throughNewMessages: for _, newEv := range new.Messages { for i, oldEv := range msg.Messages { if oldEv.Index == newEv.Index && oldEv.Type == newEv.Type { // this message was seen - update its value msg.Messages[i].Value = newEv.Value continue throughNewMessages } } // apparently this type of message was not yet seen, save it msg.Messages = append(msg.Messages, newEv) } return msg } func MakeThrottleSync(sleeper Sleeper, logger *log.Logger, wg *sync.WaitGroup) *ThrottleSync { events := make(chan GenericMessage) throttle := ThrottleSync{SyncFilterImpl[GenericMessage]{}, events} wg.Add(1) go func() { timeoutEvents := make(chan interface{}) var currentEvent *GenericMessage = nil loop: for { select { case ev, ok := <-events: if !ok { break loop } if currentEvent == nil { logger.Print("Waiting for more messages to arrive before sending...") sleeper.Sleep(timeoutEvents) } currentEvent = appendToGenericMessage(currentEvent, &ev) case <-timeoutEvents: logger.Print("Time's up, sending all messages we've got for now.") throttle.CallNext(*currentEvent) currentEvent = nil } } // If anything is left to be sent, send it now if currentEvent != nil { throttle.CallNext(*currentEvent) } wg.Done() logger.Print("Throttling goroutine finishing") }() return &throttle } func (throttle *ThrottleSync) Close() { close(throttle.events) } func (throttle *ThrottleSync) Call(msg GenericMessage) { throttle.events <- msg }