1
0
Fork 0

Compare commits

..

No commits in common. "57c66c469096882134a496757e5ca06744388b8f" and "ed9981afbeda107cebda7adc7636c24539bcf549" have entirely different histories.

5 changed files with 18 additions and 144 deletions

View File

@ -2,7 +2,6 @@ package main
import (
"log"
"sync"
"git.sr.ht/~michalr/go-satel"
)
@ -21,15 +20,12 @@ func isBasicEventElementOkay(basicEventElement satel.BasicEventElement, allowedT
return false
}
func FilterByTypeOrIndex(ev <-chan satel.Event, wg *sync.WaitGroup, allowedTypes []satel.ChangeType, allowedIndexes []int) <-chan satel.Event {
func FilterByTypeOrIndex(ev <-chan satel.Event, allowedTypes []satel.ChangeType, allowedIndexes []int) <-chan satel.Event {
returnChan := make(chan satel.Event)
if (len(allowedTypes) == 0) && (len(allowedIndexes) == 0) {
// no allowed types == all types are allowed
go func() {
wg.Add(1)
defer wg.Done()
for e := range ev {
returnChan <- e
}
@ -37,9 +33,6 @@ func FilterByTypeOrIndex(ev <-chan satel.Event, wg *sync.WaitGroup, allowedTypes
}()
} else {
go func() {
wg.Add(1)
defer wg.Done()
for e := range ev {
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
for _, basicEventElement := range e.BasicEvents {
@ -58,13 +51,10 @@ func FilterByTypeOrIndex(ev <-chan satel.Event, wg *sync.WaitGroup, allowedTypes
return returnChan
}
func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *DataStore, logger *log.Logger) <-chan satel.Event {
func FilterByLastSeen(ev <-chan satel.Event, dataStore *DataStore, logger *log.Logger) <-chan satel.Event {
returnChan := make(chan satel.Event)
go func() {
wg.Add(1)
defer wg.Done()
for e := range ev {
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
for _, basicEventElement := range e.BasicEvents {
@ -86,60 +76,3 @@ func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *Data
return returnChan
}
func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage {
if msg == nil {
return new
}
throughNewMessages:
for _, newEv := range new.Messages {
for i, oldEv := range msg.Messages {
if oldEv.Index == newEv.Index && oldEv.Type == newEv.Type {
// this message was seen - update its value
msg.Messages[i].Value = newEv.Value
continue throughNewMessages
}
// apparently this type of message was not yet seen, save it
msg.Messages = append(msg.Messages, newEv)
}
}
return msg
}
func Throttle(inputEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage {
returnChan := make(chan GenericMessage)
timeoutEvents := make(chan interface{})
go func() {
wg.Add(1)
var currentEvent *GenericMessage = nil
loop:
for {
select {
case ev, ok := <-inputEvents:
if !ok {
break loop
}
if currentEvent == nil {
logger.Print("Waiting for more messages to arrive before sending...")
sleeper.Sleep(timeoutEvents)
}
currentEvent = appendToGenericMessage(currentEvent, &ev)
case <-timeoutEvents:
logger.Print("Time's up, sending all messages we've got for now.")
returnChan <- *currentEvent
currentEvent = nil
}
}
// If anything is left to be sent, send it now
if currentEvent != nil {
returnChan <- *currentEvent
}
close(returnChan)
wg.Done()
}()
return returnChan
}

View File

@ -18,7 +18,7 @@ func TestSatelEventTypeFiltering(t *testing.T) {
go func() {
wg.Add(1)
for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{satel.ArmedPartition, satel.PartitionFireAlarm}, []int{}) {
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{satel.ArmedPartition, satel.PartitionFireAlarm}, []int{}) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
@ -46,7 +46,7 @@ func TestSatelEventTypeFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testin
go func() {
wg.Add(1)
for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{}) {
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{}) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
@ -72,7 +72,7 @@ func TestSatelIndexFiltering(t *testing.T) {
go func() {
wg.Add(1)
for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{1, 3}) {
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{1, 3}) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
@ -101,7 +101,7 @@ func TestSatelIndexFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testing.T)
go func() {
wg.Add(1)
for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{}) {
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{}) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
@ -134,7 +134,7 @@ func TestSatelLastSeenFiltering(t *testing.T) {
go func() {
wg.Add(1)
for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
@ -170,7 +170,7 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
go func() {
wg.Add(1)
for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
@ -196,7 +196,7 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
ds = MakeDataStore(fakeLog, tempFileName)
go func() {
wg.Add(1)
for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
@ -216,57 +216,3 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
assert.Len(t, receivedEvents, 1)
assert.Contains(t, receivedEvents, makeTestSatelEvent(satel.ArmedPartition, 1, true))
}
type MockSleeper struct {
ch *chan<- interface{}
callCount int
}
func (self *MockSleeper) Sleep(ch chan<- interface{}) {
if self.ch == nil {
self.ch = &ch
}
self.callCount += 1
}
func TestThrottle(t *testing.T) {
testEvents := make(chan GenericMessage)
receivedEvents := make([]GenericMessage, 0)
wg := sync.WaitGroup{}
fakeLog := log.New(io.Discard, "", log.Ltime)
mockSleeper := MockSleeper{nil, 0}
var (
tplMessageTest1 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: true}
tplMessageTest2 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: true}
tplMessageTest3 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: false}
tplMessageTest4 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: false}
)
go func() {
wg.Add(1)
for e := range Throttle(testEvents, &wg, &mockSleeper, fakeLog) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest1}}
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest2}}
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest3}}
*mockSleeper.ch <- nil
testEvents <- GenericMessage{TgChatId{123}, []satel.BasicEventElement{tplMessageTest4}}
close(testEvents)
wg.Wait()
assert.Equal(t, 2, mockSleeper.callCount)
assert.Len(t, receivedEvents, 2)
assert.Contains(t, receivedEvents[0].Messages, tplMessageTest2)
assert.Contains(t, receivedEvents[0].Messages, tplMessageTest3)
assert.Len(t, receivedEvents[0].Messages, 2)
assert.Contains(t, receivedEvents[1].Messages, tplMessageTest4)
assert.Len(t, receivedEvents[1].Messages, 1)
}

13
main.go
View File

@ -130,7 +130,6 @@ func main() {
wg sync.WaitGroup
tgEvents = make(chan GenericMessage, 5)
logger = log.New(os.Stderr, "Main", log.Lmicroseconds)
sleeper = RealSleeper{time.Second * 60}
)
satelAddr, chatIds, allowedTypes, allowedIndexes, poolInterval := getCmdLineParams(logger)
@ -150,17 +149,17 @@ func main() {
dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), getPersistenceFilePath())
Consume(
SendToTg(Throttle(NotifyViaHTTP(tgEvents, &wg, log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds)),
&wg, sleeper, log.New(os.Stderr, "MessageThrottle", log.Lmicroseconds)),
tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tpl),
NotifyViaHTTP(
SendToTg(tgEvents, tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tpl),
&wg,
log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds),
)
go CloseSatelOnCtrlC(s)
for e := range FilterByTypeOrIndex(
FilterByLastSeen(s.Events, &wg, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)),
&wg, allowedTypes, allowedIndexes) {
FilterByLastSeen(s.Events, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)),
allowedTypes, allowedIndexes) {
logger.Print("Received change from SATEL: ", e)
for _, chatId := range chatIds {
sendTgMessage(tgEvents, e.BasicEvents, chatId)

View File

@ -77,19 +77,16 @@ func doHttpNotification(url string, logger *log.Logger, wg *sync.WaitGroup) {
logger.Print("Notified via HTTP with result ", res.StatusCode)
}
func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage {
returnEvents := make(chan GenericMessage)
func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log.Logger) {
armCallbackUrl := os.Getenv("NOTIFY_URL_ARM")
disarmCallbackUrl := os.Getenv("NOTIFY_URL_DISARM")
alarmCallbackUrl := os.Getenv("ALARM_URL_ARM")
armDisarmCallbackEnabled := (len(armCallbackUrl) != 0) && (len(disarmCallbackUrl) != 0)
alarmCallbackEnabled := (len(alarmCallbackUrl) != 0)
go func() {
wg.Add(1)
defer wg.Done()
for e := range events {
returnEvents <- e
if armDisarmCallbackEnabled {
inner_arm:
for _, basicElement := range e.Messages {
@ -116,6 +113,4 @@ func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log
}
}
}()
return returnEvents
}

View File

@ -1,6 +1,7 @@
package main
const TelegramMessageTemplate = `{{- range .Messages}}
const TelegramMessageTemplate = `Received following changes:
{{- range .Messages}}
:: {{.GetName}}: {{.FormatEvent}}
{{- else -}}
Huh, no messages - this is a bug