From 1e899a535128c0f4e4f26581ab37684eb1732198 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Rudowicz?= Date: Wed, 8 Jan 2025 20:24:45 +0100 Subject: [PATCH] Actual usage of synchronous workers --- filters.go | 58 -------------------- filters_sync.go | 22 ++++++++ filters_test.go | 95 -------------------------------- main.go | 17 +++--- sender_sync.go | 36 ++++++++++--- sender_worker.go | 134 ---------------------------------------------- templates_test.go | 19 ++----- 7 files changed, 67 insertions(+), 314 deletions(-) diff --git a/filters.go b/filters.go index 3ffe4ce..a0d16c2 100644 --- a/filters.go +++ b/filters.go @@ -86,61 +86,3 @@ func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *Data return returnChan } - -func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage { - if msg == nil { - msg = &GenericMessage{make([]satel.BasicEventElement, 0)} - } - -throughNewMessages: - for _, newEv := range new.Messages { - for i, oldEv := range msg.Messages { - if oldEv.Index == newEv.Index && oldEv.Type == newEv.Type { - // this message was seen - update its value - msg.Messages[i].Value = newEv.Value - continue throughNewMessages - } - } - // apparently this type of message was not yet seen, save it - msg.Messages = append(msg.Messages, newEv) - } - return msg -} - -func Throttle(inputEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage { - returnChan := make(chan GenericMessage) - timeoutEvents := make(chan interface{}) - - wg.Add(1) - go func() { - defer wg.Done() - defer close(returnChan) - - var currentEvent *GenericMessage = nil - loop: - for { - select { - case ev, ok := <-inputEvents: - if !ok { - break loop - } - if currentEvent == nil { - logger.Print("Waiting for more messages to arrive before sending...") - sleeper.Sleep(timeoutEvents) - } - currentEvent = appendToGenericMessage(currentEvent, &ev) - case <-timeoutEvents: - logger.Print("Time's up, sending all messages we've got for now.") - returnChan <- *currentEvent - currentEvent = nil - } - } - - // If anything is left to be sent, send it now - if currentEvent != nil { - returnChan <- *currentEvent - } - }() - - return returnChan -} diff --git a/filters_sync.go b/filters_sync.go index c8a3f5e..747c95f 100644 --- a/filters_sync.go +++ b/filters_sync.go @@ -3,6 +3,8 @@ package main import ( "log" "sync" + + "git.sr.ht/~michalr/go-satel" ) type SyncFilter[MsgType any] interface { @@ -49,6 +51,26 @@ type ThrottleSync struct { events chan GenericMessage } +func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage { + if msg == nil { + msg = &GenericMessage{make([]satel.BasicEventElement, 0)} + } + +throughNewMessages: + for _, newEv := range new.Messages { + for i, oldEv := range msg.Messages { + if oldEv.Index == newEv.Index && oldEv.Type == newEv.Type { + // this message was seen - update its value + msg.Messages[i].Value = newEv.Value + continue throughNewMessages + } + } + // apparently this type of message was not yet seen, save it + msg.Messages = append(msg.Messages, newEv) + } + return msg +} + func MakeThrottleSync(sleeper Sleeper, logger *log.Logger, wg *sync.WaitGroup) *ThrottleSync { events := make(chan GenericMessage) diff --git a/filters_test.go b/filters_test.go index 54cd0c1..feb1ddb 100644 --- a/filters_test.go +++ b/filters_test.go @@ -236,101 +236,6 @@ func (self *MockSleeper) Sleep(ch chan<- interface{}) { self.callCount += 1 } -func TestThrottle(t *testing.T) { - testEvents := make(chan GenericMessage) - receivedEvents := make([]GenericMessage, 0) - wg := sync.WaitGroup{} - fakeLog := log.New(io.Discard, "", log.Ltime) - mockSleeper := MockSleeper{nil, 0} - - var ( - tplMessageTest1 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: true} - tplMessageTest2 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: true} - tplMessageTest3 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: false} - tplMessageTest4 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: false} - ) - - wg.Add(1) - go func() { - defer wg.Done() - - for e := range Throttle(testEvents, &wg, &mockSleeper, fakeLog) { - receivedEvents = append(receivedEvents, e) - } - }() - - testEvents <- GenericMessage{[]satel.BasicEventElement{tplMessageTest1}} - testEvents <- GenericMessage{[]satel.BasicEventElement{tplMessageTest2}} - testEvents <- GenericMessage{[]satel.BasicEventElement{tplMessageTest3}} - *mockSleeper.ch <- nil - - testEvents <- GenericMessage{[]satel.BasicEventElement{tplMessageTest4}} - - close(testEvents) - wg.Wait() - - assert.Equal(t, 2, mockSleeper.callCount) - assert.Len(t, receivedEvents, 2) - assert.Contains(t, receivedEvents[0].Messages, tplMessageTest2) - assert.Contains(t, receivedEvents[0].Messages, tplMessageTest3) - assert.Len(t, receivedEvents[0].Messages, 2) - - assert.Contains(t, receivedEvents[1].Messages, tplMessageTest4) - assert.Len(t, receivedEvents[1].Messages, 1) -} - -func makeMassiveEvent(element satel.BasicEventElement, numElements int) GenericMessage { - retval := GenericMessage{make([]satel.BasicEventElement, 0)} - - for i := 0; i < numElements; i++ { - retval.Messages = append(retval.Messages, element) - } - - return retval -} - -func TestThrottle_ManyMessagesInOneEvent(t *testing.T) { - testEvents := make(chan GenericMessage) - receivedEvents := make([]GenericMessage, 0) - wg := sync.WaitGroup{} - fakeLog := log.New(io.Discard, "", log.Ltime) - mockSleeper := MockSleeper{nil, 0} - - var ( - tplMessageTest1 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: true} - tplMessageTest2 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: true} - tplMessageTest3 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: false} - tplMessageTest4 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: false} - ) - - wg.Add(1) - go func() { - defer wg.Done() - for e := range Throttle(testEvents, &wg, &mockSleeper, fakeLog) { - receivedEvents = append(receivedEvents, e) - } - }() - - testEvents <- makeMassiveEvent(tplMessageTest1, 100) - testEvents <- makeMassiveEvent(tplMessageTest2, 100) - testEvents <- makeMassiveEvent(tplMessageTest3, 100) - *mockSleeper.ch <- nil - - testEvents <- makeMassiveEvent(tplMessageTest4, 100) - - close(testEvents) - wg.Wait() - - assert.Equal(t, 2, mockSleeper.callCount) - assert.Len(t, receivedEvents, 2) - assert.Contains(t, receivedEvents[0].Messages, tplMessageTest2) - assert.Contains(t, receivedEvents[0].Messages, tplMessageTest3) - assert.Len(t, receivedEvents[0].Messages, 2) - - assert.Contains(t, receivedEvents[1].Messages, tplMessageTest4) - assert.Len(t, receivedEvents[1].Messages, 1) -} - type SyncMockFilter struct { SyncFilterImpl[GenericMessage] collected []GenericMessage diff --git a/main.go b/main.go index 9c7a4be..7305947 100644 --- a/main.go +++ b/main.go @@ -80,13 +80,16 @@ func main() { dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), getPersistenceFilePath()) - Consume( - SendToMatterbridge( - SendToTg(Throttle(NotifyViaHTTP(tgEvents, config, &wg, log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds)), - &wg, sleeper, log.New(os.Stderr, "MessageThrottle", log.Lmicroseconds)), - tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tgTpl), - s, config, &wg, log.New(os.Stderr, "SendToMatterbridge", log.Lmicroseconds), ircTpl), - ) + collect := CollectFromChannel[GenericMessage]{} + notifyViaHttp := MakeNofityViaHTTPSync(config, log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds)) + throttle := MakeThrottleSync(sleeper, log.New(os.Stderr, "MessageThrottle", log.Lmicroseconds), &wg) + sendToTg := MakeSendToTelegramSync(tgSender, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tgTpl) + sendToMatterbridge := MakeSendToMatterbridgeSync(s, config, + log.New(os.Stderr, "SendToMatterbridge", log.Lmicroseconds), + ircTpl) + + collect.Then(notifyViaHttp).Then(throttle).Then(sendToTg).Then(sendToMatterbridge) + collect.Collect(tgEvents, &wg, func() {}) go CloseSatelOnCtrlC(s, &cleanShutdown) diff --git a/sender_sync.go b/sender_sync.go index e65af69..87cd886 100644 --- a/sender_sync.go +++ b/sender_sync.go @@ -21,8 +21,8 @@ type SendToTelegramSync struct { func MakeSendToTelegramSync(sender Sender, logger *log.Logger, - tpl *template.Template) SendToTelegramSync { - return SendToTelegramSync{SyncFilterImpl[GenericMessage]{}, sender, logger, tpl} + tpl *template.Template) *SendToTelegramSync { + return &SendToTelegramSync{SyncFilterImpl[GenericMessage]{}, sender, logger, tpl} } func (sendToTg *SendToTelegramSync) Call(msg GenericMessage) { @@ -47,8 +47,8 @@ type SendToMatterbridgeSync struct { func MakeSendToMatterbridgeSync(s SatelNameGetter, config AppConfig, logger *log.Logger, - tpl *template.Template) SendToMatterbridgeSync { - return SendToMatterbridgeSync{SyncFilterImpl[GenericMessage]{}, s, config, logger, tpl} + tpl *template.Template) *SendToMatterbridgeSync { + return &SendToMatterbridgeSync{SyncFilterImpl[GenericMessage]{}, s, config, logger, tpl} } func (mbridge *SendToMatterbridgeSync) Call(msg GenericMessage) { @@ -84,8 +84,32 @@ type NotifyViaHTTPSync struct { logger *log.Logger } -func MakeNofityViaHTTPSync(config AppConfig, logger *log.Logger) NotifyViaHTTPSync { - return NotifyViaHTTPSync{SyncFilterImpl[GenericMessage]{}, config, logger} +func MakeNofityViaHTTPSync(config AppConfig, logger *log.Logger) *NotifyViaHTTPSync { + return &NotifyViaHTTPSync{SyncFilterImpl[GenericMessage]{}, config, logger} +} + +func doHttpNotification(url string, logger *log.Logger) { + + if len(url) == 0 { + return + } + req, err := http.NewRequest(http.MethodPost, url, nil) + + client := http.Client{ + Timeout: httpTimeout, + } + res, err := client.Do(req) + if err != nil { + logger.Print("Could not POST ", url, ": ", err) + return + } + logger.Print("Notified via HTTP with result ", res.StatusCode) +} + +func notifyAllHttp(urls []string, logger *log.Logger) { + for _, uri := range urls { + doHttpNotification(uri, logger) + } } func (notifyViaHttp *NotifyViaHTTPSync) Call(msg GenericMessage) { diff --git a/sender_worker.go b/sender_worker.go index 36e9ee2..09e26c7 100644 --- a/sender_worker.go +++ b/sender_worker.go @@ -1,16 +1,8 @@ package main import ( - "bytes" - "encoding/json" - "fmt" "html/template" - "log" - "net/http" - "sync" "time" - - "git.sr.ht/~michalr/go-satel" ) const ( @@ -26,134 +18,8 @@ type Sleeper interface { Sleep(ch chan<- interface{}) } -func Consume(events <-chan GenericMessage) { - go func() { - for range events { - } - }() -} - -func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger *log.Logger, tpl *template.Template) <-chan GenericMessage { - returnEvents := make(chan GenericMessage) - - wg.Add(1) - go func() { - defer wg.Done() - defer close(returnEvents) - - for e := range events { - returnEvents <- e - err := s.Send(e, tpl) - if err != nil { - // TODO: handle it better - panic(err) - } - } - }() - - return returnEvents -} - -func doHttpNotification(url string, logger *log.Logger) { - - if len(url) == 0 { - return - } - req, err := http.NewRequest(http.MethodPost, url, nil) - - client := http.Client{ - Timeout: httpTimeout, - } - res, err := client.Do(req) - if err != nil { - logger.Print("Could not POST ", url, ": ", err) - return - } - logger.Print("Notified via HTTP with result ", res.StatusCode) -} - -func notifyAllHttp(urls []string, logger *log.Logger) { - for _, uri := range urls { - go doHttpNotification(uri, logger) - } -} - -func NotifyViaHTTP(events <-chan GenericMessage, config AppConfig, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage { - returnEvents := make(chan GenericMessage) - - wg.Add(1) - go func() { - defer wg.Done() - defer close(returnEvents) - - for e := range events { - returnEvents <- e - inner_arm: - for _, basicElement := range e.Messages { - if (basicElement.Index == NotificationPartitionIndex) && (basicElement.Type == satel.ArmedPartition) { - if basicElement.Value == ArmedPartition_Armed { - notifyAllHttp(config.ArmCallbackUrls, logger) - } else { - notifyAllHttp(config.DisarmCallbackUrls, logger) - } - break inner_arm - } - } - inner_alarm: - for _, basicElement := range e.Messages { - if basicElement.Type == satel.PartitionAlarm { - if basicElement.Value == PartitionAlarm_Alarm { - notifyAllHttp(config.AlarmCallbackUrls, logger) - break inner_alarm - } - } - } - - } - }() - - return returnEvents -} - type MatterbridgeMessage struct { Text string `json:"text"` Username string `json:"username"` Gateway string `json:"gateway"` } - -func SendToMatterbridge(events <-chan GenericMessage, s SatelNameGetter, config AppConfig, wg *sync.WaitGroup, logger *log.Logger, tpl *template.Template) <-chan GenericMessage { - returnEvents := make(chan GenericMessage) - - wg.Add(1) - go func() { - defer wg.Done() - defer close(returnEvents) - - for e := range events { - returnEvents <- e - for _, matterbridgeConfig := range config.Matterbridge { - body, err := json.Marshal(MatterbridgeMessage{ - Text: e.Format(tpl, s, logger), - Username: matterbridgeConfig.Username, - Gateway: matterbridgeConfig.Gateway, - }) - if err != nil { - logger.Fatal("Could not marshal a JSON message: ", err) - } - req, err := http.NewRequest(http.MethodPost, matterbridgeConfig.URI, bytes.NewBuffer(body)) - req.Header["Authorization"] = []string{fmt.Sprint("Bearer ", matterbridgeConfig.Token)} - client := http.Client{ - Timeout: httpTimeout, - } - res, err := client.Do(req) - if err != nil { - logger.Print("Could not POST ", matterbridgeConfig.URI, ": ", err) - return - } - logger.Print("Notified via Matterbridge with result ", res.StatusCode) - } - } - }() - - return returnEvents -} diff --git a/templates_test.go b/templates_test.go index ae7b790..0e6d558 100644 --- a/templates_test.go +++ b/templates_test.go @@ -4,7 +4,6 @@ import ( "html/template" "io" "log" - "sync" "testing" "git.sr.ht/~michalr/go-satel" @@ -27,29 +26,21 @@ var ( ) func TestTelegramTemplate(t *testing.T) { - testEvents := make(chan GenericMessage) - wg := sync.WaitGroup{} mockSender := MockTemplateSender{s: MockSatelNameGetter{"mockPart"}} tpl, err := template.New("TestTemplate").Parse(TelegramMessageTemplate) assert.NoError(t, err) - Consume(SendToTg(testEvents, &mockSender, &wg, log.New(io.Discard, "", log.Ltime), tpl)) - testEvents <- GenericMessage{[]satel.BasicEventElement{tplMessageTest1, tplMessageTest2}} - close(testEvents) - wg.Wait() + tgSender := MakeSendToTelegramSync(&mockSender, log.New(io.Discard, "", log.Ltime), tpl) + tgSender.Call(GenericMessage{[]satel.BasicEventElement{tplMessageTest1, tplMessageTest2}}) // assert.Equal(t, "siemka", mockSender.message) } func TestIRCTemplate(t *testing.T) { - testEvents := make(chan GenericMessage) - wg := sync.WaitGroup{} mockSender := MockTemplateSender{s: MockSatelNameGetter{"mockPart"}} - tpl, err := template.New("TestIRCTemplate").Parse(IRCMessageTemplate) + tpl, err := template.New("TestTemplate").Parse(IRCMessageTemplate) assert.NoError(t, err) - Consume(SendToTg(testEvents, &mockSender, &wg, log.New(io.Discard, "", log.Ltime), tpl)) - testEvents <- GenericMessage{[]satel.BasicEventElement{tplMessageTest1, tplMessageTest2}} - close(testEvents) - wg.Wait() + tgSender := MakeSendToTelegramSync(&mockSender, log.New(io.Discard, "", log.Ltime), tpl) + tgSender.Call(GenericMessage{[]satel.BasicEventElement{tplMessageTest1, tplMessageTest2}}) // assert.Equal(t, "siemka", mockSender.message) }