diff --git a/filters_sync.go b/filters_sync.go index 1a9105a..cc3c51a 100644 --- a/filters_sync.go +++ b/filters_sync.go @@ -7,28 +7,34 @@ type SyncFilter interface { Call(msg GenericMessage) } -type CollectFromChannel struct { +type SyncFilterImpl struct { next SyncFilter } -func (self *CollectFromChannel) Then(what SyncFilter) SyncFilter { - self.next = what +func (impl *SyncFilterImpl) Then(what SyncFilter) SyncFilter { + impl.next = what return what } -func (self *CollectFromChannel) Call(msg GenericMessage) { - if self.next != nil { - self.next.Call(msg) +func (impl *SyncFilterImpl) CallNext(msg GenericMessage) { + if impl.next != nil { + impl.next.Call(msg) } } -func (self CollectFromChannel) Collect(events <-chan GenericMessage, wg *sync.WaitGroup) { +type CollectFromChannel struct{ SyncFilterImpl } + +func (collect *CollectFromChannel) Call(msg GenericMessage) { + collect.CallNext(msg) +} + +func (collect CollectFromChannel) Collect(events <-chan GenericMessage, wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() for e := range events { - self.Call(e) + collect.Call(e) } }() } diff --git a/filters_test.go b/filters_test.go index da5cd70..4c1c57e 100644 --- a/filters_test.go +++ b/filters_test.go @@ -332,20 +332,13 @@ func TestThrottle_ManyMessagesInOneEvent(t *testing.T) { } type SyncMockFilter struct { + SyncFilterImpl collected []GenericMessage - next SyncFilter -} - -func (self *SyncMockFilter) Then(what SyncFilter) SyncFilter { - self.next = what - return what } func (self *SyncMockFilter) Call(msg GenericMessage) { self.collected = append(self.collected, msg) - if self.next != nil { - self.next.Call(msg) - } + self.CallNext(msg) } func TestSyncCollect(t *testing.T) {