package main import "sync" type SyncFilter interface { Then(what SyncFilter) SyncFilter Call(msg GenericMessage) } type SyncFilterImpl struct { next SyncFilter } func (impl *SyncFilterImpl) Then(what SyncFilter) SyncFilter { impl.next = what return what } func (impl *SyncFilterImpl) CallNext(msg GenericMessage) { if impl.next != nil { impl.next.Call(msg) } } type CollectFromChannel struct{ SyncFilterImpl } func (collect *CollectFromChannel) Call(msg GenericMessage) { collect.CallNext(msg) } func (collect CollectFromChannel) Collect(events <-chan GenericMessage, wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() for e := range events { collect.Call(e) } }() }