diff --git a/filters.go b/filters.go index a0d16c2..678314c 100644 --- a/filters.go +++ b/filters.go @@ -7,6 +7,177 @@ import ( "git.sr.ht/~michalr/go-satel" ) +type SyncFilter[MsgType any] interface { + Then(what SyncFilter[MsgType]) SyncFilter[MsgType] + Call(msg MsgType) +} + +type SyncFilterImpl[MsgType any] struct { + next SyncFilter[MsgType] +} + +func (impl *SyncFilterImpl[MsgType]) Then(what SyncFilter[MsgType]) SyncFilter[MsgType] { + impl.next = what + return what +} + +func (impl *SyncFilterImpl[MsgType]) CallNext(msg MsgType) { + if impl.next != nil { + impl.next.Call(msg) + } +} + +type CollectFromChannel[MsgType any] struct{ SyncFilterImpl[MsgType] } + +func (collect *CollectFromChannel[MsgType]) Call(msg MsgType) { + collect.CallNext(msg) +} + +func (collect CollectFromChannel[MsgType]) Collect(events <-chan MsgType, wg *sync.WaitGroup, onClose func()) { + wg.Add(1) + go func() { + defer wg.Done() + defer onClose() + + for e := range events { + collect.Call(e) + } + }() +} + +type ThrottleSync struct { + SyncFilterImpl[GenericMessage] + + events chan GenericMessage +} + +func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage { + if msg == nil { + msg = &GenericMessage{make([]satel.BasicEventElement, 0)} + } + +throughNewMessages: + for _, newEv := range new.Messages { + for i, oldEv := range msg.Messages { + if oldEv.Index == newEv.Index && oldEv.Type == newEv.Type { + // this message was seen - update its value + msg.Messages[i].Value = newEv.Value + continue throughNewMessages + } + } + // apparently this type of message was not yet seen, save it + msg.Messages = append(msg.Messages, newEv) + } + return msg +} + +func MakeThrottleSync(sleeper Sleeper, logger *log.Logger, wg *sync.WaitGroup) *ThrottleSync { + events := make(chan GenericMessage) + + throttle := ThrottleSync{SyncFilterImpl[GenericMessage]{}, events} + + wg.Add(1) + go func() { + timeoutEvents := make(chan interface{}) + var currentEvent *GenericMessage = nil + loop: + for { + select { + case ev, ok := <-events: + if !ok { + break loop + } + if currentEvent == nil { + logger.Print("Waiting for more messages to arrive before sending...") + sleeper.Sleep(timeoutEvents) + } + currentEvent = appendToGenericMessage(currentEvent, &ev) + case <-timeoutEvents: + logger.Print("Time's up, sending all messages we've got for now.") + throttle.CallNext(*currentEvent) + currentEvent = nil + } + } + + // If anything is left to be sent, send it now + if currentEvent != nil { + throttle.CallNext(*currentEvent) + } + wg.Done() + logger.Print("Throttling goroutine finishing") + }() + + return &throttle +} + +func (throttle *ThrottleSync) Close() { close(throttle.events) } + +func (throttle *ThrottleSync) Call(msg GenericMessage) { + throttle.events <- msg +} + +type Convert[InMsgType any] struct { + out SyncFilter[GenericMessage] + convert func(InMsgType) GenericMessage +} + +func MakeConvert[InMsgType any](convertFunc func(InMsgType) GenericMessage) *Convert[InMsgType] { + return &Convert[InMsgType]{nil, convertFunc} +} + +func (convert *Convert[InMsgType]) Call(msg InMsgType) { + if convert.out == nil { + panic("Use ConvertTo() to set next element in the chain.") + } + convert.out.Call(convert.convert(msg)) +} + +func (convert *Convert[InMsgType]) ConvertTo(out SyncFilter[GenericMessage]) SyncFilter[GenericMessage] { + convert.out = out + return out +} + +func (convert *Convert[InMsgType]) Then(_ SyncFilter[InMsgType]) SyncFilter[InMsgType] { + panic("Use ConvertTo() with Convert object instead of Then().") +} + +type FilterByLastSeen struct { + SyncFilterImpl[satel.Event] + dataStore *DataStore +} + +func MakeFilterByLastSeen(dataStore *DataStore) *FilterByLastSeen { + return &FilterByLastSeen{SyncFilterImpl[satel.Event]{}, dataStore} +} + +func (filter *FilterByLastSeen) Call(ev satel.Event) { + retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} + for _, basicEventElement := range ev.BasicEvents { + lastSeen := filter.dataStore.GetSystemState() + val, ok := lastSeen[EventKey{basicEventElement.Type, basicEventElement.Index}] + if !ok || val.Value != basicEventElement.Value { + retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement) + // TODO: flush to disk only after the loop finishes + filter.dataStore.SetSystemState(EventKey{basicEventElement.Type, basicEventElement.Index}, + EventValue{basicEventElement.Value}) + } + } + if len(retEv.BasicEvents) != 0 { + filter.CallNext(retEv) + } + +} + +type FilterByTypeOrIndex struct { + SyncFilterImpl[satel.Event] + allowedTypes []SatelChangeType + allowedIndexes []int +} + +func MakeFilterByTypeOrIndex(allowedTypes []SatelChangeType, allowedIndexes []int) *FilterByTypeOrIndex { + return &FilterByTypeOrIndex{SyncFilterImpl[satel.Event]{}, allowedTypes, allowedIndexes} +} + func isBasicEventElementOkay(basicEventElement satel.BasicEventElement, allowedTypes []SatelChangeType, allowedIndexes []int) bool { for _, allowedType := range allowedTypes { if allowedType.GetChangeType() == basicEventElement.Type { @@ -21,68 +192,19 @@ func isBasicEventElementOkay(basicEventElement satel.BasicEventElement, allowedT return false } -func FilterByTypeOrIndex(ev <-chan satel.Event, wg *sync.WaitGroup, allowedTypes []SatelChangeType, allowedIndexes []int) <-chan satel.Event { - returnChan := make(chan satel.Event) - - if (len(allowedTypes) == 0) && (len(allowedIndexes) == 0) { +func (filter *FilterByTypeOrIndex) Call(ev satel.Event) { + if (len(filter.allowedTypes) == 0) && (len(filter.allowedIndexes) == 0) { // no allowed types == all types are allowed - wg.Add(1) - go func() { - defer wg.Done() - defer close(returnChan) - - for e := range ev { - returnChan <- e - } - }() + filter.CallNext(ev) } else { - wg.Add(1) - go func() { - defer wg.Done() - defer close(returnChan) - - for e := range ev { - retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} - for _, basicEventElement := range e.BasicEvents { - if isBasicEventElementOkay(basicEventElement, allowedTypes, allowedIndexes) { - retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement) - } - } - if len(retEv.BasicEvents) != 0 { - returnChan <- retEv - } - } - }() - } - - return returnChan -} - -func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *DataStore, logger *log.Logger) <-chan satel.Event { - returnChan := make(chan satel.Event) - - wg.Add(1) - go func() { - defer wg.Done() - defer close(returnChan) - - for e := range ev { - retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} - for _, basicEventElement := range e.BasicEvents { - lastSeen := dataStore.GetSystemState() - val, ok := lastSeen[EventKey{basicEventElement.Type, basicEventElement.Index}] - if !ok || val.Value != basicEventElement.Value { - retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement) - // TODO: flush to disk only after the loop finishes - dataStore.SetSystemState(EventKey{basicEventElement.Type, basicEventElement.Index}, EventValue{basicEventElement.Value}) - } - } - if len(retEv.BasicEvents) != 0 { - returnChan <- retEv + retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} + for _, basicEventElement := range ev.BasicEvents { + if isBasicEventElementOkay(basicEventElement, filter.allowedTypes, filter.allowedIndexes) { + retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement) } } - logger.Print("Satel disconnected.") - }() - - return returnChan + if len(retEv.BasicEvents) != 0 { + filter.CallNext(retEv) + } + } } diff --git a/filters_sync.go b/filters_sync.go deleted file mode 100644 index f83e094..0000000 --- a/filters_sync.go +++ /dev/null @@ -1,187 +0,0 @@ -package main - -import ( - "log" - "sync" - - "git.sr.ht/~michalr/go-satel" -) - -type SyncFilter[MsgType any] interface { - Then(what SyncFilter[MsgType]) SyncFilter[MsgType] - Call(msg MsgType) -} - -type SyncFilterImpl[MsgType any] struct { - next SyncFilter[MsgType] -} - -func (impl *SyncFilterImpl[MsgType]) Then(what SyncFilter[MsgType]) SyncFilter[MsgType] { - impl.next = what - return what -} - -func (impl *SyncFilterImpl[MsgType]) CallNext(msg MsgType) { - if impl.next != nil { - impl.next.Call(msg) - } -} - -type CollectFromChannel[MsgType any] struct{ SyncFilterImpl[MsgType] } - -func (collect *CollectFromChannel[MsgType]) Call(msg MsgType) { - collect.CallNext(msg) -} - -func (collect CollectFromChannel[MsgType]) Collect(events <-chan MsgType, wg *sync.WaitGroup, onClose func()) { - wg.Add(1) - go func() { - defer wg.Done() - defer onClose() - - for e := range events { - collect.Call(e) - } - }() -} - -type ThrottleSync struct { - SyncFilterImpl[GenericMessage] - - events chan GenericMessage -} - -func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage { - if msg == nil { - msg = &GenericMessage{make([]satel.BasicEventElement, 0)} - } - -throughNewMessages: - for _, newEv := range new.Messages { - for i, oldEv := range msg.Messages { - if oldEv.Index == newEv.Index && oldEv.Type == newEv.Type { - // this message was seen - update its value - msg.Messages[i].Value = newEv.Value - continue throughNewMessages - } - } - // apparently this type of message was not yet seen, save it - msg.Messages = append(msg.Messages, newEv) - } - return msg -} - -func MakeThrottleSync(sleeper Sleeper, logger *log.Logger, wg *sync.WaitGroup) *ThrottleSync { - events := make(chan GenericMessage) - - throttle := ThrottleSync{SyncFilterImpl[GenericMessage]{}, events} - - wg.Add(1) - go func() { - timeoutEvents := make(chan interface{}) - var currentEvent *GenericMessage = nil - loop: - for { - select { - case ev, ok := <-events: - if !ok { - break loop - } - if currentEvent == nil { - logger.Print("Waiting for more messages to arrive before sending...") - sleeper.Sleep(timeoutEvents) - } - currentEvent = appendToGenericMessage(currentEvent, &ev) - case <-timeoutEvents: - logger.Print("Time's up, sending all messages we've got for now.") - throttle.CallNext(*currentEvent) - currentEvent = nil - } - } - - // If anything is left to be sent, send it now - if currentEvent != nil { - throttle.CallNext(*currentEvent) - } - wg.Done() - logger.Print("Throttling goroutine finishing") - }() - - return &throttle -} - -func (throttle *ThrottleSync) Close() { close(throttle.events) } - -func (throttle *ThrottleSync) Call(msg GenericMessage) { - throttle.events <- msg -} - -type Convert[InMsgType any] struct { - out SyncFilter[GenericMessage] - convert func(InMsgType) GenericMessage -} - -func MakeConvert[InMsgType any](convertFunc func(InMsgType) GenericMessage) *Convert[InMsgType] { - return &Convert[InMsgType]{nil, convertFunc} -} - -func (convert *Convert[InMsgType]) Call(msg InMsgType) { - if convert.out == nil { - panic("Use ConvertTo() to set next element in the chain.") - } - convert.out.Call(convert.convert(msg)) -} - -func (convert *Convert[InMsgType]) ConvertTo(out SyncFilter[GenericMessage]) { - convert.out = out -} - -func (convert *Convert[InMsgType]) Then(_ SyncFilter[InMsgType]) { - panic("Use ConvertTo() with Convert object instead of Then().") -} - -type FilterByLastSeenSync struct { - SyncFilterImpl[satel.Event] - dataStore *DataStore -} - -func (filter *FilterByLastSeenSync) Call(ev satel.Event) { - retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} - for _, basicEventElement := range ev.BasicEvents { - lastSeen := filter.dataStore.GetSystemState() - val, ok := lastSeen[EventKey{basicEventElement.Type, basicEventElement.Index}] - if !ok || val.Value != basicEventElement.Value { - retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement) - // TODO: flush to disk only after the loop finishes - filter.dataStore.SetSystemState(EventKey{basicEventElement.Type, basicEventElement.Index}, - EventValue{basicEventElement.Value}) - } - } - if len(retEv.BasicEvents) != 0 { - filter.CallNext(retEv) - } - -} - -type FilterByTypeOrIndexSync struct { - SyncFilterImpl[satel.Event] - allowedTypes []SatelChangeType - allowedIndexes []int -} - -func (filter *FilterByTypeOrIndexSync) Call(ev satel.Event) { - if (len(filter.allowedTypes) == 0) && (len(filter.allowedIndexes) == 0) { - // no allowed types == all types are allowed - filter.CallNext(ev) - } else { - retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} - for _, basicEventElement := range ev.BasicEvents { - if isBasicEventElementOkay(basicEventElement, filter.allowedTypes, filter.allowedIndexes) { - retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement) - } - } - if len(retEv.BasicEvents) != 0 { - filter.CallNext(retEv) - } - } -} diff --git a/filters_test.go b/filters_test.go index 69e8369..d24d330 100644 --- a/filters_test.go +++ b/filters_test.go @@ -12,115 +12,74 @@ import ( ) func TestSatelEventTypeFiltering(t *testing.T) { - testEvents := make(chan satel.Event) - receivedEvents := make([]satel.Event, 0) - wg := sync.WaitGroup{} + tested := MakeFilterByTypeOrIndex([]SatelChangeType{{satel.ArmedPartition}, {satel.PartitionFireAlarm}}, + []int{}) + mock := &GenericSyncMockFilter[satel.Event]{} - wg.Add(1) - go func() { - defer wg.Done() + tested.Then(mock) - for e := range FilterByTypeOrIndex(testEvents, &wg, []SatelChangeType{{satel.ArmedPartition}, {satel.PartitionFireAlarm}}, []int{}) { - receivedEvents = append(receivedEvents, e) - } - }() + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 1, true)) + tested.Call(makeTestSatelEvent(satel.DoorOpened, 2, true)) + tested.Call(makeTestSatelEvent(satel.PartitionAlarm, 3, true)) + tested.Call(makeTestSatelEvent(satel.PartitionFireAlarm, 4, true)) + tested.Call(makeTestSatelEvent(satel.TroublePart1, 5, true)) + tested.Call(makeTestSatelEvent(satel.ZoneTamper, 6, true)) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, true) - testEvents <- makeTestSatelEvent(satel.DoorOpened, 2, true) - testEvents <- makeTestSatelEvent(satel.PartitionAlarm, 3, true) - testEvents <- makeTestSatelEvent(satel.PartitionFireAlarm, 4, true) - testEvents <- makeTestSatelEvent(satel.TroublePart1, 5, true) - testEvents <- makeTestSatelEvent(satel.ZoneTamper, 6, true) - - close(testEvents) - wg.Wait() - - assert.Len(t, receivedEvents, 2) - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, true)) - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.PartitionFireAlarm, 4, true)) + assert.Len(t, mock.collected, 2) + assert.Contains(t, mock.collected, makeTestSatelEvent(satel.ArmedPartition, 1, true)) + assert.Contains(t, mock.collected, makeTestSatelEvent(satel.PartitionFireAlarm, 4, true)) } func TestSatelEventTypeFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testing.T) { - testEvents := make(chan satel.Event) - receivedEvents := make([]satel.Event, 0) - wg := sync.WaitGroup{} + tested := MakeFilterByTypeOrIndex([]SatelChangeType{}, + []int{}) + mock := &GenericSyncMockFilter[satel.Event]{} - wg.Add(1) - go func() { - defer wg.Done() - - for e := range FilterByTypeOrIndex(testEvents, &wg, []SatelChangeType{}, []int{}) { - receivedEvents = append(receivedEvents, e) - } - }() + tested.Then(mock) for index, ct := range SUPPORTED_CHANGE_TYPES { - testEvents <- makeTestSatelEvent(ct, index, true) + tested.Call(makeTestSatelEvent(ct, index, true)) } - close(testEvents) - wg.Wait() - - assert.Len(t, receivedEvents, len(SUPPORTED_CHANGE_TYPES)) + assert.Len(t, mock.collected, len(SUPPORTED_CHANGE_TYPES)) for index, ct := range SUPPORTED_CHANGE_TYPES { - assert.Contains(t, receivedEvents, makeTestSatelEvent(ct, index, true)) + assert.Contains(t, mock.collected, makeTestSatelEvent(ct, index, true)) } } func TestSatelIndexFiltering(t *testing.T) { - testEvents := make(chan satel.Event) - receivedEvents := make([]satel.Event, 0) - wg := sync.WaitGroup{} + tested := MakeFilterByTypeOrIndex([]SatelChangeType{}, []int{1, 3}) + mock := &GenericSyncMockFilter[satel.Event]{} - wg.Add(1) - go func() { - defer wg.Done() + tested.Then(mock) - for e := range FilterByTypeOrIndex(testEvents, &wg, []SatelChangeType{}, []int{1, 3}) { - receivedEvents = append(receivedEvents, e) - } - }() + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 1, true)) + tested.Call(makeTestSatelEvent(satel.DoorOpened, 2, true)) + tested.Call(makeTestSatelEvent(satel.PartitionAlarm, 3, true)) + tested.Call(makeTestSatelEvent(satel.PartitionFireAlarm, 4, true)) + tested.Call(makeTestSatelEvent(satel.TroublePart1, 5, true)) + tested.Call(makeTestSatelEvent(satel.ZoneTamper, 6, true)) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, true) - testEvents <- makeTestSatelEvent(satel.DoorOpened, 2, true) - testEvents <- makeTestSatelEvent(satel.PartitionAlarm, 3, true) - testEvents <- makeTestSatelEvent(satel.PartitionFireAlarm, 4, true) - testEvents <- makeTestSatelEvent(satel.TroublePart1, 5, true) - testEvents <- makeTestSatelEvent(satel.ZoneTamper, 6, true) - - close(testEvents) - wg.Wait() - - assert.Len(t, receivedEvents, 2) - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, true)) - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.PartitionAlarm, 3, true)) + assert.Len(t, mock.collected, 2) + assert.Contains(t, mock.collected, makeTestSatelEvent(satel.ArmedPartition, 1, true)) + assert.Contains(t, mock.collected, makeTestSatelEvent(satel.PartitionAlarm, 3, true)) } func TestSatelIndexFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testing.T) { - testEvents := make(chan satel.Event) - receivedEvents := make([]satel.Event, 0) - wg := sync.WaitGroup{} + tested := MakeFilterByTypeOrIndex([]SatelChangeType{{satel.ArmedPartition}, {satel.PartitionFireAlarm}}, + []int{}) + mock := &GenericSyncMockFilter[satel.Event]{} myReasonableMaxIndex := 100 // I wanted to use math.MaxInt at first, but it's kind of a waste of time here - wg.Add(1) - go func() { - defer wg.Done() - - for e := range FilterByTypeOrIndex(testEvents, &wg, []SatelChangeType{}, []int{}) { - receivedEvents = append(receivedEvents, e) - } - }() + tested.Then(mock) for i := 0; i < myReasonableMaxIndex; i++ { - testEvents <- makeTestSatelEvent(satel.ArmedPartition, i, true) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, i, true)) } - close(testEvents) - wg.Wait() - - assert.Len(t, receivedEvents, myReasonableMaxIndex) + assert.Len(t, mock.collected, myReasonableMaxIndex) for i := 0; i < myReasonableMaxIndex; i++ { - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, i, true)) + assert.Contains(t, mock.collected, makeTestSatelEvent(satel.ArmedPartition, i, true)) } } @@ -130,35 +89,26 @@ func TestSatelLastSeenFiltering(t *testing.T) { tempFileName := f.Name() assert.NoError(t, f.Close()) defer os.Remove(f.Name()) - testEvents := make(chan satel.Event) - receivedEvents := make([]satel.Event, 0) - wg := sync.WaitGroup{} + fakeLog := log.New(io.Discard, "", log.Ltime) ds := MakeDataStore(fakeLog, tempFileName) - wg.Add(1) - go func() { - defer wg.Done() + tested := MakeFilterByLastSeen(&ds) + mock := &GenericSyncMockFilter[satel.Event]{} - for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) { - receivedEvents = append(receivedEvents, e) - } - }() + tested.Then(mock) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, true) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 2, true) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, true) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, false) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 2, true) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, false) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 1, true)) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 2, true)) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 1, true)) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 1, false)) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 2, true)) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 1, false)) - close(testEvents) - wg.Wait() - - assert.Len(t, receivedEvents, 3) - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, true)) - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 2, true)) - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, false)) + assert.Len(t, mock.collected, 3) + assert.Contains(t, mock.collected, makeTestSatelEvent(satel.ArmedPartition, 1, true)) + assert.Contains(t, mock.collected, makeTestSatelEvent(satel.ArmedPartition, 2, true)) + assert.Contains(t, mock.collected, makeTestSatelEvent(satel.ArmedPartition, 1, false)) } func TestSatelLastSeenFilteringWithPersistence(t *testing.T) { @@ -167,61 +117,45 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) { tempFileName := f.Name() assert.NoError(t, f.Close()) defer os.Remove(f.Name()) - testEvents := make(chan satel.Event) - receivedEvents := make([]satel.Event, 0) - wg := sync.WaitGroup{} + fakeLog := log.New(io.Discard, "", log.Ltime) ds := MakeDataStore(fakeLog, tempFileName) - wg.Add(1) - go func() { - defer wg.Done() + tested := MakeFilterByLastSeen(&ds) + mock := &GenericSyncMockFilter[satel.Event]{} - for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) { - receivedEvents = append(receivedEvents, e) - } - }() + tested.Then(mock) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, true) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 2, true) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, true) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, false) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 2, true) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, false) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 1, true)) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 2, true)) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 1, true)) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 1, false)) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 2, true)) + tested.Call(makeTestSatelEvent(satel.ArmedPartition, 1, false)) - close(testEvents) - wg.Wait() + assert.Len(t, mock.collected, 3) + assert.Contains(t, mock.collected, makeTestSatelEvent(satel.ArmedPartition, 1, true)) + assert.Contains(t, mock.collected, makeTestSatelEvent(satel.ArmedPartition, 2, true)) + assert.Contains(t, mock.collected, makeTestSatelEvent(satel.ArmedPartition, 1, false)) - assert.Len(t, receivedEvents, 3) - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, true)) - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 2, true)) - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, false)) + tested = nil + mock = nil - testEvents = make(chan satel.Event) - receivedEvents = make([]satel.Event, 0) - ds = MakeDataStore(fakeLog, tempFileName) - wg.Add(1) - go func() { - defer wg.Done() + ds2 := MakeDataStore(fakeLog, tempFileName) - for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) { - receivedEvents = append(receivedEvents, e) - } - }() + tested2 := MakeFilterByLastSeen(&ds2) + mock2 := &GenericSyncMockFilter[satel.Event]{} - receivedEvents = make([]satel.Event, 0) + tested2.Then(mock2) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, false) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, false) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, true) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 2, true) - testEvents <- makeTestSatelEvent(satel.ArmedPartition, 2, true) + tested2.Call(makeTestSatelEvent(satel.ArmedPartition, 1, false)) + tested2.Call(makeTestSatelEvent(satel.ArmedPartition, 1, false)) + tested2.Call(makeTestSatelEvent(satel.ArmedPartition, 1, true)) + tested2.Call(makeTestSatelEvent(satel.ArmedPartition, 2, true)) + tested2.Call(makeTestSatelEvent(satel.ArmedPartition, 2, true)) - close(testEvents) - wg.Wait() - - assert.Len(t, receivedEvents, 1) - assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, true)) + assert.Len(t, mock2.collected, 1) + assert.Contains(t, mock2.collected, makeTestSatelEvent(satel.ArmedPartition, 1, true)) } type MockSleeper struct { diff --git a/main.go b/main.go index 7305947..a7f255d 100644 --- a/main.go +++ b/main.go @@ -80,7 +80,14 @@ func main() { dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), getPersistenceFilePath()) - collect := CollectFromChannel[GenericMessage]{} + collect := CollectFromChannel[satel.Event]{} + + filterByLastSeen := MakeFilterByLastSeen(&dataStore) + filterByTypeOrIndex := MakeFilterByTypeOrIndex(config.AllowedTypes, config.AllowedIndexes) + convert := MakeConvert( + func(ev satel.Event) GenericMessage { return GenericMessage{ev.BasicEvents} }, + ) + notifyViaHttp := MakeNofityViaHTTPSync(config, log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds)) throttle := MakeThrottleSync(sleeper, log.New(os.Stderr, "MessageThrottle", log.Lmicroseconds), &wg) sendToTg := MakeSendToTelegramSync(tgSender, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tgTpl) @@ -88,8 +95,16 @@ func main() { log.New(os.Stderr, "SendToMatterbridge", log.Lmicroseconds), ircTpl) - collect.Then(notifyViaHttp).Then(throttle).Then(sendToTg).Then(sendToMatterbridge) - collect.Collect(tgEvents, &wg, func() {}) + collect.Then(filterByLastSeen). + Then(filterByTypeOrIndex). + Then(convert) + + convert.ConvertTo(notifyViaHttp). + Then(throttle). + Then(sendToTg). + Then(sendToMatterbridge) + + collect.Collect(s.Events, &wg, func() {}) go CloseSatelOnCtrlC(s, &cleanShutdown) @@ -98,13 +113,6 @@ func main() { WriteMemoryProfilePeriodically(&wg, log.New(os.Stderr, "DebugTools", log.Lmicroseconds), closeDebugTools) } - for e := range FilterByTypeOrIndex( - FilterByLastSeen(s.Events, &wg, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)), - &wg, config.AllowedTypes, config.AllowedIndexes) { - logger.Print("Received change from SATEL: ", e) - tgEvents <- GenericMessage{e.BasicEvents} - } - logger.Print("Closing...") close(closeDebugTools) close(tgEvents)