diff --git a/main.go b/main.go index 2aa4ecf..4075042 100644 --- a/main.go +++ b/main.go @@ -23,14 +23,14 @@ type TgSender struct { } func (self TgSender) Send(msg GenericMessage) error { - to_send := tgbotapi.NewMessage(msg.chat_id, msg.msg) - _, err := self.bot.Send(to_send) + toSend := tgbotapi.NewMessage(msg.chatId, msg.msg) + _, err := self.bot.Send(toSend) return err } -func send_tg_message(tg_events chan GenericMessage, msg string, chat_ids []int64) { - for _, chat_id := range chat_ids { - tg_events <- GenericMessage{chat_id, msg} +func sendTgMessage(tgEvents chan GenericMessage, msg string, chatIds []int64) { + for _, chatId := range chatIds { + tgEvents <- GenericMessage{chatId, msg} } } @@ -43,49 +43,61 @@ func (self RealSleeper) Sleep(ch chan<- interface{}) { ch <- nil } -func main() { - var ( - wg sync.WaitGroup - tg_events = make(chan GenericMessage) - sleeper = RealSleeper{time.Second * 15} - ) - satel_api_addr := flag.String("satel-addr", "", "Address that should be used to connect to the SATEL device") - satel_api_port := flag.String("satel-port", "7094", "Port that should be used to connect to the SATEL device") - chat_id_raw := flag.String("tg-chat-id", "", "Telegram Chat ID where to send updates. Use \",\" to specify multiple IDs.") +func getCmdLineParams() (string, []int64) { + satelApiAddr := flag.String("satel-addr", "", "Address that should be used to connect to the SATEL device") + satelApiPort := flag.String("satel-port", "7094", "Port that should be used to connect to the SATEL device") + chatIdRaw := flag.String("tg-chat-id", "", "Telegram Chat ID where to send updates. Use \",\" to specify multiple IDs.") flag.Parse() - if len(*satel_api_addr) == 0 || len(*satel_api_port) == 0 || len(*chat_id_raw) == 0 { + if len(*satelApiAddr) == 0 || len(*satelApiPort) == 0 || len(*chatIdRaw) == 0 { fmt.Println("Use --satel-addr=ADDR, --satel-port=PORT and --tg-chat-id=CHAT_ID command line flags to continue.") os.Exit(1) } - chat_ids_strings := strings.Split(*chat_id_raw, ",") - var chat_ids []int64 - for _, chat_id_str := range chat_ids_strings { - chat_id, err := strconv.ParseInt(chat_id_str, 10, 64) + chatIdsStrings := strings.Split(*chatIdRaw, ",") + var chatIds []int64 + for _, chatIdStr := range chatIdsStrings { + chatId, err := strconv.ParseInt(chatIdStr, 10, 64) if err != nil { - fmt.Printf("Tried to use a non-int value for one of tg_chat_ids: %s. That's bad.", chat_id_str) + fmt.Printf("Tried to use a non-int value for one of tg_chatIds: %s. That's bad.", chatIdStr) os.Exit(1) } - chat_ids = append(chat_ids, chat_id) + chatIds = append(chatIds, chatId) } - satel_addr := fmt.Sprintf("%s:%s", *satel_api_addr, *satel_api_port) - satel_conn, err := net.Dial("tcp", satel_addr) + satelAddr := fmt.Sprintf("%s:%s", *satelApiAddr, *satelApiPort) + return satelAddr, chatIds +} + +func makeSatel(satelAddr string) *satel.Satel { + satelConn, err := net.Dial("tcp", satelAddr) if err != nil { panic(err) } - s := satel.NewConfig(satel_conn, satel.Config{EventsQueueSize: 1000}) + return satel.NewConfig(satelConn, satel.Config{EventsQueueSize: 10}) +} + +func main() { + var ( + wg sync.WaitGroup + tgEvents = make(chan GenericMessage, 100) + sleeper = RealSleeper{time.Second * MessageNotMoreOftenThanSeconds} + ) + + satelAddr, chatIds := getCmdLineParams() + + s := makeSatel(satelAddr) + bot, err := tgbotapi.NewBotAPI(os.Getenv("TELEGRAM_APITOKEN")) if err != nil { panic(err) } tgSender := TgSender{bot} - go tg_sender_worker(tg_events, tgSender, &wg, sleeper) - for e, ok := <-s.Events; ok; e, ok = <-s.Events { - send_tg_message(tg_events, fmt.Sprintln("Change from SATEL: ", "type", e.Type, "index", e.Index, "value", e.Value), chat_ids) + go tgSenderWorker(tgEvents, tgSender, &wg, sleeper) + for e := range s.Events { + sendTgMessage(tgEvents, fmt.Sprintln("Change from SATEL: ", "type", e.Type, "index", e.Index, "value", e.Value), chatIds) } - close(tg_events) + close(tgEvents) wg.Wait() } diff --git a/sender_worker.go b/sender_worker.go index 55713ba..36ba108 100644 --- a/sender_worker.go +++ b/sender_worker.go @@ -6,8 +6,8 @@ import ( ) type GenericMessage struct { - chat_id int64 - msg string + chatId int64 + msg string } type Sender interface { @@ -18,7 +18,7 @@ type Sleeper interface { Sleep(ch chan<- interface{}) } -func tg_sender_worker(tg_events <-chan GenericMessage, s Sender, wg *sync.WaitGroup, sleeper Sleeper) { +func tgSenderWorker(tgEvents <-chan GenericMessage, s Sender, wg *sync.WaitGroup, sleeper Sleeper) { wg.Add(1) defer wg.Done() messagesToSend := make(map[int64]*strings.Builder) @@ -28,37 +28,37 @@ func tg_sender_worker(tg_events <-chan GenericMessage, s Sender, wg *sync.WaitGr loop: for { select { - case ev, ok := <-tg_events: + case ev, ok := <-tgEvents: if !ok { break loop } // Collect all messages to send them at once - _, messageBuilderExists := messagesToSend[ev.chat_id] + _, messageBuilderExists := messagesToSend[ev.chatId] if !messageBuilderExists { - messagesToSend[ev.chat_id] = &strings.Builder{} + messagesToSend[ev.chatId] = &strings.Builder{} } - messagesToSend[ev.chat_id].WriteString(ev.msg) - messagesToSend[ev.chat_id].WriteRune('\n') + messagesToSend[ev.chatId].WriteString(ev.msg) + messagesToSend[ev.chatId].WriteRune('\n') if !waitingStarted { waitingStarted = true sleeper.Sleep(timeoutEvents) } case <-timeoutEvents: waitingStarted = false - for chat_id, msgBuilder := range messagesToSend { - err := s.Send(GenericMessage{chat_id, msgBuilder.String()}) + for chatId, msgBuilder := range messagesToSend { + err := s.Send(GenericMessage{chatId, msgBuilder.String()}) if err != nil { // TODO: handle it better panic(err) } - delete(messagesToSend, chat_id) + delete(messagesToSend, chatId) } } } // If anything is left to be sent, send it now - for chat_id, msgBuilder := range messagesToSend { - err := s.Send(GenericMessage{chat_id, msgBuilder.String()}) + for chatId, msgBuilder := range messagesToSend { + err := s.Send(GenericMessage{chatId, msgBuilder.String()}) if err != nil { // TODO: handle it better panic(err) diff --git a/sender_worker_test.go b/sender_worker_test.go index 0ad381a..4001793 100644 --- a/sender_worker_test.go +++ b/sender_worker_test.go @@ -16,29 +16,34 @@ func (self *MockSender) Send(msg GenericMessage) error { } type MockSleeper struct { - ch *chan<- interface{} + ch *chan<- interface{} + callCount int } func (self *MockSleeper) Sleep(ch chan<- interface{}) { if self.ch == nil { self.ch = &ch } + self.callCount += 1 } func TestMessageThrottling(t *testing.T) { testEvents := make(chan GenericMessage) wg := sync.WaitGroup{} mockSender := MockSender{make([]GenericMessage, 0)} - mockSleeper := MockSleeper{} - go tg_sender_worker(testEvents, &mockSender, &wg, &mockSleeper) + mockSleeper := MockSleeper{nil, 0} + go tgSenderWorker(testEvents, &mockSender, &wg, &mockSleeper) testEvents <- GenericMessage{123, "test1"} testEvents <- GenericMessage{124, "test3"} testEvents <- GenericMessage{123, "test2"} testEvents <- GenericMessage{124, "test4"} + assert.Equal(t, 1, mockSleeper.callCount) *mockSleeper.ch <- nil + assert.Equal(t, 1, mockSleeper.callCount) testEvents <- GenericMessage{123, "test5"} close(testEvents) wg.Wait() + assert.Equal(t, 2, mockSleeper.callCount) assert.Len(t, mockSender.messages, 3) assert.Contains(t, mockSender.messages, GenericMessage{123, "test1\ntest2\n"})