1
0
Fork 0

Join filters etc. to limit numbers of goroutines that aren't needed

References: https://todo.sr.ht/~michalr/hswro-alarm-bot/7
This commit is contained in:
Michał Rudowicz 2024-03-05 23:01:38 +01:00
parent 93b9c0c822
commit 8237eda13d
3 changed files with 26 additions and 65 deletions

View File

@ -6,10 +6,24 @@ import (
"git.sr.ht/~michalr/go-satel" "git.sr.ht/~michalr/go-satel"
) )
func FilterByType(ev <-chan satel.Event, allowedTypes []satel.ChangeType) <-chan satel.Event { func isBasicEventElementOkay(basicEventElement satel.BasicEventElement, allowedTypes []satel.ChangeType, allowedIndexes []int) bool {
for _, allowedType := range allowedTypes {
if allowedType == basicEventElement.Type {
return true
}
}
for _, allowedIndex := range allowedIndexes {
if allowedIndex == basicEventElement.Index {
return true
}
}
return false
}
func FilterByTypeOrIndex(ev <-chan satel.Event, allowedTypes []satel.ChangeType, allowedIndexes []int) <-chan satel.Event {
returnChan := make(chan satel.Event) returnChan := make(chan satel.Event)
if len(allowedTypes) == 0 { if (len(allowedTypes) == 0) && (len(allowedIndexes) == 0) {
// no allowed types == all types are allowed // no allowed types == all types are allowed
go func() { go func() {
for e := range ev { for e := range ev {
@ -22,45 +36,8 @@ func FilterByType(ev <-chan satel.Event, allowedTypes []satel.ChangeType) <-chan
for e := range ev { for e := range ev {
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
for _, basicEventElement := range e.BasicEvents { for _, basicEventElement := range e.BasicEvents {
for _, allowedType := range allowedTypes { if isBasicEventElementOkay(basicEventElement, allowedTypes, allowedIndexes) {
if allowedType == basicEventElement.Type { retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement)
retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement)
continue
}
}
}
if len(retEv.BasicEvents) != 0 {
returnChan <- retEv
}
}
close(returnChan)
}()
}
return returnChan
}
func FilterByIndex(ev <-chan satel.Event, allowedIndexes []int) <-chan satel.Event {
returnChan := make(chan satel.Event)
if len(allowedIndexes) == 0 {
// no allowed indexes == all indexes are allowed
go func() {
for e := range ev {
returnChan <- e
}
close(returnChan)
}()
} else {
go func() {
for e := range ev {
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
for _, basicEventElement := range e.BasicEvents {
for _, allowedIndex := range allowedIndexes {
if allowedIndex == basicEventElement.Index {
retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement)
continue
}
} }
} }
if len(retEv.BasicEvents) != 0 { if len(retEv.BasicEvents) != 0 {
@ -93,20 +70,7 @@ func FilterByLastSeen(ev <-chan satel.Event, dataStore *DataStore, logger *log.L
returnChan <- retEv returnChan <- retEv
} }
} }
close(returnChan) logger.Print("Satel disconnected.")
}()
return returnChan
}
func CallWhenClosed(ev <-chan satel.Event, cbk func()) <-chan satel.Event {
returnChan := make(chan satel.Event)
go func() {
for e := range ev {
returnChan <- e
}
cbk()
close(returnChan) close(returnChan)
}() }()

View File

@ -18,7 +18,7 @@ func TestSatelEventTypeFiltering(t *testing.T) {
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByType(testEvents, []satel.ChangeType{satel.ArmedPartition, satel.PartitionFireAlarm}) { for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{satel.ArmedPartition, satel.PartitionFireAlarm}, []int{}) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()
@ -46,7 +46,7 @@ func TestSatelEventTypeFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testin
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByType(testEvents, []satel.ChangeType{}) { for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{}) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()
@ -72,7 +72,7 @@ func TestSatelIndexFiltering(t *testing.T) {
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByIndex(testEvents, []int{1, 3}) { for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{1, 3}) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()
@ -101,7 +101,7 @@ func TestSatelIndexFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testing.T)
go func() { go func() {
wg.Add(1) wg.Add(1)
for e := range FilterByIndex(testEvents, []int{}) { for e := range FilterByTypeOrIndex(testEvents, []satel.ChangeType{}, []int{}) {
receivedEvents = append(receivedEvents, e) receivedEvents = append(receivedEvents, e)
} }
wg.Done() wg.Done()

View File

@ -153,12 +153,9 @@ func main() {
go CloseSatelOnCtrlC(s) go CloseSatelOnCtrlC(s)
for e := range FilterByIndex(FilterByType( for e := range FilterByTypeOrIndex(
FilterByLastSeen( FilterByLastSeen(s.Events, &dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)),
CallWhenClosed(s.Events, func() { logger.Print("Satel disconnected.") }), allowedTypes, allowedIndexes) {
&dataStore, log.New(os.Stderr, "FilterByLastSeen", log.Lmicroseconds)),
allowedTypes),
allowedIndexes) {
logger.Print("Received change from SATEL: ", e) logger.Print("Received change from SATEL: ", e)
for _, chatId := range chatIds { for _, chatId := range chatIds {
sendTgMessage(tgEvents, e.BasicEvents, chatId) sendTgMessage(tgEvents, e.BasicEvents, chatId)