1
0
Fork 0

FilterByLastSeen() persistence

This commit is contained in:
Michał Rudowicz 2024-02-15 20:06:06 +01:00
parent 919ddf569e
commit eca858c01e
2 changed files with 129 additions and 3 deletions

View File

@ -1,6 +1,12 @@
package main
import (
"encoding/gob"
"errors"
"io/fs"
"log"
"os"
"github.com/probakowski/go-satel"
)
@ -27,16 +33,70 @@ type EventKey struct {
Index int
}
func FilterByLastSeen(ev <-chan satel.Event) <-chan satel.Event {
type EventValue struct{ Value bool }
type PersistenceRecord struct {
Key EventKey
Value EventValue
}
func persistMapOnDisk(lastSeen *map[EventKey]bool, persistenceFilePath string) {
f, err := os.OpenFile(persistenceFilePath, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
panic(err)
}
defer f.Close()
enc := gob.NewEncoder(f)
data := make([]PersistenceRecord, len(*lastSeen))
i := 0
for k, v := range *lastSeen {
data[i] = PersistenceRecord{Key: k, Value: EventValue{v}}
i += 1
}
err = enc.Encode(data)
if err != nil {
panic(err)
}
}
func readMapFromDisk(persistenceFilePath string, logger *log.Logger) map[EventKey]bool {
lastSeen := make(map[EventKey]bool)
f, err := os.OpenFile(persistenceFilePath, os.O_RDONLY|os.O_CREATE, 0600)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
// File not existing is fine, we'll create one later
return lastSeen
}
panic(err)
}
defer f.Close()
dec := gob.NewDecoder(f)
data := make([]PersistenceRecord, 0)
err = dec.Decode(&data)
if err != nil {
logger.Println("LastSeenFilter: Error reading persistence file", persistenceFilePath, "from disk:", err, ". Discarding and starting over.")
return lastSeen
}
for _, readData := range data {
lastSeen[readData.Key] = readData.Value.Value
}
return lastSeen
}
func FilterByLastSeen(ev <-chan satel.Event, persistenceFilePath string, logger *log.Logger) <-chan satel.Event {
returnChan := make(chan satel.Event)
go func() {
lastSeen := make(map[EventKey]bool) // key: ChangeType/Index; value: Value
lastSeen := readMapFromDisk(persistenceFilePath, logger)
for e := range ev {
val, ok := lastSeen[EventKey{e.Type, e.Index}]
if !ok || val != e.Value {
lastSeen[EventKey{e.Type, e.Index}] = e.Value
returnChan <- e
persistMapOnDisk(&lastSeen, persistenceFilePath)
}
}
close(returnChan)

View File

@ -1,6 +1,9 @@
package main
import (
"io"
"log"
"os"
"sync"
"testing"
@ -37,13 +40,18 @@ func TestSatelEventTypeFiltering(t *testing.T) {
}
func TestSatelLastSeenFiltering(t *testing.T) {
f, err := os.CreateTemp("", "TestSatelLastSeenFiltering")
assert.NoError(t, err)
tempFileName := f.Name()
assert.NoError(t, f.Close())
defer os.Remove(f.Name())
testEvents := make(chan satel.Event)
receivedEvents := make([]satel.Event, 0)
wg := sync.WaitGroup{}
go func() {
wg.Add(1)
for e := range FilterByLastSeen(testEvents) {
for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
@ -64,3 +72,61 @@ func TestSatelLastSeenFiltering(t *testing.T) {
assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true})
assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false})
}
func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
f, err := os.CreateTemp("", "TestSatelLastSeenFilteringWithPersistence")
assert.NoError(t, err)
tempFileName := f.Name()
assert.NoError(t, f.Close())
defer os.Remove(f.Name())
testEvents := make(chan satel.Event)
receivedEvents := make([]satel.Event, 0)
wg := sync.WaitGroup{}
go func() {
wg.Add(1)
for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: true}
testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true}
testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: true}
testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false}
testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true}
testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false}
close(testEvents)
wg.Wait()
assert.Len(t, receivedEvents, 3)
assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 1, Value: true})
assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true})
assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false})
testEvents = make(chan satel.Event)
receivedEvents = make([]satel.Event, 0)
go func() {
wg.Add(1)
for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
receivedEvents = make([]satel.Event, 0)
testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false}
testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: false}
testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 1, Value: true}
testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true}
testEvents <- satel.Event{Type: satel.ArmedPartition, Index: 2, Value: true}
close(testEvents)
wg.Wait()
assert.Len(t, receivedEvents, 1)
assert.Contains(t, receivedEvents, satel.Event{Type: satel.ArmedPartition, Index: 1, Value: true})
}