1
0
Fork 0
hswro-alarm-bot/mqtt_worker.go

44 lines
1010 B
Go
Raw Normal View History

2024-08-24 09:56:32 +00:00
package main
import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"log"
"sync"
)
func configure_mqtt(config *MqttConfig) mqtt.Client {
opts := mqtt.NewClientOptions()
opts.AddBroker(config.BrokerUri)
opts.SetClientID("hswro-alarm-bot")
opts.SetUsername(config.Username)
opts.SetPassword(config.Password)
// opts.SetDefaultPublishHandler(messagePubHandler)
// opts.OnConnect = func() {}
// opts.OnConnectionLost = connectLostHandler // TODO: reconnect?
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
return client
}
func HandleMqtt(events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, logger *log.Logger, config *MqttConfig) <-chan GenericMessage {
returnEvents := make(chan GenericMessage)
go func() {
wg.Add(1)
defer wg.Done()
mqttClient := configure_mqtt(config)
for e := range events {
returnEvents <- e
}
close(returnEvents)
mqttClient.Disconnect(250)
}()
return returnEvents
}