From 6d0c67f92a8a087284ad440524f5714ad9015559 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Rudowicz?= Date: Wed, 28 Feb 2024 21:26:20 +0100 Subject: [PATCH] Separate DataStore object to allow for querying from separate place --- data_store.go | 126 ++++++++++++++++++++++++++++++++++++++++++++++++ filters.go | 71 ++------------------------- filters_test.go | 11 +++-- main.go | 5 +- 4 files changed, 142 insertions(+), 71 deletions(-) create mode 100644 data_store.go diff --git a/data_store.go b/data_store.go new file mode 100644 index 0000000..12d2f4c --- /dev/null +++ b/data_store.go @@ -0,0 +1,126 @@ +package main + +import ( + "encoding/gob" + "errors" + "io/fs" + "log" + "os" + "sync" + + "github.com/probakowski/go-satel" +) + +var EXPECTED_PERSISTENCE_FILE_MAGIC = [...]byte{'H', 'S', 'W', 'R', 'O', 'A', 'L', 'A', 'R', 'M', 'B', 'O', 'T'} + +const EXPECTED_PERSISTENCE_FILE_VERSION = 0 + +type EventKey struct { + ChangeType satel.ChangeType + Index int +} + +type EventValue struct{ Value bool } + +type LastSeenRecord struct { + Key EventKey + Value EventValue +} + +type PersistenceData struct { + Magic [len(EXPECTED_PERSISTENCE_FILE_MAGIC)]byte + FileVersion uint32 + LastSeen []LastSeenRecord +} + +type DataStore struct { + mtx sync.Mutex + logger *log.Logger + persistenceFilePath string + + lastSeen map[EventKey]EventValue +} + +func loadSystemState(logger *log.Logger, persistenceFilePath string) map[EventKey]EventValue { + lastSeen := make(map[EventKey]EventValue) + f, err := os.OpenFile(persistenceFilePath, os.O_RDONLY|os.O_CREATE, 0600) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + // File not existing is fine, we'll create one later + return lastSeen + } + panic(err) + } + defer f.Close() + + dec := gob.NewDecoder(f) + data := PersistenceData{} + err = dec.Decode(&data) + if err != nil { + logger.Println("Error reading persistence file", persistenceFilePath, "from disk:", err, ". Discarding and starting over.") + return lastSeen + } + if data.Magic != EXPECTED_PERSISTENCE_FILE_MAGIC { + logger.Println("Error reading persistence file", persistenceFilePath, "from disk: Wrong magic string. Discarding and starting over.") + return lastSeen + } + if data.FileVersion != EXPECTED_PERSISTENCE_FILE_VERSION { + logger.Println("Error reading persistence file", persistenceFilePath, "from disk: Wrong version: expected ", + EXPECTED_PERSISTENCE_FILE_VERSION, ", got ", data.FileVersion, ". Discarding and starting over.") + return lastSeen + } + for _, readData := range data.LastSeen { + lastSeen[readData.Key] = readData.Value + } + + return lastSeen +} + +func (self *DataStore) saveSystemState() { + f, err := os.OpenFile(self.persistenceFilePath, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + panic(err) + } + defer f.Close() + enc := gob.NewEncoder(f) + data := PersistenceData{ + Magic: EXPECTED_PERSISTENCE_FILE_MAGIC, + FileVersion: EXPECTED_PERSISTENCE_FILE_VERSION, + LastSeen: make([]LastSeenRecord, len(self.lastSeen)), + } + i := 0 + for k, v := range self.lastSeen { + data.LastSeen[i] = LastSeenRecord{Key: k, Value: v} + i += 1 + } + err = enc.Encode(data) + if err != nil { + panic(err) + } +} + +func MakeDataStore(logger *log.Logger, persistenceFilePath string) DataStore { + return DataStore{ + logger: logger, + persistenceFilePath: persistenceFilePath, + lastSeen: loadSystemState(logger, persistenceFilePath), + } +} + +func (self *DataStore) GetSystemState() map[EventKey]EventValue { + self.mtx.Lock() + defer self.mtx.Unlock() + + copiedMap := make(map[EventKey]EventValue) + for key, value := range self.lastSeen { + copiedMap[key] = value + } + return copiedMap +} + +func (self *DataStore) SetSystemState(key EventKey, value EventValue) { + self.mtx.Lock() + self.lastSeen[key] = value + self.saveSystemState() + self.mtx.Unlock() +} diff --git a/filters.go b/filters.go index b59daf0..041615c 100644 --- a/filters.go +++ b/filters.go @@ -1,11 +1,7 @@ package main import ( - "encoding/gob" - "errors" - "io/fs" "log" - "os" "github.com/probakowski/go-satel" ) @@ -66,75 +62,16 @@ func FilterByIndex(ev <-chan satel.Event, allowedIndexes []int) <-chan satel.Eve return returnChan } -type EventKey struct { - ChangeType satel.ChangeType - Index int -} - -type EventValue struct{ Value bool } - -type PersistenceRecord struct { - Key EventKey - Value EventValue -} - -func persistMapOnDisk(lastSeen *map[EventKey]bool, persistenceFilePath string) { - f, err := os.OpenFile(persistenceFilePath, os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - panic(err) - } - defer f.Close() - enc := gob.NewEncoder(f) - data := make([]PersistenceRecord, len(*lastSeen)) - i := 0 - for k, v := range *lastSeen { - data[i] = PersistenceRecord{Key: k, Value: EventValue{v}} - i += 1 - } - err = enc.Encode(data) - if err != nil { - panic(err) - } -} - -func readMapFromDisk(persistenceFilePath string, logger *log.Logger) map[EventKey]bool { - lastSeen := make(map[EventKey]bool) - f, err := os.OpenFile(persistenceFilePath, os.O_RDONLY|os.O_CREATE, 0600) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - // File not existing is fine, we'll create one later - return lastSeen - } - panic(err) - } - defer f.Close() - - dec := gob.NewDecoder(f) - data := make([]PersistenceRecord, 0) - err = dec.Decode(&data) - if err != nil { - logger.Println("Error reading persistence file", persistenceFilePath, "from disk:", err, ". Discarding and starting over.") - return lastSeen - } - for _, readData := range data { - lastSeen[readData.Key] = readData.Value.Value - } - - return lastSeen -} - -func FilterByLastSeen(ev <-chan satel.Event, persistenceFilePath string, logger *log.Logger) <-chan satel.Event { +func FilterByLastSeen(ev <-chan satel.Event, dataStore *DataStore, logger *log.Logger) <-chan satel.Event { returnChan := make(chan satel.Event) go func() { - lastSeen := readMapFromDisk(persistenceFilePath, logger) for e := range ev { + lastSeen := dataStore.GetSystemState() val, ok := lastSeen[EventKey{e.Type, e.Index}] - if !ok || val != e.Value { - lastSeen[EventKey{e.Type, e.Index}] = e.Value + if !ok || val.Value != e.Value { returnChan <- e - - persistMapOnDisk(&lastSeen, persistenceFilePath) + dataStore.SetSystemState(EventKey{e.Type, e.Index}, EventValue{e.Value}) } } close(returnChan) diff --git a/filters_test.go b/filters_test.go index 39b3fa8..7b0117d 100644 --- a/filters_test.go +++ b/filters_test.go @@ -129,10 +129,12 @@ func TestSatelLastSeenFiltering(t *testing.T) { testEvents := make(chan satel.Event) receivedEvents := make([]satel.Event, 0) wg := sync.WaitGroup{} + fakeLog := log.New(io.Discard, "", log.Ltime) + ds := MakeDataStore(fakeLog, tempFileName) go func() { wg.Add(1) - for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) { + for e := range FilterByLastSeen(testEvents, &ds, fakeLog) { receivedEvents = append(receivedEvents, e) } wg.Done() @@ -163,10 +165,12 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) { testEvents := make(chan satel.Event) receivedEvents := make([]satel.Event, 0) wg := sync.WaitGroup{} + fakeLog := log.New(io.Discard, "", log.Ltime) + ds := MakeDataStore(fakeLog, tempFileName) go func() { wg.Add(1) - for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) { + for e := range FilterByLastSeen(testEvents, &ds, fakeLog) { receivedEvents = append(receivedEvents, e) } wg.Done() @@ -189,9 +193,10 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) { testEvents = make(chan satel.Event) receivedEvents = make([]satel.Event, 0) + ds = MakeDataStore(fakeLog, tempFileName) go func() { wg.Add(1) - for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) { + for e := range FilterByLastSeen(testEvents, &ds, fakeLog) { receivedEvents = append(receivedEvents, e) } wg.Done() diff --git a/main.go b/main.go index 3560cd6..b44ac84 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( const ( MessageNotMoreOftenThanSeconds = 15 + PersistenceFilename = "hs_wro_last_seen.bin" ) type TgSender struct { @@ -136,6 +137,8 @@ func main() { tpl := template.Must(template.New("TelegramMessage").Parse(TelegramMessageTemplate)) + dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), PersistenceFilename) + Consume( SendToTg( tgSenderWorker(tgEvents, &wg, sleeper, log.New(os.Stderr, "TgSender", log.Lmicroseconds)), @@ -144,7 +147,7 @@ func main() { go CloseSatelOnCtrlC(s) for e := range FilterByIndex(FilterByType( - FilterByLastSeen(s.Events, "hs_wro_last_seen.bin", log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)), + FilterByLastSeen(s.Events, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)), allowedTypes), allowedIndexes) { logger.Print("Received change from SATEL: ", e)