nomadnet-website/groupchatbot/lxmfbot.py

104 lines
3.9 KiB
Python

import os
from time import monotonic, sleep
import RNS
from LXMF import LXMRouter, LXMessage
from appdirs import AppDirs
from queue import Queue, Empty
from types import SimpleNamespace
from typing import NamedTuple, Optional, Callable
# Based on https://github.com/randogoth/lxmf-bot
# Copyright (c) 2023 randogoth, provided under the MIT license
class LXMFBot:
path_request_timeout = 15 # seconds
def __init__(self, name, announce=360):
self.name = name
self.announce_time = announce # seconds
self.delivery_callbacks = []
self.receipts = []
self.queue = Queue()
self.announce_time = 360
RNS.Reticulum(loglevel=RNS.LOG_VERBOSE)
dirs = AppDirs(name, "hswro")
self.config_path = os.path.join(dirs.user_data_dir, name)
idfile = os.path.join(self.config_path, "identity")
if not os.path.isdir(dirs.user_data_dir):
os.mkdir(dirs.user_data_dir)
if not os.path.isdir(self.config_path):
os.mkdir(self.config_path)
if not os.path.isfile(idfile):
RNS.log('No Primary Identity file found, creating new...',
RNS.LOG_INFO)
id = RNS.Identity(True)
id.to_file(idfile)
self.id = RNS.Identity.from_file(idfile)
RNS.log('Loaded identity from file', RNS.LOG_INFO)
self.router = LXMRouter(
storagepath=os.path.join(self.config_path, "router"))
self.router.register_delivery_callback(self._message_received)
self.source = self.router.register_delivery_identity(
self.id, display_name=name)
self.next_announce = None
self._announce()
def _announce(self):
if self.next_announce is None or monotonic() > self.next_announce:
self.router.announce(self.source.hash)
RNS.log(f'LXMF Announced: {RNS.prettyhexrep(self.source.hash)}',
RNS.LOG_INFO)
self.next_announce = monotonic() + self.announce_time
def received(self, function):
RNS.log("Registering delivery callback function.")
self.delivery_callbacks.append(function)
return function
def _message_received(self, message):
sender = RNS.hexrep(message.source_hash, delimit=False)
receipt = RNS.hexrep(message.hash, delimit=False)
RNS.log(f'Received message from <{sender}>', RNS.LOG_INFO)
if receipt not in self.receipts:
self.receipts.append(receipt)
if len(self.receipts) > 100:
self.receipts.pop(0)
for callback in self.delivery_callbacks:
obj = {
'lxmf': message,
'reply': lambda msg: self.send(sender, msg),
'sender': sender,
'content': message.content.decode('utf-8'),
'hash': receipt
}
msg = SimpleNamespace(**obj)
callback(msg)
def send(self, destination, message, title='Reply',
originally_path_requested=monotonic()):
recipent_hash = bytes.fromhex(destination)
if not RNS.Transport.has_path(recipent_hash):
RNS.log(f"Destination path to {destination} not known. Requesting")
RNS.Transport.request_path(recipent_hash)
recipent_identity = RNS.Identity.recall(recipent_hash)
dest = RNS.Destination(
recipent_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
"lxmf", "delivery")
lxm = LXMessage(dest, self.source, message,
title=title, desired_method=LXMessage.DIRECT)
lxm.try_propagation_on_fail = True
RNS.log(f"Will send a message to {destination}")
self.queue.put(lambda: self.router.handle_outbound(lxm))
def run(self):
while True:
try:
self.queue.get(timeout=10)()
except Empty:
pass
self._announce()