package main import ( "log" "sync" "git.sr.ht/~michalr/go-satel" ) func isBasicEventElementOkay(basicEventElement satel.BasicEventElement, allowedTypes []SatelChangeType, allowedIndexes []int) bool { for _, allowedType := range allowedTypes { if allowedType.GetChangeType() == basicEventElement.Type { return true } } for _, allowedIndex := range allowedIndexes { if allowedIndex == basicEventElement.Index { return true } } return false } func FilterByTypeOrIndex(ev <-chan satel.Event, wg *sync.WaitGroup, allowedTypes []SatelChangeType, allowedIndexes []int) <-chan satel.Event { returnChan := make(chan satel.Event) if (len(allowedTypes) == 0) && (len(allowedIndexes) == 0) { // no allowed types == all types are allowed go func() { wg.Add(1) defer wg.Done() for e := range ev { returnChan <- e } close(returnChan) }() } else { go func() { wg.Add(1) defer wg.Done() for e := range ev { retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} for _, basicEventElement := range e.BasicEvents { if isBasicEventElementOkay(basicEventElement, allowedTypes, allowedIndexes) { retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement) } } if len(retEv.BasicEvents) != 0 { returnChan <- retEv } } close(returnChan) }() } return returnChan } func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *DataStore, logger *log.Logger) <-chan satel.Event { returnChan := make(chan satel.Event) go func() { wg.Add(1) defer wg.Done() for e := range ev { retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)} for _, basicEventElement := range e.BasicEvents { lastSeen := dataStore.GetSystemState() val, ok := lastSeen[EventKey{basicEventElement.Type, basicEventElement.Index}] if !ok || val.Value != basicEventElement.Value { retEv.BasicEvents = append(retEv.BasicEvents, basicEventElement) // TODO: flush to disk only after the loop finishes dataStore.SetSystemState(EventKey{basicEventElement.Type, basicEventElement.Index}, EventValue{basicEventElement.Value}) } } if len(retEv.BasicEvents) != 0 { returnChan <- retEv } } logger.Print("Satel disconnected.") close(returnChan) }() return returnChan } func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage { if msg == nil { msg = &GenericMessage{make([]satel.BasicEventElement, 0)} } throughNewMessages: for _, newEv := range new.Messages { for i, oldEv := range msg.Messages { if oldEv.Index == newEv.Index && oldEv.Type == newEv.Type { // this message was seen - update its value msg.Messages[i].Value = newEv.Value continue throughNewMessages } } // apparently this type of message was not yet seen, save it msg.Messages = append(msg.Messages, newEv) } return msg } func Throttle(inputEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sleeper, logger *log.Logger) <-chan GenericMessage { returnChan := make(chan GenericMessage) timeoutEvents := make(chan interface{}) go func() { wg.Add(1) var currentEvent *GenericMessage = nil loop: for { select { case ev, ok := <-inputEvents: if !ok { break loop } if currentEvent == nil { logger.Print("Waiting for more messages to arrive before sending...") sleeper.Sleep(timeoutEvents) } currentEvent = appendToGenericMessage(currentEvent, &ev) case <-timeoutEvents: logger.Print("Time's up, sending all messages we've got for now.") returnChan <- *currentEvent currentEvent = nil } } // If anything is left to be sent, send it now if currentEvent != nil { returnChan <- *currentEvent } close(returnChan) wg.Done() }() return returnChan }