package main import "sync" type SyncFilter interface { Then(what SyncFilter) SyncFilter Call(msg GenericMessage) } type CollectFromChannel struct { next SyncFilter } func (self *CollectFromChannel) Then(what SyncFilter) SyncFilter { self.next = what return what } func (self *CollectFromChannel) Call(msg GenericMessage) { if self.next != nil { self.next.Call(msg) } } func (self CollectFromChannel) Collect(events <-chan GenericMessage, wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() for e := range events { self.Call(e) } }() }