1
0
Fork 0

Compare commits

..

3 Commits

6 changed files with 146 additions and 71 deletions

126
data_store.go Normal file
View File

@ -0,0 +1,126 @@
package main
import (
"encoding/gob"
"errors"
"io/fs"
"log"
"os"
"sync"
"github.com/probakowski/go-satel"
)
var EXPECTED_PERSISTENCE_FILE_MAGIC = [...]byte{'H', 'S', 'W', 'R', 'O', 'A', 'L', 'A', 'R', 'M', 'B', 'O', 'T'}
const EXPECTED_PERSISTENCE_FILE_VERSION = 0
type EventKey struct {
ChangeType satel.ChangeType
Index int
}
type EventValue struct{ Value bool }
type LastSeenRecord struct {
Key EventKey
Value EventValue
}
type PersistenceData struct {
Magic [len(EXPECTED_PERSISTENCE_FILE_MAGIC)]byte
FileVersion uint32
LastSeen []LastSeenRecord
}
type DataStore struct {
mtx sync.Mutex
logger *log.Logger
persistenceFilePath string
lastSeen map[EventKey]EventValue
}
func loadSystemState(logger *log.Logger, persistenceFilePath string) map[EventKey]EventValue {
lastSeen := make(map[EventKey]EventValue)
f, err := os.OpenFile(persistenceFilePath, os.O_RDONLY|os.O_CREATE, 0600)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
// File not existing is fine, we'll create one later
return lastSeen
}
panic(err)
}
defer f.Close()
dec := gob.NewDecoder(f)
data := PersistenceData{}
err = dec.Decode(&data)
if err != nil {
logger.Println("Error reading persistence file", persistenceFilePath, "from disk:", err, ". Discarding and starting over.")
return lastSeen
}
if data.Magic != EXPECTED_PERSISTENCE_FILE_MAGIC {
logger.Println("Error reading persistence file", persistenceFilePath, "from disk: Wrong magic string. Discarding and starting over.")
return lastSeen
}
if data.FileVersion != EXPECTED_PERSISTENCE_FILE_VERSION {
logger.Println("Error reading persistence file", persistenceFilePath, "from disk: Wrong version: expected ",
EXPECTED_PERSISTENCE_FILE_VERSION, ", got ", data.FileVersion, ". Discarding and starting over.")
return lastSeen
}
for _, readData := range data.LastSeen {
lastSeen[readData.Key] = readData.Value
}
return lastSeen
}
func (self *DataStore) saveSystemState() {
f, err := os.OpenFile(self.persistenceFilePath, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
panic(err)
}
defer f.Close()
enc := gob.NewEncoder(f)
data := PersistenceData{
Magic: EXPECTED_PERSISTENCE_FILE_MAGIC,
FileVersion: EXPECTED_PERSISTENCE_FILE_VERSION,
LastSeen: make([]LastSeenRecord, len(self.lastSeen)),
}
i := 0
for k, v := range self.lastSeen {
data.LastSeen[i] = LastSeenRecord{Key: k, Value: v}
i += 1
}
err = enc.Encode(data)
if err != nil {
panic(err)
}
}
func MakeDataStore(logger *log.Logger, persistenceFilePath string) DataStore {
return DataStore{
logger: logger,
persistenceFilePath: persistenceFilePath,
lastSeen: loadSystemState(logger, persistenceFilePath),
}
}
func (self *DataStore) GetSystemState() map[EventKey]EventValue {
self.mtx.Lock()
defer self.mtx.Unlock()
copiedMap := make(map[EventKey]EventValue)
for key, value := range self.lastSeen {
copiedMap[key] = value
}
return copiedMap
}
func (self *DataStore) SetSystemState(key EventKey, value EventValue) {
self.mtx.Lock()
self.lastSeen[key] = value
self.saveSystemState()
self.mtx.Unlock()
}

View File

