diff --git a/filters_sync.go b/filters_sync.go new file mode 100644 index 0000000..1a9105a --- /dev/null +++ b/filters_sync.go @@ -0,0 +1,34 @@ +package main + +import "sync" + +type SyncFilter interface { + Then(what SyncFilter) SyncFilter + Call(msg GenericMessage) +} + +type CollectFromChannel struct { + next SyncFilter +} + +func (self *CollectFromChannel) Then(what SyncFilter) SyncFilter { + self.next = what + return what +} + +func (self *CollectFromChannel) Call(msg GenericMessage) { + if self.next != nil { + self.next.Call(msg) + } +} + +func (self CollectFromChannel) Collect(events <-chan GenericMessage, wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + + for e := range events { + self.Call(e) + } + }() +} diff --git a/filters_test.go b/filters_test.go index 2cc1751..da5cd70 100644 --- a/filters_test.go +++ b/filters_test.go @@ -330,3 +330,59 @@ func TestThrottle_ManyMessagesInOneEvent(t *testing.T) { assert.Contains(t, receivedEvents[1].Messages, tplMessageTest4) assert.Len(t, receivedEvents[1].Messages, 1) } + +type SyncMockFilter struct { + collected []GenericMessage + next SyncFilter +} + +func (self *SyncMockFilter) Then(what SyncFilter) SyncFilter { + self.next = what + return what +} + +func (self *SyncMockFilter) Call(msg GenericMessage) { + self.collected = append(self.collected, msg) + if self.next != nil { + self.next.Call(msg) + } +} + +func TestSyncCollect(t *testing.T) { + testEvents := make(chan GenericMessage) + wg := sync.WaitGroup{} + + tested := CollectFromChannel{} + mock := &SyncMockFilter{} + mock2 := &SyncMockFilter{} + + tested.Then(mock).Then(mock2) + + tested.Collect(testEvents, &wg) + + testEvents <- makeGenericMessage(satel.ArmedPartition, 1, true) + testEvents <- makeGenericMessage(satel.DoorOpened, 2, true) + testEvents <- makeGenericMessage(satel.PartitionAlarm, 3, true) + testEvents <- makeGenericMessage(satel.PartitionFireAlarm, 4, true) + testEvents <- makeGenericMessage(satel.TroublePart1, 5, true) + testEvents <- makeGenericMessage(satel.ZoneTamper, 6, true) + + close(testEvents) + wg.Wait() + + assert.Len(t, mock.collected, 6) + assert.Contains(t, mock.collected, makeGenericMessage(satel.ArmedPartition, 1, true)) + assert.Contains(t, mock.collected, makeGenericMessage(satel.DoorOpened, 2, true)) + assert.Contains(t, mock.collected, makeGenericMessage(satel.PartitionAlarm, 3, true)) + assert.Contains(t, mock.collected, makeGenericMessage(satel.PartitionFireAlarm, 4, true)) + assert.Contains(t, mock.collected, makeGenericMessage(satel.TroublePart1, 5, true)) + assert.Contains(t, mock.collected, makeGenericMessage(satel.ZoneTamper, 6, true)) + + assert.Len(t, mock2.collected, 6) + assert.Contains(t, mock2.collected, makeGenericMessage(satel.ArmedPartition, 1, true)) + assert.Contains(t, mock2.collected, makeGenericMessage(satel.DoorOpened, 2, true)) + assert.Contains(t, mock2.collected, makeGenericMessage(satel.PartitionAlarm, 3, true)) + assert.Contains(t, mock2.collected, makeGenericMessage(satel.PartitionFireAlarm, 4, true)) + assert.Contains(t, mock2.collected, makeGenericMessage(satel.TroublePart1, 5, true)) + assert.Contains(t, mock2.collected, makeGenericMessage(satel.ZoneTamper, 6, true)) +} diff --git a/test_utils.go b/test_utils.go index 006a7a1..b2c4572 100644 --- a/test_utils.go +++ b/test_utils.go @@ -7,3 +7,7 @@ func makeTestSatelEvent(changeType satel.ChangeType, index int, val bool) satel. BasicEvents: []satel.BasicEventElement{{Type: changeType, Index: index, Value: val}}, } } + +func makeGenericMessage(changeType satel.ChangeType, index int, val bool) GenericMessage { + return GenericMessage{[]satel.BasicEventElement{{Type: changeType, Index: index, Value: val}}} +}