Compare commits
4 Commits
ed9981afbe
...
57c66c4690
Author | SHA1 | Date |
---|---|---|
Michał Rudowicz | 57c66c4690 | |
Michał Rudowicz | 3e3eb3bc5d | |
Michał Rudowicz | 6b69fed5b8 | |
Michał Rudowicz | 0f858c7767 |
71
filters.go
71
filters.go
|
@ -2,6 +2,7 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"git.sr.ht/~michalr/go-satel"
|
"git.sr.ht/~michalr/go-satel"
|
||||||
)
|
)
|
||||||
|
@ -20,12 +21,15 @@ func isBasicEventElementOkay(basicEventElement satel.BasicEventElement, allowedT
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func FilterByTypeOrIndex(ev <-chan satel.Event, allowedTypes []satel.ChangeType, allowedIndexes []int) <-chan satel.Event {
|
func FilterByTypeOrIndex(ev <-chan satel.Event, wg *sync.WaitGroup, allowedTypes []satel.ChangeType, allowedIndexes []int) <-chan satel.Event {
|
||||||
returnChan := make(chan satel.Event)
|
returnChan := make(chan satel.Event)
|
||||||
|
|
||||||
if (len(allowedTypes) == 0) && (len(allowedIndexes) == 0) {
|
if (len(allowedTypes) == 0) && (len(allowedIndexes) == 0) {
|
||||||
// no allowed types == all types are allowed
|
// no allowed types == all types are allowed
|
||||||
go func() {
|
go func() {
|
||||||
|
wg.Add(1)
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
for e := range ev {
|
for e := range ev {
|
||||||
returnChan <- e
|
returnChan <- e
|
||||||
}
|
}
|
||||||
|
@ -33,6 +37,9 @@ func FilterByTypeOrIndex(ev <-chan satel.Event, allowedTypes []satel.ChangeType,
|
||||||
}()
|
}()
|
||||||
} else {
|
} else {
|
||||||
go func() {
|
go func() {
|
||||||
|
wg.Add(1)
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
for e := range ev {
|
for e := range ev {
|
||||||
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
|
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
|
||||||
for _, basicEventElement := range e.BasicEvents {
|
for _, basicEventElement := range e.BasicEvents {
|
||||||
|
@ -51,10 +58,13 @@ func FilterByTypeOrIndex(ev <-chan satel.Event, allowedTypes []satel.ChangeType,
|
||||||
return returnChan
|
return returnChan
|
||||||
}
|
}
|
||||||
|
|
||||||
func FilterByLastSeen(ev <-chan satel.Event, dataStore *DataStore, logger *log.Logger) <-chan satel.Event {
|
func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *DataStore, logger *log.Logger) <-chan satel.Event {
|
||||||
returnChan := make(chan satel.Event)
|
returnChan := make(chan satel.Event)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
wg.Add(1)
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
for e := range ev {
|
for e := range ev {
|
||||||
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
|
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
|
||||||
for _, basicEventElement := range e.BasicEvents {
|
for _, basicEventElement := range e.BasicEvents {
|
||||||
|
@ -76,3 +86,60 @@ func FilterByLastSeen(ev <-chan satel.Event, dataStore *DataStore, logger *log.L
|
||||||
|
|
||||||
return returnChan
|
return returnChan
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage {
|
||||||
|
if msg == nil {
|
||||||
|
return new
|
||||||
|
}
|
||||||
|
|
||||||
|
throughNewMessages:
|
||||||
|
for _, newEv := range new.Messages {
|
||||||
|
for i, oldEv := range msg.Messages {
|
||||||
|
if oldEv.Index == newEv.Index && oldEv.Type == newEv.Type {
|
||||||
|
// this message was seen - update its value
|
||||||
|
msg.Messages[i].Value = newEv.Value
|
||||||
|
continue throughNewMessages
|
||||||
|
}
|
||||||
|
// apparently this type of message was not yet seen, save it
|
||||||
|
msg.Messages = append(msg.Messages, newEv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func Throttle(inputEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage {
|
||||||
|
returnChan := make(chan GenericMessage)
|
||||||
|
timeoutEvents := make(chan interface{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
wg.Add(1)
|
||||||
|
var currentEvent *GenericMessage = nil
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ev, ok := <-inputEvents:
|
||||||
|
if !ok {
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
if currentEvent == nil {
|
||||||
|
logger.Print("Waiting for more messages to arrive before sending...")
|
||||||
|
sleeper.Sleep(timeoutEvents)
|
||||||
|
}
|
||||||
|
currentEvent = appendToGenericMessage(currentEvent, &ev)
|
||||||
|
case <-timeoutEvents:
|
||||||
|
logger.Print("Time's up, sending all messages we've got for now.")
|
||||||
|
returnChan <- *currentEvent
|
||||||
|
currentEvent = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If anything is left to be sent, send it now
|
||||||
|
if currentEvent != nil {
|
||||||
|
returnChan <- *currentEvent
|
||||||
|
}
|
||||||
|
close(returnChan)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return returnChan
|
||||||
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@ func TestSatelEventTypeFiltering(t *testing.T) {
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{satel.ArmedPartition, satel.PartitionFireAlarm}, []int{}) {
|
for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{satel.ArmedPartition, satel.PartitionFireAlarm}, []int{}) {
|
||||||
receivedEvents = append(receivedEvents, e)
|
receivedEvents = append(receivedEvents, e)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
@ -46,7 +46,7 @@ func TestSatelEventTypeFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testin
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{}) {
|
for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{}) {
|
||||||
receivedEvents = append(receivedEvents, e)
|
receivedEvents = append(receivedEvents, e)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
@ -72,7 +72,7 @@ func TestSatelIndexFiltering(t *testing.T) {
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{1, 3}) {
|
for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{1, 3}) {
|
||||||
receivedEvents = append(receivedEvents, e)
|
receivedEvents = append(receivedEvents, e)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
@ -101,7 +101,7 @@ func TestSatelIndexFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testing.T)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{}) {
|
for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{}) {
|
||||||
receivedEvents = append(receivedEvents, e)
|
receivedEvents = append(receivedEvents, e)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
@ -134,7 +134,7 @@ func TestSatelLastSeenFiltering(t *testing.T) {
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
|
for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
|
||||||
receivedEvents = append(receivedEvents, e)
|
receivedEvents = append(receivedEvents, e)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
@ -170,7 +170,7 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
|
for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
|
||||||
receivedEvents = append(receivedEvents, e)
|
receivedEvents = append(receivedEvents, e)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
@ -196,7 +196,7 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
|
||||||
ds = MakeDataStore(fakeLog, tempFileName)
|
ds = MakeDataStore(fakeLog, tempFileName)
|
||||||
go func() {
|
go func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
|
for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
|
||||||
receivedEvents = append(receivedEvents, e)
|
receivedEvents = append(receivedEvents, e)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
@ -216,3 +216,57 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
|
||||||
assert.Len(t, receivedEvents, 1)
|
assert.Len(t, receivedEvents, 1)
|
||||||
assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, true))
|
assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, true))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MockSleeper struct {
|
||||||
|
ch *chan<- interface{}
|
||||||
|
callCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *MockSleeper) Sleep(ch chan<- interface{}) {
|
||||||
|
if self.ch == nil {
|
||||||
|
self.ch = &ch
|
||||||
|
}
|
||||||
|
self.callCount += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestThrottle(t *testing.T) {
|
||||||
|
testEvents := make(chan GenericMessage)
|
||||||
|
receivedEvents := make([]GenericMessage, 0)
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
fakeLog := log.New(io.Discard, "", log.Ltime)
|
||||||
|
mockSleeper := MockSleeper{nil, 0}
|
||||||
|
|
||||||
|
var (
|
||||||
|
tplMessageTest1 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: true}
|
||||||
|
tplMessageTest2 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: true}
|
||||||
|
tplMessageTest3 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: false}
|
||||||
|
tplMessageTest4 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: false}
|
||||||
|
)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
wg.Add(1)
|
||||||
|
for e := range Throttle(testEvents, &wg, &mockSleeper, fakeLog) {
|
||||||
|
receivedEvents = append(receivedEvents, e)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest1}}
|
||||||
|
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest2}}
|
||||||
|
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest3}}
|
||||||
|
*mockSleeper.ch <- nil
|
||||||
|
|
||||||
|
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest4}}
|
||||||
|
|
||||||
|
close(testEvents)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
assert.Equal(t, 2, mockSleeper.callCount)
|
||||||
|
assert.Len(t, receivedEvents, 2)
|
||||||
|
assert.Contains(t, receivedEvents[0].Messages, tplMessageTest2)
|
||||||
|
assert.Contains(t, receivedEvents[0].Messages, tplMessageTest3)
|
||||||
|
assert.Len(t, receivedEvents[0].Messages, 2)
|
||||||
|
|
||||||
|
assert.Contains(t, receivedEvents[1].Messages, tplMessageTest4)
|
||||||
|
assert.Len(t, receivedEvents[1].Messages, 1)
|
||||||
|
}
|
||||||
|
|
13
main.go
13
main.go
|
@ -130,6 +130,7 @@ func main() {
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
tgEvents = make(chan GenericMessage, 5)
|
tgEvents = make(chan GenericMessage, 5)
|
||||||
logger = log.New(os.Stderr, "Main", log.Lmicroseconds)
|
logger = log.New(os.Stderr, "Main", log.Lmicroseconds)
|
||||||
|
sleeper = RealSleeper{time.Second * 60}
|
||||||
)
|
)
|
||||||
|
|
||||||
satelAddr, chatIds, allowedTypes, allowedIndexes, poolInterval := getCmdLineParams(logger)
|
satelAddr, chatIds, allowedTypes, allowedIndexes, poolInterval := getCmdLineParams(logger)
|
||||||
|
@ -149,17 +150,17 @@ func main() {
|
||||||
|
|
||||||
dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), getPersistenceFilePath())
|
dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), getPersistenceFilePath())
|
||||||
|
|
||||||
NotifyViaHTTP(
|
Consume(
|
||||||
SendToTg(tgEvents, tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tpl),
|
SendToTg(Throttle(NotifyViaHTTP(tgEvents, &wg, log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds)),
|
||||||
&wg,
|
&wg, sleeper, log.New(os.Stderr, "MessageThrottle", log.Lmicroseconds)),
|
||||||
log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds),
|
tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tpl),
|
||||||
)
|
)
|
||||||
|
|
||||||
go CloseSatelOnCtrlC(s)
|
go CloseSatelOnCtrlC(s)
|
||||||
|
|
||||||
for e := range FilterByTypeOrIndex(
|
for e := range FilterByTypeOrIndex(
|
||||||
FilterByLastSeen(s.Events, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)),
|
FilterByLastSeen(s.Events, &wg, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)),
|
||||||
allowedTypes, allowedIndexes) {
|
&wg, allowedTypes, allowedIndexes) {
|
||||||
logger.Print("Received change from SATEL: ", e)
|
logger.Print("Received change from SATEL: ", e)
|
||||||
for _, chatId := range chatIds {
|
for _, chatId := range chatIds {
|
||||||
sendTgMessage(tgEvents, e.BasicEvents, chatId)
|
sendTgMessage(tgEvents, e.BasicEvents, chatId)
|
||||||
|
|
|
@ -77,16 +77,19 @@ func doHttpNotification(url string, logger *log.Logger, wg *sync.WaitGroup) {
|
||||||
logger.Print("Notified via HTTP with result ", res.StatusCode)
|
logger.Print("Notified via HTTP with result ", res.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log.Logger) {
|
func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage {
|
||||||
|
returnEvents := make(chan GenericMessage)
|
||||||
armCallbackUrl := os.Getenv("NOTIFY_URL_ARM")
|
armCallbackUrl := os.Getenv("NOTIFY_URL_ARM")
|
||||||
disarmCallbackUrl := os.Getenv("NOTIFY_URL_DISARM")
|
disarmCallbackUrl := os.Getenv("NOTIFY_URL_DISARM")
|
||||||
alarmCallbackUrl := os.Getenv("ALARM_URL_ARM")
|
alarmCallbackUrl := os.Getenv("ALARM_URL_ARM")
|
||||||
armDisarmCallbackEnabled := (len(armCallbackUrl) != 0) && (len(disarmCallbackUrl) != 0)
|
armDisarmCallbackEnabled := (len(armCallbackUrl) != 0) && (len(disarmCallbackUrl) != 0)
|
||||||
alarmCallbackEnabled := (len(alarmCallbackUrl) != 0)
|
alarmCallbackEnabled := (len(alarmCallbackUrl) != 0)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for e := range events {
|
for e := range events {
|
||||||
|
returnEvents <- e
|
||||||
if armDisarmCallbackEnabled {
|
if armDisarmCallbackEnabled {
|
||||||
inner_arm:
|
inner_arm:
|
||||||
for _, basicElement := range e.Messages {
|
for _, basicElement := range e.Messages {
|
||||||
|
@ -113,4 +116,6 @@ func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
return returnEvents
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
const TelegramMessageTemplate = `Received following changes:
|
const TelegramMessageTemplate = `{{- range .Messages}}
|
||||||
{{- range .Messages}}
|
|
||||||
:: {{.GetName}}: {{.FormatEvent}}
|
:: {{.GetName}}: {{.FormatEvent}}
|
||||||
{{- else -}}
|
{{- else -}}
|
||||||
Huh, no messages - this is a bug
|
Huh, no messages - this is a bug
|
||||||
|
|
Loading…
Reference in New Issue