Synchronous filter for type conversion in stream
This commit is contained in:
parent
1e899a5351
commit
b0d87a2d64
|
@ -115,3 +115,25 @@ func (throttle *ThrottleSync) Close() { close(throttle.events) }
|
||||||
func (throttle *ThrottleSync) Call(msg GenericMessage) {
|
func (throttle *ThrottleSync) Call(msg GenericMessage) {
|
||||||
throttle.events <- msg
|
throttle.events <- msg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Convert[InMsgType any] struct {
|
||||||
|
SyncFilterImpl[InMsgType]
|
||||||
|
out SyncFilter[GenericMessage]
|
||||||
|
convert func(InMsgType) GenericMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
func MakeConvert[InMsgType any](convertFunc func(InMsgType) GenericMessage) *Convert[InMsgType] {
|
||||||
|
return &Convert[InMsgType]{SyncFilterImpl[InMsgType]{}, nil, convertFunc}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (convert *Convert[InMsgType]) Call(msg InMsgType) {
|
||||||
|
convert.out.Call(convert.convert(msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (convert *Convert[InMsgType]) ConvertTo(out SyncFilter[GenericMessage]) {
|
||||||
|
convert.out = out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (convert *Convert[InMsgType]) Then(_ SyncFilter[InMsgType]) {
|
||||||
|
panic("Use ConvertTo() with Convert object")
|
||||||
|
}
|
||||||
|
|
|
@ -236,16 +236,18 @@ func (self *MockSleeper) Sleep(ch chan<- interface{}) {
|
||||||
self.callCount += 1
|
self.callCount += 1
|
||||||
}
|
}
|
||||||
|
|
||||||
type SyncMockFilter struct {
|
type GenericSyncMockFilter[T any] struct {
|
||||||
SyncFilterImpl[GenericMessage]
|
SyncFilterImpl[T]
|
||||||
collected []GenericMessage
|
collected []T
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *SyncMockFilter) Call(msg GenericMessage) {
|
func (self *GenericSyncMockFilter[T]) Call(msg T) {
|
||||||
self.collected = append(self.collected, msg)
|
self.collected = append(self.collected, msg)
|
||||||
self.CallNext(msg)
|
self.CallNext(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SyncMockFilter = GenericSyncMockFilter[GenericMessage]
|
||||||
|
|
||||||
func TestSyncCollect(t *testing.T) {
|
func TestSyncCollect(t *testing.T) {
|
||||||
testEvents := make(chan GenericMessage)
|
testEvents := make(chan GenericMessage)
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
|
@ -322,3 +324,33 @@ func TestThrottleSync(t *testing.T) {
|
||||||
assert.Contains(t, mock.collected[1].Messages, tplMessageTest4)
|
assert.Contains(t, mock.collected[1].Messages, tplMessageTest4)
|
||||||
assert.Len(t, mock.collected[1].Messages, 1)
|
assert.Len(t, mock.collected[1].Messages, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConvert_failsWhenNotConverting(t *testing.T) {
|
||||||
|
a := assert.New(t)
|
||||||
|
tested := MakeConvert[int](func(in int) GenericMessage {
|
||||||
|
a.Equal(in, 1)
|
||||||
|
return GenericMessage{}
|
||||||
|
})
|
||||||
|
mock := &GenericSyncMockFilter[int]{}
|
||||||
|
|
||||||
|
a.Panics(func() {
|
||||||
|
tested.Then(mock)
|
||||||
|
tested.Call(1)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConvert(t *testing.T) {
|
||||||
|
a := assert.New(t)
|
||||||
|
numCalled := 0
|
||||||
|
tested := MakeConvert[int](func(in int) GenericMessage {
|
||||||
|
a.Equal(in, 1)
|
||||||
|
numCalled += 1
|
||||||
|
return GenericMessage{}
|
||||||
|
})
|
||||||
|
mock := &SyncMockFilter{}
|
||||||
|
|
||||||
|
tested.ConvertTo(mock)
|
||||||
|
tested.Call(1)
|
||||||
|
|
||||||
|
a.Equal(numCalled, 1)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue