1
0
Fork 0

Initial last synchronous filters

This commit is contained in:
Michał Rudowicz 2025-01-09 07:56:30 +01:00
parent f3fdfd4bbc
commit 7a8aeb4a0d
1 changed files with 46 additions and 0 deletions

View File

@ -139,3 +139,49 @@ func (convert *Convert[InMsgType]) ConvertTo(out SyncFilter[GenericMessage]) {
func (convert *Convert[InMsgType]) Then(_ SyncFilter[InMsgType]) {
panic("Use ConvertTo() with Convert object instead of Then().")
}
type FilterByLastSeenSync struct {
SyncFilterImpl[satel.Event]
dataStore *DataStore
}
func (filter *FilterByLastSeenSync) Call(ev satel.Event) {
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
for _, basicEventElement := range ev.BasicEvents {
lastSeen := filter.dataStore.GetSystemState()
val, ok := lastSeen[EventKey{basicEventElement.Type, basicEventElement.Index}]
if !ok || val.Value != basicEventElement.Value {
retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement)
// TODO: flush to disk only after the loop finishes
filter.dataStore.SetSystemState(EventKey{basicEventElement.Type, basicEventElement.Index},
EventValue{basicEventElement.Value})
}
}
if len(retEv.BasicEvents) != 0 {
filter.CallNext(retEv)
}
}
type FilterByTypeOrIndexSync struct {
SyncFilterImpl[satel.Event]
allowedTypes []SatelChangeType
allowedIndexes []int
}
func (filter *FilterByTypeOrIndexSync) Call(ev satel.Event) {
if (len(filter.allowedTypes) == 0) && (len(filter.allowedIndexes) == 0) {
// no allowed types == all types are allowed
filter.CallNext(ev)
} else {
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
for _, basicEventElement := range ev.BasicEvents {
if isBasicEventElementOkay(basicEventElement, filter.allowedTypes, filter.allowedIndexes) {
retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement)
}
}
if len(retEv.BasicEvents) != 0 {
filter.CallNext(retEv)
}
}
}