1
0
Fork 0

Synchronize filtering goroutines

This commit is contained in:
Michał Rudowicz 2024-03-10 10:12:04 +01:00
parent 0f858c7767
commit 6b69fed5b8
3 changed files with 21 additions and 11 deletions

View File

@ -2,6 +2,7 @@ package main
import ( import (
"log" "log"
"sync"
"git.sr.ht/~michalr/go-satel" "git.sr.ht/~michalr/go-satel"
) )
@ -20,12 +21,15 @@ func isBasicEventElementOkay(basicEventElement satel.BasicEventElement, allowedT
return false return false
} }
func FilterByTypeOrIndex(ev <-chan satel.Event, allowedTypes []satel.ChangeType, allowedIndexes []int) <-chan satel.Event { func FilterByTypeOrIndex(ev <-chan satel.Event, wg *sync.WaitGroup, allowedTypes []satel.ChangeType, allowedIndexes []int) <-chan satel.Event {
returnChan := make(chan satel.Event) returnChan := make(chan satel.Event)
if (len(allowedTypes) == 0) && (len(allowedIndexes) == 0) { if (len(allowedTypes) == 0) && (len(allowedIndexes) == 0) {
// no allowed types == all types are allowed // no allowed types == all types are allowed
go func() { go func() {
wg.Add(1)
defer wg.Done()
for e := range ev { for e := range ev {
returnChan <- e returnChan <- e
} }
@ -33,6 +37,9 @@ func FilterByTypeOrIndex(ev <-chan satel.Event, allowedTypes []satel.ChangeType,
}() }()
} else { } else {
go func() { go func() {
wg.Add(1)
defer wg.Done()
for e := range ev { for e := range ev {
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
for _, basicEventElement := range e.BasicEvents { for _, basicEventElement := range e.BasicEvents {
@ -51,10 +58,13 @@ func FilterByTypeOrIndex(ev <-chan satel.Event, allowedTypes []satel.ChangeType,
return returnChan return returnChan
} }
func FilterByLastSeen(ev <-chan satel.Event, dataStore *DataStore, logger *log.Logger) <-chan satel.Event { func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *DataStore, logger *log.Logger) <-chan satel.Event {
returnChan := make(chan satel.Event) returnChan := make(chan satel.Event)
go func() { go func() {
wg.Add(1)
defer wg.Done()
for e := range ev { for e := range ev {
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
for _, basicEventElement := range e.BasicEvents { for _, basicEventElement := range e.BasicEvents {

View File

@ -18,7 +18,7 @@ func TestSatelEventTypeFiltering(t *testing.T) {
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{satel.ArmedPartition, satel.PartitionFireAlarm}, []int{}) { for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{satel.ArmedPartition, satel.PartitionFireAlarm}, []int{}) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()
@ -46,7 +46,7 @@ func TestSatelEventTypeFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testin
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{}) { for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{}) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()
@ -72,7 +72,7 @@ func TestSatelIndexFiltering(t *testing.T) {
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{1, 3}) { for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{1, 3}) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()
@ -101,7 +101,7 @@ func TestSatelIndexFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testing.T)
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{}) { for e := range FilterByTypeOrIndex(testEvents, &wg, []satel.ChangeType{}, []int{}) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()
@ -134,7 +134,7 @@ func TestSatelLastSeenFiltering(t *testing.T) {
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) { for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()
@ -170,7 +170,7 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) { for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()
@ -196,7 +196,7 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
ds = MakeDataStore(fakeLog, tempFileName) ds = MakeDataStore(fakeLog, tempFileName)
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByLastSeen(testEvents, &ds, fakeLog) { for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()

View File

@ -158,8 +158,8 @@ func main() {
go CloseSatelOnCtrlC(s) go CloseSatelOnCtrlC(s)
for e := range FilterByTypeOrIndex( for e := range FilterByTypeOrIndex(
FilterByLastSeen(s.Events, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)), FilterByLastSeen(s.Events, &wg, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)),
allowedTypes, allowedIndexes) { &wg, allowedTypes, allowedIndexes) {
logger.Print("Received change from SATEL: ", e) logger.Print("Received change from SATEL: ", e)
for _, chatId := range chatIds { for _, chatId := range chatIds {
sendTgMessage(tgEvents, e.BasicEvents, chatId) sendTgMessage(tgEvents, e.BasicEvents, chatId)