Compare commits
No commits in common. "0cb91aed62912cc45ec7e39e0c87d826b9503fd2" and "a8fdc59d5eb82dc17e0423880ad8ad458cb4e645" have entirely different histories.
0cb91aed62
...
a8fdc59d5e
126
data_store.go
126
data_store.go
|
@ -1,126 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/gob"
|
|
||||||
"errors"
|
|
||||||
"io/fs"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/probakowski/go-satel"
|
|
||||||
)
|
|
||||||
|
|
||||||
var EXPECTED_PERSISTENCE_FILE_MAGIC = [...]byte{'H', 'S', 'W', 'R', 'O', 'A', 'L', 'A', 'R', 'M', 'B', 'O', 'T'}
|
|
||||||
|
|
||||||
const EXPECTED_PERSISTENCE_FILE_VERSION = 0
|
|
||||||
|
|
||||||
type EventKey struct {
|
|
||||||
ChangeType satel.ChangeType
|
|
||||||
Index int
|
|
||||||
}
|
|
||||||
|
|
||||||
type EventValue struct{ Value bool }
|
|
||||||
|
|
||||||
type LastSeenRecord struct {
|
|
||||||
Key EventKey
|
|
||||||
Value EventValue
|
|
||||||
}
|
|
||||||
|
|
||||||
type PersistenceData struct {
|
|
||||||
Magic [len(EXPECTED_PERSISTENCE_FILE_MAGIC)]byte
|
|
||||||
FileVersion uint32
|
|
||||||
LastSeen []LastSeenRecord
|
|
||||||
}
|
|
||||||
|
|
||||||
type DataStore struct {
|
|
||||||
mtx sync.Mutex
|
|
||||||
logger *log.Logger
|
|
||||||
persistenceFilePath string
|
|
||||||
|
|
||||||
lastSeen map[EventKey]EventValue
|
|
||||||
}
|
|
||||||
|
|
||||||
func loadSystemState(logger *log.Logger, persistenceFilePath string) map[EventKey]EventValue {
|
|
||||||
lastSeen := make(map[EventKey]EventValue)
|
|
||||||
f, err := os.OpenFile(persistenceFilePath, os.O_RDONLY|os.O_CREATE, 0600)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, fs.ErrNotExist) {
|
|
||||||
// File not existing is fine, we'll create one later
|
|
||||||
return lastSeen
|
|
||||||
}
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
dec := gob.NewDecoder(f)
|
|
||||||
data := PersistenceData{}
|
|
||||||
err = dec.Decode(&data)
|
|
||||||
if err != nil {
|
|
||||||
logger.Println("Error reading persistence file", persistenceFilePath, "from disk:", err, ". Discarding and starting over.")
|
|
||||||
return lastSeen
|
|
||||||
}
|
|
||||||
if data.Magic != EXPECTED_PERSISTENCE_FILE_MAGIC {
|
|
||||||
logger.Println("Error reading persistence file", persistenceFilePath, "from disk: Wrong magic string. Discarding and starting over.")
|
|
||||||
return lastSeen
|
|
||||||
}
|
|
||||||
if data.FileVersion != EXPECTED_PERSISTENCE_FILE_VERSION {
|
|
||||||
logger.Println("Error reading persistence file", persistenceFilePath, "from disk: Wrong version: expected ",
|
|
||||||
EXPECTED_PERSISTENCE_FILE_VERSION, ", got ", data.FileVersion, ". Discarding and starting over.")
|
|
||||||
return lastSeen
|
|
||||||
}
|
|
||||||
for _, readData := range data.LastSeen {
|
|
||||||
lastSeen[readData.Key] = readData.Value
|
|
||||||
}
|
|
||||||
|
|
||||||
return lastSeen
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *DataStore) saveSystemState() {
|
|
||||||
f, err := os.OpenFile(self.persistenceFilePath, os.O_WRONLY|os.O_CREATE, 0600)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
enc := gob.NewEncoder(f)
|
|
||||||
data := PersistenceData{
|
|
||||||
Magic: EXPECTED_PERSISTENCE_FILE_MAGIC,
|
|
||||||
FileVersion: EXPECTED_PERSISTENCE_FILE_VERSION,
|
|
||||||
LastSeen: make([]LastSeenRecord, len(self.lastSeen)),
|
|
||||||
}
|
|
||||||
i := 0
|
|
||||||
for k, v := range self.lastSeen {
|
|
||||||
data.LastSeen[i] = LastSeenRecord{Key: k, Value: v}
|
|
||||||
i += 1
|
|
||||||
}
|
|
||||||
err = enc.Encode(data)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func MakeDataStore(logger *log.Logger, persistenceFilePath string) DataStore {
|
|
||||||
return DataStore{
|
|
||||||
logger: logger,
|
|
||||||
persistenceFilePath: persistenceFilePath,
|
|
||||||
lastSeen: loadSystemState(logger, persistenceFilePath),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *DataStore) GetSystemState() map[EventKey]EventValue {
|
|
||||||
self.mtx.Lock()
|
|
||||||
defer self.mtx.Unlock()
|
|
||||||
|
|
||||||
copiedMap := make(map[EventKey]EventValue)
|
|
||||||
for key, value := range self.lastSeen {
|
|
||||||
copiedMap[key] = value
|
|
||||||
}
|
|
||||||
return copiedMap
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *DataStore) SetSystemState(key EventKey, value EventValue) {
|
|
||||||
self.mtx.Lock()
|
|
||||||
self.lastSeen[key] = value
|
|
||||||
self.saveSystemState()
|
|
||||||
self.mtx.Unlock()
|
|
||||||
}
|
|
71
filters.go
71
filters.go
|
@ -1,7 +1,11 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/gob"
|
||||||
|
"errors"
|
||||||
|
"io/fs"
|
||||||
"log"
|
"log"
|
||||||
|
"os"
|
||||||
|
|
||||||
"github.com/probakowski/go-satel"
|
"github.com/probakowski/go-satel"
|
||||||
)
|
)
|
||||||
|
@ -62,16 +66,75 @@ func FilterByIndex(ev <-chan satel.Event, allowedIndexes []int) <-chan satel.Eve
|
||||||
return returnChan
|
return returnChan
|
||||||
}
|
}
|
||||||
|
|
||||||
func FilterByLastSeen(ev <-chan satel.Event, dataStore *DataStore, logger *log.Logger) <-chan satel.Event {
|
type EventKey struct {
|
||||||
|
ChangeType satel.ChangeType
|
||||||
|
Index int
|
||||||
|
}
|
||||||
|
|
||||||
|
type EventValue struct{ Value bool }
|
||||||
|
|
||||||
|
type PersistenceRecord struct {
|
||||||
|
Key EventKey
|
||||||
|
Value EventValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func persistMapOnDisk(lastSeen *map[EventKey]bool, persistenceFilePath string) {
|
||||||
|
f, err := os.OpenFile(persistenceFilePath, os.O_WRONLY|os.O_CREATE, 0600)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
enc := gob.NewEncoder(f)
|
||||||
|
data := make([]PersistenceRecord, len(*lastSeen))
|
||||||
|
i := 0
|
||||||
|
for k, v := range *lastSeen {
|
||||||
|
data[i] = PersistenceRecord{Key: k, Value: EventValue{v}}
|
||||||
|
i += 1
|
||||||
|
}
|
||||||
|
err = enc.Encode(data)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func readMapFromDisk(persistenceFilePath string, logger *log.Logger) map[EventKey]bool {
|
||||||
|
lastSeen := make(map[EventKey]bool)
|
||||||
|
f, err := os.OpenFile(persistenceFilePath, os.O_RDONLY|os.O_CREATE, 0600)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, fs.ErrNotExist) {
|
||||||
|
// File not existing is fine, we'll create one later
|
||||||
|
return lastSeen
|
||||||
|
}
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
dec := gob.NewDecoder(f)
|
||||||
|
data := make([]PersistenceRecord, 0)
|
||||||
|
err = dec.Decode(&data)
|
||||||
|
if err != nil {
|
||||||
|
logger.Println("Error reading persistence file", persistenceFilePath, "from disk:", err, ". Discarding and starting over.")
|
||||||
|
return lastSeen
|
||||||
|
}
|
||||||
|
for _, readData := range data {
|
||||||
|
lastSeen[readData.Key] = readData.Value.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
return lastSeen
|
||||||
|
}
|
||||||
|
|
||||||
|
func FilterByLastSeen(ev <-chan satel.Event, persistenceFilePath string, logger *log.Logger) <-chan satel.Event {
|
||||||
returnChan := make(chan satel.Event)
|
returnChan := make(chan satel.Event)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
lastSeen := readMapFromDisk(persistenceFilePath, logger)
|
||||||
for e := range ev {
|
for e := range ev {
|
||||||
lastSeen := dataStore.GetSystemState()
|
|
||||||
val, ok := lastSeen[EventKey{e.Type, e.Index}]
|
val, ok := lastSeen[EventKey{e.Type, e.Index}]
|
||||||
if !ok || val.Value != e.Value {
|
if !ok || val != e.Value {
|
||||||
|
lastSeen[EventKey{e.Type, e.Index}] = e.Value
|
||||||
returnChan <- e
|
returnChan <- e
|
||||||
dataStore.SetSystemState(EventKey{e.Type, e.Index}, EventValue{e.Value})
|
|
||||||
|
persistMapOnDisk(&lastSeen, persistenceFilePath)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
close(returnChan)
|
close(returnChan)
|
||||||
|
|
|
@ -129,12 +129,10 @@ func TestSatelLastSeenFiltering(t *testing.T) {
|
||||||
testEvents := make(chan satel.Event)
|
testEvents := make(chan satel.Event)
|
||||||
receivedEvents := make([]satel.Event, 0)
|
receivedEvents := make([]satel.Event, 0)
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
fakeLog := log.New(io.Discard, "", log.Ltime)
|
|
||||||
ds := MakeDataStore(fakeLog, tempFileName)
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
|
for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) {
|
||||||
receivedEvents = append(receivedEvents, e)
|
receivedEvents = append(receivedEvents, e)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
@ -165,12 +163,10 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
|
||||||
testEvents := make(chan satel.Event)
|
testEvents := make(chan satel.Event)
|
||||||
receivedEvents := make([]satel.Event, 0)
|
receivedEvents := make([]satel.Event, 0)
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
fakeLog := log.New(io.Discard, "", log.Ltime)
|
|
||||||
ds := MakeDataStore(fakeLog, tempFileName)
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
|
for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) {
|
||||||
receivedEvents = append(receivedEvents, e)
|
receivedEvents = append(receivedEvents, e)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
@ -193,10 +189,9 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
|
||||||
|
|
||||||
testEvents = make(chan satel.Event)
|
testEvents = make(chan satel.Event)
|
||||||
receivedEvents = make([]satel.Event, 0)
|
receivedEvents = make([]satel.Event, 0)
|
||||||
ds = MakeDataStore(fakeLog, tempFileName)
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
|
for e := range FilterByLastSeen(testEvents, tempFileName, log.New(io.Discard, "", log.Ltime)) {
|
||||||
receivedEvents = append(receivedEvents, e)
|
receivedEvents = append(receivedEvents, e)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -13,5 +13,3 @@ require (
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
replace github.com/probakowski/go-satel => git.sr.ht/~michalr/go-satel v0.0.0-20211120120346-bed9818777ce
|
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -1,5 +1,3 @@
|
||||||
git.sr.ht/~michalr/go-satel v0.0.0-20211120120346-bed9818777ce h1:6navIHdH9NaXuAd6R2rACpqz2Das2Gu31E1JxT3Tv8w=
|
|
||||||
git.sr.ht/~michalr/go-satel v0.0.0-20211120120346-bed9818777ce/go.mod h1:q3DquDWRcoFWZ61dGZFg3snucolljixMoAzJIiCjWoY=
|
|
||||||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
|
5
main.go
5
main.go
|
@ -18,7 +18,6 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
MessageNotMoreOftenThanSeconds = 15
|
MessageNotMoreOftenThanSeconds = 15
|
||||||
PersistenceFilename = "hs_wro_last_seen.bin"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type TgSender struct {
|
type TgSender struct {
|
||||||
|
@ -137,8 +136,6 @@ func main() {
|
||||||
|
|
||||||
tpl := template.Must(template.New("TelegramMessage").Parse(TelegramMessageTemplate))
|
tpl := template.Must(template.New("TelegramMessage").Parse(TelegramMessageTemplate))
|
||||||
|
|
||||||
dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), PersistenceFilename)
|
|
||||||
|
|
||||||
Consume(
|
Consume(
|
||||||
SendToTg(
|
SendToTg(
|
||||||
tgSenderWorker(tgEvents, &wg, sleeper, log.New(os.Stderr, "TgSender", log.Lmicroseconds)),
|
tgSenderWorker(tgEvents, &wg, sleeper, log.New(os.Stderr, "TgSender", log.Lmicroseconds)),
|
||||||
|
@ -147,7 +144,7 @@ func main() {
|
||||||
go CloseSatelOnCtrlC(s)
|
go CloseSatelOnCtrlC(s)
|
||||||
|
|
||||||
for e := range FilterByIndex(FilterByType(
|
for e := range FilterByIndex(FilterByType(
|
||||||
FilterByLastSeen(s.Events, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)),
|
FilterByLastSeen(s.Events, "hs_wro_last_seen.bin", log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)),
|
||||||
allowedTypes),
|
allowedTypes),
|
||||||
allowedIndexes) {
|
allowedIndexes) {
|
||||||
logger.Print("Received change from SATEL: ", e)
|
logger.Print("Received change from SATEL: ", e)
|
||||||
|
|
Loading…
Reference in New Issue