// Source: hswro-alarm-bot/filters.go

package main
import (
2024-02-15 19:06:06 +00:00
"log"
2024-03-10 09:12:04 +00:00
"sync"
2024-02-15 19:06:06 +00:00
2024-03-04 20:11:34 +00:00
"git.sr.ht/~michalr/go-satel"
2024-02-13 22:20:15 +00:00
)
// SyncFilter is a single stage of a synchronous processing chain that carries
// messages of type MsgType from one stage to the next.
type SyncFilter[MsgType any] interface {
	// Then attaches what as the stage following this one. The implementations
	// in this file either store and return what (SyncFilterImpl) or panic
	// because a differently typed successor is required (Convert).
	Then(what SyncFilter[MsgType]) SyncFilter[MsgType]

	// Call hands msg to this stage for processing.
	Call(msg MsgType)
}
type SyncFilterImpl[MsgType any] struct {
next SyncFilter[MsgType]
}
func (impl *SyncFilterImpl[MsgType]) Then(what SyncFilter[MsgType]) SyncFilter[MsgType] {
impl.next = what
return what
}
func (impl *SyncFilterImpl[MsgType]) CallNext(msg MsgType) {
if impl.next != nil {
impl.next.Call(msg)
2024-02-18 17:44:08 +00:00
}
}
type CollectFromChannel[MsgType any] struct{ SyncFilterImpl[MsgType] }
func (collect *CollectFromChannel[MsgType]) Call(msg MsgType) {
collect.CallNext(msg)
}
func (collect CollectFromChannel[MsgType]) Collect(events <-chan MsgType, wg *sync.WaitGroup, onClose func()) {
wg.Add(1)
go func() {
defer wg.Done()
defer onClose()
for e := range events {
collect.Call(e)
}
}()
2024-02-13 22:20:15 +00:00
}
2024-02-14 20:34:16 +00:00
type ThrottleSync struct {
SyncFilterImpl[GenericMessage]
2024-02-18 18:00:49 +00:00
events chan GenericMessage
}
2024-03-10 09:12:04 +00:00
func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage {
if msg == nil {
msg = &GenericMessage{make([]satel.BasicEventElement, 0)}
2024-02-18 18:00:49 +00:00
}
throughNewMessages:
for _, newEv := range new.Messages {
for i, oldEv := range msg.Messages {
if oldEv.Index == newEv.Index && oldEv.Type == newEv.Type {
// this message was seen - update its value
msg.Messages[i].Value = newEv.Value
continue throughNewMessages
}
}
// apparently this type of message was not yet seen, save it
msg.Messages = append(msg.Messages, newEv)
}
return msg
2024-02-18 18:00:49 +00:00
}
func MakeThrottleSync(sleeper Sleeper, logger *log.Logger, wg *sync.WaitGroup) *ThrottleSync {
events := make(chan GenericMessage)
throttle := ThrottleSync{SyncFilterImpl[GenericMessage]{}, events}
2024-02-14 20:34:16 +00:00
wg.Add(1)
2024-02-14 20:34:16 +00:00
go func() {
timeoutEvents := make(chan interface{})
var currentEvent *GenericMessage = nil
loop:
for {
select {
case ev, ok := <-events:
if !ok {
break loop
}
if currentEvent == nil {
logger.Print("Waiting for more messages to arrive before sending...")
sleeper.Sleep(timeoutEvents)
}
currentEvent = appendToGenericMessage(currentEvent, &ev)
case <-timeoutEvents:
logger.Print("Time's up, sending all messages we've got for now.")
throttle.CallNext(*currentEvent)
currentEvent = nil
2024-02-14 20:34:16 +00:00
}
}
// If anything is left to be sent, send it now
if currentEvent != nil {
throttle.CallNext(*currentEvent)
}
wg.Done()
logger.Print("Throttling goroutine finishing")
2024-03-03 11:59:02 +00:00
}()
return &throttle
}
// Close closes the events channel. The goroutine started by MakeThrottleSync
// reacts to the closed channel by leaving its loop. Close must be called at
// most once, and Call must not be used afterwards: both would act on a closed
// channel and panic.
func (throttle *ThrottleSync) Close() {
	close(throttle.events)
}
// Call hands msg to the throttling goroutine by sending it on the events
// channel. With the unbuffered channel created by MakeThrottleSync the call
// blocks until the goroutine has received the message.
func (throttle *ThrottleSync) Call(msg GenericMessage) {
	queue := throttle.events
	queue <- msg
}
// Convert is the chain stage that switches the message type: it accepts
// InMsgType values, turns each into a GenericMessage and passes the result to a
// GenericMessage stage. Because its successor has a different message type, the
// successor is attached with ConvertTo rather than Then.
type Convert[InMsgType any] struct {
	// out is the stage receiving converted messages; nil until ConvertTo is called.
	out SyncFilter[GenericMessage]
	// convert maps one incoming message to its GenericMessage form.
	convert func(InMsgType) GenericMessage
}

// MakeConvert returns a Convert stage that applies convertFunc to every
// message. The stage has no successor until ConvertTo is called.
func MakeConvert[InMsgType any](convertFunc func(InMsgType) GenericMessage) *Convert[InMsgType] {
	return &Convert[InMsgType]{convert: convertFunc}
}

