Initial attempt at some sort of LXMF chatbot
This commit is contained in:
parent
6018e8b593
commit
dd218c95e5
|
|
@ -0,0 +1,11 @@
|
|||
from lxmfbot import LXMFBot
|
||||
|
||||
bot = LXMFBot("HSWro Conference Bot")
|
||||
|
||||
|
||||
@bot.received
|
||||
def echo_msg(msg):
|
||||
msg.reply(msg.content)
|
||||
|
||||
|
||||
bot.run()
|
||||
|
|
@ -0,0 +1,103 @@
|
|||
import os
|
||||
from time import monotonic, sleep
|
||||
import RNS
|
||||
from LXMF import LXMRouter, LXMessage
|
||||
from appdirs import AppDirs
|
||||
from queue import Queue, Empty
|
||||
from types import SimpleNamespace
|
||||
from typing import NamedTuple, Optional, Callable
|
||||
|
||||
# Based on https://github.com/randogoth/lxmf-bot
|
||||
# Copyright (c) 2023 randogoth, provided under the MIT license
|
||||
|
||||
|
||||
class LXMFBot:
|
||||
path_request_timeout = 15 # seconds
|
||||
|
||||
def __init__(self, name, announce=360):
|
||||
self.name = name
|
||||
self.announce_time = announce # seconds
|
||||
self.delivery_callbacks = []
|
||||
self.receipts = []
|
||||
self.queue = Queue()
|
||||
self.announce_time = 360
|
||||
RNS.Reticulum(loglevel=RNS.LOG_VERBOSE)
|
||||
dirs = AppDirs(name, "hswro")
|
||||
self.config_path = os.path.join(dirs.user_data_dir, name)
|
||||
idfile = os.path.join(self.config_path, "identity")
|
||||
if not os.path.isdir(dirs.user_data_dir):
|
||||
os.mkdir(dirs.user_data_dir)
|
||||
if not os.path.isdir(self.config_path):
|
||||
os.mkdir(self.config_path)
|
||||
if not os.path.isfile(idfile):
|
||||
RNS.log('No Primary Identity file found, creating new...',
|
||||
RNS.LOG_INFO)
|
||||
id = RNS.Identity(True)
|
||||
id.to_file(idfile)
|
||||
self.id = RNS.Identity.from_file(idfile)
|
||||
RNS.log('Loaded identity from file', RNS.LOG_INFO)
|
||||
self.router = LXMRouter(
|
||||
storagepath=os.path.join(self.config_path, "router"))
|
||||
self.router.register_delivery_callback(self._message_received)
|
||||
self.source = self.router.register_delivery_identity(
|
||||
self.id, display_name=name)
|
||||
self.next_announce = None
|
||||
self._announce()
|
||||
|
||||
def _announce(self):
|
||||
if self.next_announce is None or monotonic() > self.next_announce:
|
||||
self.router.announce(self.source.hash)
|
||||
RNS.log(f'LXMF Announced: {RNS.prettyhexrep(self.source.hash)}',
|
||||
RNS.LOG_INFO)
|
||||
self.next_announce = monotonic() + self.announce_time
|
||||
|
||||
def received(self, function):
|
||||
RNS.log("Registering delivery callback function.")
|
||||
self.delivery_callbacks.append(function)
|
||||
return function
|
||||
|
||||
def _message_received(self, message):
|
||||
sender = RNS.hexrep(message.source_hash, delimit=False)
|
||||
receipt = RNS.hexrep(message.hash, delimit=False)
|
||||
RNS.log(f'Received message from <{sender}>', RNS.LOG_INFO)
|
||||
|
||||
if receipt not in self.receipts:
|
||||
self.receipts.append(receipt)
|
||||
if len(self.receipts) > 100:
|
||||
self.receipts.pop(0)
|
||||
for callback in self.delivery_callbacks:
|
||||
obj = {
|
||||
'lxmf': message,
|
||||
'reply': lambda msg: self.send(sender, msg),
|
||||
'sender': sender,
|
||||
'content': message.content.decode('utf-8'),
|
||||
'hash': receipt
|
||||
}
|
||||
msg = SimpleNamespace(**obj)
|
||||
callback(msg)
|
||||
|
||||
def send(self, destination, message, title='Reply',
|
||||
originally_path_requested=monotonic()):
|
||||
recipent_hash = bytes.fromhex(destination)
|
||||
|
||||
if not RNS.Transport.has_path(recipent_hash):
|
||||
RNS.log(f"Destination path to {destination} not known. Requesting")
|
||||
RNS.Transport.request_path(recipent_hash)
|
||||
|
||||
recipent_identity = RNS.Identity.recall(recipent_hash)
|
||||
dest = RNS.Destination(
|
||||
recipent_identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
||||
"lxmf", "delivery")
|
||||
lxm = LXMessage(dest, self.source, message,
|
||||
title=title, desired_method=LXMessage.DIRECT)
|
||||
lxm.try_propagation_on_fail = True
|
||||
RNS.log(f"Will send a message to {destination}")
|
||||
self.queue.put(lambda: self.router.handle_outbound(lxm))
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
try:
|
||||
self.queue.get(timeout=10)()
|
||||
except Empty:
|
||||
pass
|
||||
self._announce()
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
appdirs==1.4.4
|
||||
lxmf==0.8
|
||||
rns==1.0
|
||||
Loading…
Reference in New Issue