package main

import "sync"

// SyncFilter is one stage in a chain of synchronous message handlers.
type SyncFilter[MsgType any] interface {
	// Then registers what as the stage that follows this one and returns
	// what, so that chains can be built fluently: a.Then(b).Then(c).
	Then(what SyncFilter[MsgType]) SyncFilter[MsgType]
	// Call hands msg to this stage for processing.
	Call(msg MsgType)
}

// SyncFilterImpl holds the link to the next stage of a chain. It provides
// Then and CallNext but no Call method; concrete filters embed it and supply
// their own Call.
type SyncFilterImpl[MsgType any] struct {
	next SyncFilter[MsgType]
}

// Then stores what as the next stage, replacing any stage set earlier, and
// returns what so that further stages can be appended to it.
func (f *SyncFilterImpl[MsgType]) Then(what SyncFilter[MsgType]) SyncFilter[MsgType] {
	f.next = what
	return what
}

// CallNext forwards msg to the next stage. When no next stage has been set
// the message is silently dropped.
func (f *SyncFilterImpl[MsgType]) CallNext(msg MsgType) {
	if f.next == nil {
		return
	}
	f.next.Call(msg)
}

// CollectFromChannel is a chain stage that feeds messages read from a channel
// into the stages attached to it.
type CollectFromChannel struct {
	SyncFilterImpl[GenericMessage]
}

// Compile-time check that *CollectFromChannel is usable as a chain stage.
var _ SyncFilter[GenericMessage] = (*CollectFromChannel)(nil)

// Call passes msg straight through to the next stage.
func (c *CollectFromChannel) Call(msg GenericMessage) {
	c.CallNext(msg)
}

// Collect starts a goroutine that receives from events and passes every
// message to Call. The goroutine runs until events is closed.
//
// The goroutine is registered on wg before it starts and marks itself done
// when it exits, so callers can wg.Wait() for the channel to drain. wg must be
// non-nil.
//
// Collect uses a pointer receiver like the other methods of the type, so the
// goroutine works on the collector itself rather than on a copy of it. Finish
// wiring the chain with Then before calling Collect: Then is not synchronized
// with the running goroutine.
func (c *CollectFromChannel) Collect(events <-chan GenericMessage, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range events {
			c.Call(msg)
		}
	}()
}