1
0
Fork 0

Compare commits

...

5 Commits

Author SHA1 Message Date
Michał Rudowicz 7e9c341270 Documentation 2025-01-02 14:09:04 +01:00
Michał Rudowicz 64780e249c Fix race condition in Matterbridge sender 2025-01-02 14:08:39 +01:00
Michał Rudowicz 15f6e8bc24 Matterbridge is optional and can appear many times 2025-01-02 13:42:53 +01:00
Michał Rudowicz 94a47b2962 Matterbridge sender 2025-01-02 13:42:53 +01:00
Michał Rudowicz 64f4930b4e Avoid possible race condition with waitgroups
Looks like calling wg.Add(1) from a goroutine can cause a race condition
with wg.Wait() - can be easily avoided by calling Add() before the
subroutine is created.
2025-01-02 13:40:25 +01:00
10 changed files with 208 additions and 57 deletions

View File

@ -7,24 +7,74 @@ Warning: this is a proof of concept, don't rely on it
It was very basically tested, and while it seems to work, approach it without any expectations.
In other words - treat it as a toy, not as a tool that will save your life or valuables, because it probably won't.
## Usage
## Running
```
$ TELEGRAM_APITOKEN=YOUR_API_TOKEN ./alarm_bot --satel_addr=127.0.0.1 --satel_port=31337 --tg_chat_id=YOUR_CHAT_ID_FROM_BOTFATHER
$ TELEGRAM_APITOKEN=YOUR_API_TOKEN ./alarm-bot
```
Remember that `hswro-alarm-bot.yml` should be present in the current directory.
## Configuration via YAML file
Configuration should be stored in `hswro-alarm-bot.yml` in current directory.
```yaml
satel-addr: "192.168.13.37"
tg-chat-ids:
- 1234
- 5678
- 9876
allowed-types:
- "zone-isolate"
- "zone-alarm"
allowed-indexes:
- 5678
- 1337
pool-interval: 5m
arm-callback-urls:
- "http://192.168.1.10/hello"
- "http://example.com/api"
disarm-callback-urls:
- "http://192.168.1.10/bye"
- "http://example.com/api2"
alarm-callback-urls:
- "http://192.168.1.10/ohno"
- "http://example.com/api3"
matterbridge:
- uri: "http://localhost:4242/api/message"
token: "test_token_1"
gateway: "test_gateway_1"
username: "test_username_1"
```
- `pool-interval` sets how often will the Satel device be asked for changes
- `satel-addr` sets the IP address of the Satel device
- `tg-chat-ids` sets which telegram groups will receive notifications about alarm state changes
### Matterbridge integration
As Telegram is kinda questionable it's beneficial to get the notifications somewhere else as well. This bot can utilize Matterbridge's API to send notifications to many different chat platforms. Check the [Matterbridge API documentation](https://github.com/42wim/matterbridge/wiki/Api) to learn how to configure your existing MAtterbridge installation.
Tip: You can still have independent Telegram and Matterbridge notifications - simply set up an additional gateway in Matterbridge that pushes the messages to your non-telegram chat groups but omits the Telegram group to avoid duplicated messages. Those other chat groups can be the same as those where your regular messages are bridged - Matterbridge will handle that just fine without the need for duplicated bots etc.
To configure that set the `matterbridge` part of the config file with the following parameters:
- `uri` - URI to your Matterbridge's API
- `token` - Token that you've set up for yout Matterbridge API. Situation where this token is not configured is not taken into account.
- `gateway` - Set it to the same value that is set in the `name` parameter to your `[[gateway]]` in `matterbridge.toml`
- `username` - The username from which the alarm bot messages will appear to API.
### Notification via HTTP callbacks
Set the following environment variables:
Set the values in following parts of the config file:
- `NOTIFY_URL_ARM` - for an URL that will be POST when partition **0** is armed
- `NOTIFY_URL_DISARM` - for an URL that will be POST when partition **0** is unarmed
- `ALARM_URL_ARM` - for an URL that will be POST when **any** partition alarm is activated
- `arm-callback-urls` - for an URL that will be POST when partition **0** is armed
- `disarm-callback-urls` - for an URL that will be POST when partition **0** is unarmed
- `alarm-callback-urls` - for an URL that will be POST when **any** partition alarm is activated
### Filtering events by change type
It's possible to filter events by change type. Use the `--allowed-types=TYPE1,TYPE2,...` command line parameter to do that. If that parameter is not provided, then all change types are allowed, otherwise only the provided ones will be used for notifications.
It's possible to filter events by change type. Use the `allowed-types` part of the config file to do that. If that parameter is not provided, then all change types are allowed, otherwise only the provided ones will be used for notifications.
Supported types are:
- `zone-violation`
@ -69,7 +119,7 @@ Supported types are:
### Filtering events by index (which meaning depends on the change type)
Use the `--allowed-indexes=1,2,3,...` command line parameter to set the list of allowed indexes (of course provide your own list instead of `1,2,3,...`). If that parameter is not provided, then all indexes are allowed; otherwise the notification is sent for all indexes.
Use the `allowed-indexes` part of config file to set the list of allowed indexes. If that parameter is not provided, then all indexes are allowed; otherwise the notification is sent only for provided indexes.
## example systemd unit
@ -81,10 +131,8 @@ After=network.target
[Service]
Type=simple
ExecStart=/path/to/alarm_bot --satel-addr=192.168.13.37 --satel-port=7094 --tg-chat-id=1234,4567,9876
ExecStart=/path/to/alarm_bot
Environment=TELEGRAM_APITOKEN=YOUR_API_TOKEN
Environment=NOTIFY_URL_DISARM="http://localhost/disarmed"
Environment=NOTIFY_URL_ARM="http://localhost/armed"
DynamicUser=True
RuntimeDirectory=hswro-alarm-bot
StateDirectory=hswro-alarm-bot

View File

@ -25,16 +25,25 @@ type SatelChangeType struct {
changeType satel.ChangeType
}
type MatterbridgeConfig struct {
URI string `yaml:"uri"`
Token string `yaml:"token"`
Gateway string `yaml:"gateway"`
/// Username from which the messages will appear
Username string `yaml:"username"`
}
type AppConfig struct {
SatelAddr string `yaml:"satel-addr"`
ChatIds []int64 `yaml:"tg-chat-ids"`
AllowedTypes []SatelChangeType `yaml:"allowed-types"`
AllowedIndexes []int `yaml:"allowed-indexes"`
PoolInterval OwnDuration `yaml:"pool-interval"`
ArmCallbackUrls []string `yaml:"arm-callback-urls"`
DisarmCallbackUrls []string `yaml:"disarm-callback-urls"`
AlarmCallbackUrls []string `yaml:"alarm-callback-urls"`
WriteMemoryProfile bool `yaml:"write-memory-profile"`
SatelAddr string `yaml:"satel-addr"`
ChatIds []int64 `yaml:"tg-chat-ids"`
AllowedTypes []SatelChangeType `yaml:"allowed-types"`
AllowedIndexes []int `yaml:"allowed-indexes"`
PoolInterval OwnDuration `yaml:"pool-interval"`
ArmCallbackUrls []string `yaml:"arm-callback-urls"`
DisarmCallbackUrls []string `yaml:"disarm-callback-urls"`
AlarmCallbackUrls []string `yaml:"alarm-callback-urls"`
WriteMemoryProfile bool `yaml:"write-memory-profile"`
Matterbridge []MatterbridgeConfig `yaml:"matterbridge"`
}
func (m *SatelChangeType) UnmarshalYAML(unmarshal func(interface{}) error) error {

View File

@ -32,6 +32,15 @@ disarm-callback-urls:
alarm-callback-urls:
- "test alarm callback url"
- "second test alarm callback url"
matterbridge:
- uri: test_uri_1
token: test_token_1
gateway: test_gateway_1
username: test_username_1
- uri: test_uri_2
token: test_token_2
gateway: test_gateway_2
username: test_username_2
`
func TestParseYamlConfig(t *testing.T) {
@ -47,4 +56,13 @@ func TestParseYamlConfig(t *testing.T) {
a.ElementsMatch([]string{"test arm callback url", "second test arm callback url"}, actualConfig.ArmCallbackUrls)
a.ElementsMatch([]string{"test disarm callback url", "second test disarm callback url"}, actualConfig.DisarmCallbackUrls)
a.ElementsMatch([]string{"test alarm callback url", "second test alarm callback url"}, actualConfig.AlarmCallbackUrls)
a.Equal(actualConfig.Matterbridge[0].URI, "test_uri_1")
a.Equal(actualConfig.Matterbridge[0].Token, "test_token_1")
a.Equal(actualConfig.Matterbridge[0].Gateway, "test_gateway_1")
a.Equal(actualConfig.Matterbridge[0].Username, "test_username_1")
a.Equal(actualConfig.Matterbridge[1].URI, "test_uri_2")
a.Equal(actualConfig.Matterbridge[1].Token, "test_token_2")
a.Equal(actualConfig.Matterbridge[1].Gateway, "test_gateway_2")
a.Equal(actualConfig.Matterbridge[1].Username, "test_username_2")
}

View File

@ -21,9 +21,10 @@ func dumpMemoryProfile(log *log.Logger) {
}
func WriteMemoryProfilePeriodically(wg *sync.WaitGroup, log *log.Logger, close <-chan interface{}) {
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
memoryProfileTicker := time.NewTicker(24 * time.Hour)
defer memoryProfileTicker.Stop()
select {

View File

@ -26,19 +26,20 @@ func FilterByTypeOrIndex(ev <-chan satel.Event, wg *sync.WaitGroup, allowedTypes
if (len(allowedTypes) == 0) && (len(allowedIndexes) == 0) {
// no allowed types == all types are allowed
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
defer close(returnChan)
for e := range ev {
returnChan <- e
}
close(returnChan)
}()
} else {
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
defer close(returnChan)
for e := range ev {
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
@ -51,7 +52,6 @@ func FilterByTypeOrIndex(ev <-chan satel.Event, wg *sync.WaitGroup, allowedTypes
returnChan <- retEv
}
}
close(returnChan)
}()
}
@ -61,9 +61,10 @@ func FilterByTypeOrIndex(ev <-chan satel.Event, wg *sync.WaitGroup, allowedTypes
func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *DataStore, logger *log.Logger) <-chan satel.Event {
returnChan := make(chan satel.Event)
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
defer close(returnChan)
for e := range ev {
retEv := satel.Event{BasicEvents: make([]satel.BasicEventElement, 0)}
@ -81,7 +82,6 @@ func FilterByLastSeen(ev <-chan satel.Event, wg *sync.WaitGroup, dataStore *Data
}
}
logger.Print("Satel disconnected.")
close(returnChan)
}()
return returnChan
@ -111,8 +111,11 @@ func Throttle(inputEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sle
returnChan := make(chan GenericMessage)
timeoutEvents := make(chan interface{})
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
defer close(returnChan)
var currentEvent *GenericMessage = nil
loop:
for {
@ -137,8 +140,6 @@ func Throttle(inputEvents <-chan GenericMessage, wg *sync.WaitGroup, sleeper Sle
if currentEvent != nil {
returnChan <- *currentEvent
}
close(returnChan)
wg.Done()
}()
return returnChan

View File

@ -16,12 +16,13 @@ func TestSatelEventTypeFiltering(t *testing.T) {
receivedEvents := make([]satel.Event, 0)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
for e := range FilterByTypeOrIndex(testEvents, &wg, []SatelChangeType{{satel.ArmedPartition}, {satel.PartitionFireAlarm}}, []int{}) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, true)
@ -44,12 +45,13 @@ func TestSatelEventTypeFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testin
receivedEvents := make([]satel.Event, 0)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
for e := range FilterByTypeOrIndex(testEvents, &wg, []SatelChangeType{}, []int{}) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
for index, ct := range SUPPORTED_CHANGE_TYPES {
@ -70,12 +72,13 @@ func TestSatelIndexFiltering(t *testing.T) {
receivedEvents := make([]satel.Event, 0)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
for e := range FilterByTypeOrIndex(testEvents, &wg, []SatelChangeType{}, []int{1, 3}) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, true)
@ -99,12 +102,13 @@ func TestSatelIndexFiltering_NoAllowedEventTypesMeansAllAreAllowed(t *testing.T)
wg := sync.WaitGroup{}
myReasonableMaxIndex := 100 // I wanted to use math.MaxInt at first, but it's kind of a waste of time here
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
for e := range FilterByTypeOrIndex(testEvents, &wg, []SatelChangeType{}, []int{}) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
for i := 0; i < myReasonableMaxIndex; i++ {
@ -132,12 +136,13 @@ func TestSatelLastSeenFiltering(t *testing.T) {
fakeLog := log.New(io.Discard, "", log.Ltime)
ds := MakeDataStore(fakeLog, tempFileName)
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, true)
@ -168,12 +173,13 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
fakeLog := log.New(io.Discard, "", log.Ltime)
ds := MakeDataStore(fakeLog, tempFileName)
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
testEvents <- makeTestSatelEvent(satel.ArmedPartition, 1, true)
@ -194,12 +200,13 @@ func TestSatelLastSeenFilteringWithPersistence(t *testing.T) {
testEvents = make(chan satel.Event)
receivedEvents = make([]satel.Event, 0)
ds = MakeDataStore(fakeLog, tempFileName)
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
for e := range FilterByLastSeen(testEvents, &wg, &ds, fakeLog) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
receivedEvents = make([]satel.Event, 0)
@ -243,12 +250,13 @@ func TestThrottle(t *testing.T) {
tplMessageTest4 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: false}
)
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
for e := range Throttle(testEvents, &wg, &mockSleeper, fakeLog) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
testEvents <- GenericMessage{[]satel.BasicEventElement{tplMessageTest1}}
@ -295,12 +303,12 @@ func TestThrottle_ManyMessagesInOneEvent(t *testing.T) {
tplMessageTest4 = satel.BasicEventElement{Type: satel.ZoneViolation, Index: 2, Value: false}
)
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
for e := range Throttle(testEvents, &wg, &mockSleeper, fakeLog) {
receivedEvents = append(receivedEvents, e)
}
wg.Done()
}()
testEvents <- makeMassiveEvent(tplMessageTest1, 100)

11
main.go
View File

@ -69,14 +69,17 @@ func main() {
tgSender := TgSender{bot, s, log.New(os.Stderr, "TgFormatter", log.Lmicroseconds), config.ChatIds}
tpl := template.Must(template.New("TelegramMessage").Parse(TelegramMessageTemplate))
tgTpl := template.Must(template.New("TelegramMessage").Parse(TelegramMessageTemplate))
ircTpl := template.Must(template.New("IRCMessage").Parse(IRCMessageTemplate))
dataStore := MakeDataStore(log.New(os.Stderr, "DataStore", log.Lmicroseconds), getPersistenceFilePath())
Consume(
SendToTg(Throttle(NotifyViaHTTP(tgEvents, config, &wg, log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds)),
&wg, sleeper, log.New(os.Stderr, "MessageThrottle", log.Lmicroseconds)),
tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tpl),
SendToMatterbridge(
SendToTg(Throttle(NotifyViaHTTP(tgEvents, config, &wg, log.New(os.Stderr, "HTTPNotify", log.Lmicroseconds)),
&wg, sleeper, log.New(os.Stderr, "MessageThrottle", log.Lmicroseconds)),
tgSender, &wg, log.New(os.Stderr, "SendToTg", log.Lmicroseconds), tgTpl),
s, config, &wg, log.New(os.Stderr, "SendToMatterbridge", log.Lmicroseconds), ircTpl),
)
go CloseSatelOnCtrlC(s, &cleanShutdown)

View File

@ -1,6 +1,9 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"html/template"
"log"
"net/http"
@ -31,9 +34,11 @@ func Consume(events <-chan GenericMessage) {
func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger *log.Logger, tpl *template.Template) <-chan GenericMessage {
returnEvents := make(chan GenericMessage)
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
defer close(returnEvents)
for e := range events {
returnEvents <- e
err := s.Send(e, tpl)
@ -42,7 +47,6 @@ func SendToTg(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger
panic(err)
}
}
close(returnEvents)
}()
return returnEvents
@ -74,9 +78,11 @@ func notifyAllHttp(urls []string, logger *log.Logger, wg *sync.WaitGroup) {
func NotifyViaHTTP(events <-chan GenericMessage, config AppConfig, wg *sync.WaitGroup, logger *log.Logger) <-chan GenericMessage {
returnEvents := make(chan GenericMessage)
wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()
defer close(returnEvents)
for e := range events {
returnEvents <- e
inner_arm:
@ -101,7 +107,46 @@ func NotifyViaHTTP(events <-chan GenericMessage, config AppConfig, wg *sync.Wait
}
}
close(returnEvents)
}()
return returnEvents
}
type MatterbridgeMessage struct {
Text string `json:"text"`
Username string `json:"username"`
Gateway string `json:"gateway"`
}
func SendToMatterbridge(events <-chan GenericMessage, s SatelNameGetter, config AppConfig, wg *sync.WaitGroup, logger *log.Logger, tpl *template.Template) <-chan GenericMessage {
returnEvents := make(chan GenericMessage)
wg.Add(1)
go func() {
defer wg.Done()
defer close(returnEvents)
for e := range events {
returnEvents <- e
for _, matterbridgeConfig := range config.Matterbridge {
body, err := json.Marshal(MatterbridgeMessage{
Text: e.Format(tpl, s, logger),
Username: matterbridgeConfig.Username,
Gateway: matterbridgeConfig.Gateway,
})
if err != nil {
logger.Fatal("Could not marshal a JSON message: ", err)
}
req, err := http.NewRequest(http.MethodPost, matterbridgeConfig.URI, bytes.NewBuffer(body))
req.Header["Authorization"] = []string{fmt.Sprint("Bearer ", matterbridgeConfig.Token)}
res, err := http.DefaultClient.Do(req)
if err != nil {
logger.Print("Could not POST ", matterbridgeConfig.URI, ": ", err)
return
}
logger.Print("Notified via Matterbridge with result ", res.StatusCode)
}
}
}()
return returnEvents

View File

@ -5,3 +5,7 @@ const TelegramMessageTemplate = `{{- range .Messages}}
{{- else -}}
Huh, no messages - this is a bug
{{- end}}`
const IRCMessageTemplate = `{{- range .Messages}} {{.GetName}}: {{.FormatEvent}}; {{- else -}}
Huh, no messages - this is a bug
{{- end}}`

View File

@ -53,3 +53,17 @@ func TestTelegramTemplate(t *testing.T) {
// assert.Equal(t, "siemka", mockSender.message)
}
func TestIRCTemplate(t *testing.T) {
testEvents := make(chan GenericMessage)
wg := sync.WaitGroup{}
mockSender := MockTemplateSender{s: MockSatelNameGetter{"mockPart"}}
tpl, err := template.New("TestIRCTemplate").Parse(IRCMessageTemplate)
assert.NoError(t, err)
Consume(SendToTg(testEvents, &mockSender, &wg, log.New(io.Discard, "", log.Ltime), tpl))
testEvents <- GenericMessage{[]satel.BasicEventElement{tplMessageTest1, tplMessageTest2}}
close(testEvents)
wg.Wait()
// assert.Equal(t, "siemka", mockSender.message)
}