// Call converts msg and passes the result to the stage set with ConvertTo.
// It panics when no such stage has been set.
func (convert *Convert[InMsgType]) Call(msg InMsgType) {
	if convert.out == nil {
		panic("Use ConvertTo() to set next element in the chain.")
	}
	converted := convert.convert(msg)
	convert.out.Call(converted)
}

// ConvertTo sets out as the stage receiving converted messages and returns it,
// so the chain can be continued from there.
func (convert *Convert[InMsgType]) ConvertTo(out SyncFilter[GenericMessage]) SyncFilter[GenericMessage] {
	convert.out = out
	return out
}

// Then always panics. It is present so that *Convert has the SyncFilter method
// set for InMsgType; the successor must be attached with ConvertTo.
func (convert *Convert[InMsgType]) Then(_ SyncFilter[InMsgType]) SyncFilter[InMsgType] {
	panic("Use ConvertTo() with Convert object instead of Then().")
}
// FilterByLastSeen is a chain stage that lets through only those elements of
// an event whose value differs from the one dataStore holds for the same
// (type, index) key, or for which dataStore holds nothing yet.
type FilterByLastSeen struct {
	SyncFilterImpl[satel.Event]
	// dataStore is queried for, and updated with, the last value per element.
	dataStore *DataStore
}

// MakeFilterByLastSeen returns a FilterByLastSeen stage backed by dataStore.
func MakeFilterByLastSeen(dataStore *DataStore) *FilterByLastSeen {
	filter := &FilterByLastSeen{}
	filter.dataStore = dataStore
	return filter
}

// Call compares every element of ev with the stored system state. Elements
// that are new or changed are written back to the store and collected; if at
// least one was collected, an event holding just those elements is passed to
// the next stage. An event without any change is dropped.
func (filter *FilterByLastSeen) Call(ev satel.Event) {
	changed := make([]satel.BasicEventElement, 0)
	for _, element := range ev.BasicEvents {
		key := EventKey{element.Type, element.Index}
		// The state is fetched anew for each element, so an update made for an
		// earlier element of this event is taken into account here.
		known, found := filter.dataStore.GetSystemState()[key]
		if found && known.Value == element.Value {
			continue
		}
		changed = append(changed, element)
		// TODO: flush to disk only after the loop finishes
		filter.dataStore.SetSystemState(key, EventValue{element.Value})
	}
	if len(changed) == 0 {
		return
	}
	filter.CallNext(satel.Event{BasicEvents: changed})
}
// FilterByTypeOrIndex is a chain stage that restricts events to elements whose
// change type is in allowedTypes or whose index is in allowedIndexes. When
// both lists are empty, nothing is filtered out.
type FilterByTypeOrIndex struct {
	SyncFilterImpl[satel.Event]
	allowedTypes   []SatelChangeType
	allowedIndexes []int
}

// MakeFilterByTypeOrIndex returns a FilterByTypeOrIndex stage using the given
// lists. The slices are stored as passed in, not copied.
func MakeFilterByTypeOrIndex(allowedTypes []SatelChangeType, allowedIndexes []int) *FilterByTypeOrIndex {
	filter := &FilterByTypeOrIndex{}
	filter.allowedTypes = allowedTypes
	filter.allowedIndexes = allowedIndexes
	return filter
}
// isBasicEventElementOkay reports whether basicEventElement is accepted by the
// given lists: its Type equals the change type of some entry of allowedTypes,
// or its Index occurs in allowedIndexes. With both lists empty the result is
// false; callers handle the "no restrictions" case themselves.
func isBasicEventElementOkay(basicEventElement satel.BasicEventElement, allowedTypes []SatelChangeType, allowedIndexes []int) bool {
	// A matching type is sufficient on its own.
	for _, candidate := range allowedTypes {
		if candidate.GetChangeType() == basicEventElement.Type {
			return true
		}
	}

	// Otherwise the element is accepted only through its index.
	indexListed := false
	for _, idx := range allowedIndexes {
		if idx == basicEventElement.Index {
			indexListed = true
			break
		}
	}
	return indexListed
}
func (filter *FilterByTypeOrIndex) Call(ev satel.Event) {
if (len(filter.allowedTypes) == 0) && (len(filter.allowedIndexes) == 0) {
// no allowed types == all types are allowed
filter.CallNext(ev)
} else {
retEv := satel.Event{BasicEvents: FilterBasicElementsByTypeOrIndex(ev.BasicEvents,
filter.allowedTypes, filter.allowedIndexes)}
if len(retEv.BasicEvents) != 0 {
filter.CallNext(retEv)
}
}
2024-03-03 11:59:02 +00:00
}
// FilterBasicElementsByTypeOrIndex returns the elements of input accepted by
// isBasicEventElementOkay for the given lists, in their original order.
//
// When both lists are empty there is nothing to filter by and input itself is
// returned (the same slice, not a copy). In every other case the result is a
// newly allocated, non-nil slice, which may be empty.
func FilterBasicElementsByTypeOrIndex(input []satel.BasicEventElement, allowedTypes []SatelChangeType, allowedIndexes []int) []satel.BasicEventElement {
	unrestricted := len(allowedTypes) == 0 && len(allowedIndexes) == 0
	if unrestricted {
		return input
	}

	kept := make([]satel.BasicEventElement, 0)
	for _, candidate := range input {
		if !isBasicEventElementOkay(candidate, allowedTypes, allowedIndexes) {
			continue
		}
		kept = append(kept, candidate)
	}
	return kept
}