1
0
Fork 0

Compare commits

..

2 Commits

Author SHA1 Message Date
Michał Rudowicz 0c873ff77d Fix memory leak* when new message is received
*) technically the memory could still be referenced, so some would argue
   that it was not a memory leak, but when I see a program balooning up
   without a reason for it so OOM Killer deals with it I call it a
   memory leak, sue me
2024-03-10 14:08:39 +01:00
Michał Rudowicz 2b17a34ff7 Fix hang on close 2024-03-10 13:55:29 +01:00
3 changed files with 56 additions and 3 deletions

View File

@ -89,7 +89,7 @@ func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *Data
func appendToGenericMessage(msg *GenericMessage, new *GenericMessage) *GenericMessage {
if msg == nil {
return new
msg = &GenericMessage{new.ChatIds, make([]satel.BasicEventElement, 0)}
}
throughNewMessages:
@ -100,10 +100,10 @@ throughNewMessages:
msg.Messages[i].Value = newEv.Value
continue throughNewMessages
}
}
// apparently this type of message was not yet seen, save it
msg.Messages = append(msg.Messages, newEv)
}
}
return msg
}

View File

@ -270,3 +270,55 @@ func TestThrottle(t *testing.T) {
assert.Contains(t, receivedEvents[1].Messages, tplMessageTest4)
assert.Len(t, receivedEvents[1].Messages, 1)
}
func makeMassiveEvent(element satel.BasicEventElement, numElements int) GenericMessage {
retval := GenericMessage{TgChatId{123}, make([]satel.BasicEventElement, 0)}
for i := 0; i < numElements; i++ {
retval.Messages = append(retval.Messages, element)
}
return retval
}
func TestThrottle_ManyMessagesInOneEvent(t *testing.T) {
testEvents := make(chan GenericMessage)
receivedEvents := make([]GenericMessage, 0)
wg := sync.WaitGroup{}
fakeLog := log.New(io.Discard, "", log.Ltime)
mockSleeper := MockSleeper{nil, 0}
var (
tplMessageTest1 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: true}
tplMessageTest2 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: true}
tplMessageTest3 = satel.BasicEventElement{Type: satel.ArmedPartition, Index: 1, Value: false}
tplMessageTest4 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: false}
)
go func() {
wg.Add(1)
for e := range Throttle(testEvents, &wg, &mockSleeper, fakeLog) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
testEvents <- makeMassiveEvent(tplMessageTest1, 100)
testEvents <- makeMassiveEvent(tplMessageTest2, 100)
testEvents <- makeMassiveEvent(tplMessageTest3, 100)
*mockSleeper.ch <- nil
testEvents <- makeMassiveEvent(tplMessageTest4, 100)
close(testEvents)
wg.Wait()
assert.Equal(t, 2, mockSleeper.callCount)
assert.Len(t, receivedEvents, 2)
assert.Contains(t, receivedEvents[0].Messages, tplMessageTest2)
assert.Contains(t, receivedEvents[0].Messages, tplMessageTest3)
assert.Len(t, receivedEvents[0].Messages, 2)
assert.Contains(t, receivedEvents[1].Messages, tplMessageTest4)
assert.Len(t, receivedEvents[1].Messages, 1)
}

View File

@ -115,6 +115,7 @@ func NotifyViaHTTP(events <-chan GenericMessage, wg *sync.WaitGroup, logger *log
}
}
}
close(returnEvents)
}()
return returnEvents