diff --git a/filters.go b/filters.go index 9923d22..055263e 100644 --- a/filters.go +++ b/filters.go @@ -1,6 +1,12 @@ package main import ( + "encoding/gob" + "errors" + "io/fs" + "log" + "os" + "github.com/probakowski/go-satel" ) @@ -27,16 +33,70 @@ type EventKey struct { Index int } -func FilterByLastSeen(ev <-chan satel.Event) <-chan satel.Event { +type EventValue struct{ Value bool } + +type PersistenceRecord struct { + Key EventKey + Value EventValue +} + +func persistMapOnDisk(lastSeen *map[EventKey]bool, persistenceFilePath string) { + f, err := os.OpenFile(persistenceFilePath, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + panic(err) + } + defer f.Close() + enc := gob.NewEncoder(f) + data := make([]PersistenceRecord, len(*lastSeen)) + i := 0 + for k, v := range *lastSeen { + data[i] = PersistenceRecord{Key: k, Value: EventValue{v}} + i += 1 + } + err = enc.Encode(data) + if err != nil { + panic(err) + } +} + +func readMapFromDisk(persistenceFilePath string, logger *log.Logger) map[EventKey]bool { + lastSeen := make(map[EventKey]bool) + f, err := os.OpenFile(persistenceFilePath, os.O_RDONLY|os.O_CREATE, 0600) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + // File not existing is fine, we'll create one later + return lastSeen + } + panic(err) + } + defer f.Close() + + dec := gob.NewDecoder(f) + data := make([]PersistenceRecord, 0) + err = dec.Decode(&data) + if err != nil { + logger.Println("LastSeenFilter: Error reading persistence file", persistenceFilePath, "from disk:", err, ". Discarding and starting over.") + return lastSeen + } + for _, readData := range data { + lastSeen[readData.Key] = readData.Value.Value + } + + return lastSeen +} + +func FilterByLastSeen(ev <-chan satel.Event, persistenceFilePath string, logger *log.Logger) <-chan satel.Event { returnChan := make(chan satel.Event) go func() { - lastSeen := make(map[EventKey]bool) // key: ChangeType/Index; value: Value + lastSeen := readMapFromDisk(persistenceFilePath, logger) for e := range ev { val, ok := lastSeen[EventKey{e.Type, e.Index}] if !ok || val != e.Value { lastSeen[EventKey{e.Type, e.Index}] = e.Value returnChan <- e + + persistMapOnDisk(&lastSeen, persistenceFilePath) } } close(returnChan) diff --git a/filters_test.go b/filters_test.go index 1058d40..e712104 100644 --- a/filters_test.go +++ b/filters_test.go @@ -1,6 +1,9 @@ package main import ( + "io" + "log" + "os" "sync" "testing" @@ -37,13 +40,18 @@ func TestSatelEventTypeFiltering(t *testing.T) { } func TestSatelLastSeenFiltering(t *testing.T) { + f, err := os.CreateTemp("", "TestSatelLastSeenFiltering") + assert.NoError(t, err) + tempFileName := f.Name() + assert.NoError(t, f.Close()) + defer os.Remove(f.Name()) testEvents := make(chan satel.Event) receivedEvents := make([]satel.Event, 0) wg := sync.WaitGroup{} go func() { wg.Add(1) - for e := range FilterByLastSeen(testEvents) { + for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) { receivedEvents = append(receivedEvents, e) } wg.Done() @@ -64,3 +72,61 @@ func TestSatelLastSeenFiltering(t *testing.T) { assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true}) assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false}) } + +func TestSatelLastSeenFilteringWithPersistence(t *testing.T) { + f, err := os.CreateTemp("", "TestSatelLastSeenFilteringWithPersistence") + assert.NoError(t, err) + tempFileName := f.Name() + assert.NoError(t, f.Close()) + defer os.Remove(f.Name()) + testEvents := make(chan satel.Event) + receivedEvents := make([]satel.Event, 0) + wg := sync.WaitGroup{} + + go func() { + wg.Add(1) + for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) { + receivedEvents = append(receivedEvents, e) + } + wg.Done() + }() + + testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: true} + testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true} + testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: true} + testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false} + testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true} + testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false} + + close(testEvents) + wg.Wait() + + assert.Len(t, receivedEvents, 3) + assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 1, Value: true}) + assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true}) + assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false}) + + testEvents = make(chan satel.Event) + receivedEvents = make([]satel.Event, 0) + go func() { + wg.Add(1) + for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) { + receivedEvents = append(receivedEvents, e) + } + wg.Done() + }() + + receivedEvents = make([]satel.Event, 0) + + testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false} + testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false} + testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: true} + testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true} + testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true} + + close(testEvents) + wg.Wait() + + assert.Len(t, receivedEvents, 1) + assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 1, Value: true}) +}