From 6b69fed5b8d7fb06bf36b6941f0a655467c2d39e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Rudowicz?= Date: Sun, 10 Mar 2024 10:12:04 +0100 Subject: [PATCH] Synchronize filtering goroutines --- filters.go | 14 ++++++++++++-- filters_test.go | 14 +++++++------- main.go | 4 ++-- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/filters.go b/filters.go index 9c7cbff..23b4768 100644 --- a/filters.go +++ b/filters.go @@ -2,6 +2,7 @@ package main import ( "log" + "sync" "git.sr.ht/~michalr/go-satel" ) @@ -20,12 +21,15 @@ func isBasicEventElementOkay(basicEventElement satel.BasicEventElement, allowedT return false } -func FilterByTypeOrIndex(ev <-chan satel.Event, allowedTypes []satel.ChangeType, allowedIndexes []int) <-chan satel.Event { +func FilterByTypeOrIndex(ev <-chan satel.Event, wg *sync.WaitGroup, allowedTypes []satel.ChangeType, allowedIndexes []int) <-chan satel.Event { returnChan := make(chan satel.Event) if (len(allowedTypes) == 0) && (len(allowedIndexes) == 0) { // no allowed types == all types are allowed go func() { + wg.Add(1) + defer wg.Done() + for e := range ev { returnChan <- e } @@ -33,6 +37,9 @@ func FilterByTypeOrIndex(ev <-chan satel.Event, allowedTypes []satel.ChangeType, }() } else { go func() { + wg.Add(1) + defer wg.Done() + for e := range ev { retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} for _, basicEventElement := range e.BasicEvents { @@ -51,10 +58,13 @@ func FilterByTypeOrIndex(ev <-chan satel.Event, allowedTypes []satel.ChangeType, return returnChan } -func FilterByLastSeen(ev <-chan satel.Event, dataStore *DataStore, logger *log.Logger) <-chan satel.Event { +func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *DataStore, logger *log.Logger) <-chan satel.Event { returnChan := make(chan satel.Event) go func() { + wg.Add(1) + defer wg.Done() + for e := range ev { retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} for _, basicEventElement := range e.BasicEvents { diff --git a/filters_test.go b/filters_test.go index 32470e4..f6f52df 100644 --- a/filters_test.go +++ b/filters_test.go @@ -18,7 +18,7 @@ func TestSatelEventTypeFiltering(t *testing.T) { go func() { wg.Add(1) - for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{satel.ArmedPartition, satel.PartitionFireAlarm}, []int{}) { + for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{satel.ArmedPartition, satel.PartitionFireAlarm}, []int{}) { receivedEvents = append(receivedEvents, e) } wg.Done() @@ -46,7 +46,7 @@ func TestSatelEventTypeFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testin go func() { wg.Add(1) - for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{}) { + for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{}) { receivedEvents = append(receivedEvents, e) } wg.Done() @@ -72,7 +72,7 @@ func TestSatelIndexFiltering(t *testing.T) { go func() { wg.Add(1) - for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{1, 3}) { + for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{1, 3}) { receivedEvents = append(receivedEvents, e) } wg.Done() @@ -101,7 +101,7 @@ func TestSatelIndexFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testing.T) go func() { wg.Add(1) - for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{}) { + for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{}) { receivedEvents = append(receivedEvents, e) } wg.Done() @@ -134,7 +134,7 @@ func TestSatelLastSeenFiltering(t *testing.T) { go func() { wg.Add(1) - for e := range FilterByLastSeen(testEvents, &ds, fakeLog) { + for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) { receivedEvents = append(receivedEvents, e) } wg.Done() @@ -170,7 +170,7 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) { go func() { wg.Add(1) - for e := range FilterByLastSeen(testEvents, &ds, fakeLog) { + for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) { receivedEvents = append(receivedEvents, e) } wg.Done() @@ -196,7 +196,7 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) { ds = MakeDataStore(fakeLog, tempFileName) go func() { wg.Add(1) - for e := range FilterByLastSeen(testEvents, &ds, fakeLog) { + for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) { receivedEvents = append(receivedEvents, e) } wg.Done() diff --git a/main.go b/main.go index b5adef4..78c427a 100644 --- a/main.go +++ b/main.go @@ -158,8 +158,8 @@ func main() { go CloseSatelOnCtrlC(s) for e := range FilterByTypeOrIndex( - FilterByLastSeen(s.Events, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)), - allowedTypes, allowedIndexes) { + FilterByLastSeen(s.Events, &wg, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)), + &wg, allowedTypes, allowedIndexes) { logger.Print("Received change from SATEL: ", e) for _, chatId := range chatIds { sendTgMessage(tgEvents, e.BasicEvents, chatId)