145 lines
3.0 KiB
Go
145 lines
3.0 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/gob"
|
|
"errors"
|
|
"io/fs"
|
|
"log"
|
|
"os"
|
|
|
|
"github.com/probakowski/go-satel"
|
|
)
|
|
|
|
func FilterByType(ev <-chan satel.Event, allowedTypes []satel.ChangeType) <-chan satel.Event {
|
|
returnChan := make(chan satel.Event)
|
|
|
|
if len(allowedTypes) == 0 {
|
|
// no allowed types == all types are allowed
|
|
go func() {
|
|
for e := range ev {
|
|
returnChan <- e
|
|
}
|
|
close(returnChan)
|
|
}()
|
|
} else {
|
|
go func() {
|
|
for e := range ev {
|
|
for _, allowedType := range allowedTypes {
|
|
if allowedType == e.Type {
|
|
returnChan <- e
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
close(returnChan)
|
|
}()
|
|
}
|
|
|
|
return returnChan
|
|
}
|
|
|
|
func FilterByIndex(ev <-chan satel.Event, allowedIndexes []int) <-chan satel.Event {
|
|
returnChan := make(chan satel.Event)
|
|
|
|
if len(allowedIndexes) == 0 {
|
|
// no allowed indexes == all indexes are allowed
|
|
go func() {
|
|
for e := range ev {
|
|
returnChan <- e
|
|
}
|
|
close(returnChan)
|
|
}()
|
|
} else {
|
|
go func() {
|
|
for e := range ev {
|
|
for _, allowedIndex := range allowedIndexes {
|
|
if allowedIndex == e.Index {
|
|
returnChan <- e
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
close(returnChan)
|
|
}()
|
|
}
|
|
|
|
return returnChan
|
|
}
|
|
|
|
type EventKey struct {
|
|
ChangeType satel.ChangeType
|
|
Index int
|
|
}
|
|
|
|
type EventValue struct{ Value bool }
|
|
|
|
type PersistenceRecord struct {
|
|
Key EventKey
|
|
Value EventValue
|
|
}
|
|
|
|
func persistMapOnDisk(lastSeen *map[EventKey]bool, persistenceFilePath string) {
|
|
f, err := os.OpenFile(persistenceFilePath, os.O_WRONLY|os.O_CREATE, 0600)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer f.Close()
|
|
enc := gob.NewEncoder(f)
|
|
data := make([]PersistenceRecord, len(*lastSeen))
|
|
i := 0
|
|
for k, v := range *lastSeen {
|
|
data[i] = PersistenceRecord{Key: k, Value: EventValue{v}}
|
|
i += 1
|
|
}
|
|
err = enc.Encode(data)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func readMapFromDisk(persistenceFilePath string, logger *log.Logger) map[EventKey]bool {
|
|
lastSeen := make(map[EventKey]bool)
|
|
f, err := os.OpenFile(persistenceFilePath, os.O_RDONLY|os.O_CREATE, 0600)
|
|
if err != nil {
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
// File not existing is fine, we'll create one later
|
|
return lastSeen
|
|
}
|
|
panic(err)
|
|
}
|
|
defer f.Close()
|
|
|
|
dec := gob.NewDecoder(f)
|
|
data := make([]PersistenceRecord, 0)
|
|
err = dec.Decode(&data)
|
|
if err != nil {
|
|
logger.Println("Error reading persistence file", persistenceFilePath, "from disk:", err, ". Discarding and starting over.")
|
|
return lastSeen
|
|
}
|
|
for _, readData := range data {
|
|
lastSeen[readData.Key] = readData.Value.Value
|
|
}
|
|
|
|
return lastSeen
|
|
}
|
|
|
|
func FilterByLastSeen(ev <-chan satel.Event, persistenceFilePath string, logger *log.Logger) <-chan satel.Event {
|
|
returnChan := make(chan satel.Event)
|
|
|
|
go func() {
|
|
lastSeen := readMapFromDisk(persistenceFilePath, logger)
|
|
for e := range ev {
|
|
val, ok := lastSeen[EventKey{e.Type, e.Index}]
|
|
if !ok || val != e.Value {
|
|
lastSeen[EventKey{e.Type, e.Index}] = e.Value
|
|
returnChan <- e
|
|
|
|
persistMapOnDisk(&lastSeen, persistenceFilePath)
|
|
}
|
|
}
|
|
close(returnChan)
|
|
}()
|
|
|
|
return returnChan
|
|
}
|