@ -1,11 +1,7 @@
package main package main
import ( import (
"encoding/gob"
"errors"
"io/fs"
"log" "log"
"os"
"github.com/probakowski/go-satel" "github.com/probakowski/go-satel"
) )
@ -66,75 +62,16 @@ func FilterByIndex(ev <-chan satel.Event, allowedIndexes []int) <-chan satel.Eve
return returnChan return returnChan
} }
type EventKey struct { func FilterByLastSeen(ev <-chan satel.Event, dataStore *DataStore, logger *log.Logger) <-chan satel.Event {
ChangeType satel.ChangeType
Index int
}
type EventValue struct{ Value bool }
type PersistenceRecord struct {
Key EventKey
Value EventValue
}
func persistMapOnDisk(lastSeen *map[EventKey]bool, persistenceFilePath string) {
f, err := os.OpenFile(persistenceFilePath, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
panic(err)
}
defer f.Close()
enc := gob.NewEncoder(f)
data := make([]PersistenceRecord, len(*lastSeen))
i := 0
for k, v := range *lastSeen {
data[i] = PersistenceRecord{Key: k, Value: EventValue{v}}
i += 1
}
err = enc.Encode(data)
if err != nil {
panic(err)
}
}
func readMapFromDisk(persistenceFilePath string, logger *log.Logger) map[EventKey]bool {
lastSeen := make(map[EventKey]bool)
f, err := os.OpenFile(persistenceFilePath, os.O_RDONLY|os.O_CREATE, 0600)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
// File not existing is fine, we'll create one later
return lastSeen
}
panic(err)
}
defer f.Close()
dec := gob.NewDecoder(f)
data := make([]PersistenceRecord, 0)
err = dec.Decode(&data)
if err != nil {
logger.Println("Error reading persistence file", persistenceFilePath, "from disk:", err, ". Discarding and starting over.")
return lastSeen
}
for _, readData := range data {
lastSeen[readData.Key] = readData.Value.Value
}
return lastSeen
}
func FilterByLastSeen(ev <-chan satel.Event, persistenceFilePath string, logger *log.Logger) <-chan satel.Event {
returnChan := make(chan satel.Event) returnChan := make(chan satel.Event)
go func() { go func() {
lastSeen := readMapFromDisk(persistenceFilePath, logger)
for e := range ev { for e := range ev {
lastSeen := dataStore.GetSystemState()
val, ok := lastSeen[EventKey{e.Type, e.Index}] val, ok := lastSeen[EventKey{e.Type, e.Index}]
if !ok || val != e.Value { if !ok || val.Value != e.Value {
lastSeen[EventKey{e.Type, e.Index}] = e.Value
returnChan <- e returnChan <- e
dataStore.SetSystemState(EventKey{e.Type, e.Index}, EventValue{e.Value})
persistMapOnDisk(&lastSeen, persistenceFilePath)
} }
} }
close(returnChan) close(returnChan)

View File

@ -129,10 +129,12 @@ func TestSatelLastSeenFiltering(t *testing.T) {
testEvents := make(chan satel.Event) testEvents := make(chan satel.Event)
receivedEvents := make([]satel.Event, 0) receivedEvents := make([]satel.Event, 0)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
fakeLog := log.New(io.Discard, "", log.Ltime)
ds := MakeDataStore(fakeLog, tempFileName)
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) { for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()
@ -163,10 +165,12 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
testEvents := make(chan satel.Event) testEvents := make(chan satel.Event)
receivedEvents := make([]satel.Event, 0) receivedEvents := make([]satel.Event, 0)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
fakeLog := log.New(io.Discard, "", log.Ltime)
ds := MakeDataStore(fakeLog, tempFileName)
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) { for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()
@ -189,9 +193,10 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
testEvents = make(chan satel.Event) testEvents = make(chan satel.Event)
receivedEvents = make([]satel.Event, 0) receivedEvents = make([]satel.Event, 0)
ds = MakeDataStore(fakeLog, tempFileName)
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) { for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()

2
go.mod
View File

@ -13,3 +13,5 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )
replace github.com/probakowski/go-satel => git.sr.ht/~michalr/go-satel v0.0.0-20211120120346-bed9818777ce

2
go.sum
View File

@ -1,3 +1,5 @@
git.sr.ht/~michalr/go-satel v0.0.0-20211120120346-bed9818777ce h1:6navIHdH9NaXuAd6R2rACpqz2Das2Gu31E1JxT3Tv8w=
git.sr.ht/~michalr/go-satel v0.0.0-20211120120346-bed9818777ce/go.mod h1:q3DquDWRcoFWZ61dGZFg3snucolljixMoAzJIiCjWoY=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@ -18,6 +18,7 @@ import (
const ( const (
MessageNotMoreOftenThanSeconds = 15 MessageNotMoreOftenThanSeconds = 15
PersistenceFilename = "hs_wro_last_seen.bin"
) )
type TgSender struct { type TgSender struct {
@ -136,6 +137,8 @@ func main() {
tpl := template.Must(template.New("TelegramMessage").Parse(TelegramMessageTemplate)) tpl := template.Must(template.New("TelegramMessage").Parse(TelegramMessageTemplate))
dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), PersistenceFilename)
Consume( Consume(
SendToTg( SendToTg(
tgSenderWorker(tgEvents, &wg, sleeper, log.New(os.Stderr, "TgSender", log.Lmicroseconds)), tgSenderWorker(tgEvents, &wg, sleeper, log.New(os.Stderr, "TgSender", log.Lmicroseconds)),
@ -144,7 +147,7 @@ func main() {
go CloseSatelOnCtrlC(s) go CloseSatelOnCtrlC(s)
for e := range FilterByIndex(FilterByType( for e := range FilterByIndex(FilterByType(
FilterByLastSeen(s.Events, "hs_wro_last_seen.bin", log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)), FilterByLastSeen(s.Events, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)),
allowedTypes), allowedTypes),
allowedIndexes) { allowedIndexes) {
logger.Print("Received change from SATEL: ", e) logger.Print("Received change from SATEL: ", e)