1
0
Fork 0

SyncFilterImpl

This commit is contained in:
Michał Rudowicz 2025-01-06 20:53:08 +01:00
parent 88c86a9581
commit f7bb03721c
2 changed files with 16 additions and 17 deletions

View File

@ -7,28 +7,34 @@ type SyncFilter interface {
Call(msg GenericMessage)
}
type CollectFromChannel struct {
type SyncFilterImpl struct {
next SyncFilter
}
func (self *CollectFromChannel) Then(what SyncFilter) SyncFilter {
self.next = what
func (impl *SyncFilterImpl) Then(what SyncFilter) SyncFilter {
impl.next = what
return what
}
func (self *CollectFromChannel) Call(msg GenericMessage) {
if self.next != nil {
self.next.Call(msg)
func (impl *SyncFilterImpl) CallNext(msg GenericMessage) {
if impl.next != nil {
impl.next.Call(msg)
}
}
func (self CollectFromChannel) Collect(events <-chan GenericMessage, wg *sync.WaitGroup) {
type CollectFromChannel struct{ SyncFilterImpl }
func (collect *CollectFromChannel) Call(msg GenericMessage) {
collect.CallNext(msg)
}
func (collect CollectFromChannel) Collect(events <-chan GenericMessage, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
for e := range events {
self.Call(e)
collect.Call(e)
}
}()
}

View File

@ -332,20 +332,13 @@ func TestThrottle_ManyMessagesInOneEvent(t *testing.T) {
}
type SyncMockFilter struct {
SyncFilterImpl
collected []GenericMessage
next SyncFilter
}
func (self *SyncMockFilter) Then(what SyncFilter) SyncFilter {
self.next = what
return what
}
func (self *SyncMockFilter) Call(msg GenericMessage) {
self.collected = append(self.collected, msg)
if self.next != nil {
self.next.Call(msg)
}
self.CallNext(msg)
}
func TestSyncCollect(t *testing.T) {