from lxmfbot import LXMFBot from redis import Redis from time import monotonic from hashlib import sha256 import json from functools import reduce from typing import Iterable bot = LXMFBot("HSWro Conference Bot") PASS = '34a77e61000b2ba1f56201332ef64d93f9cdc60e63ab48bc255102586ef7e592' WRONG_LOGIN_BAN_DURATION = 30 USERS_NAME = "hswroconference:users" BANNED_NAME = "hswroconference:banned" def check_password(password: str) -> bool: gotten = sha256(f"hswrosalt{password}".encode( 'utf-8'), usedforsecurity=True).hexdigest() return PASS == gotten def ban_sender(r, userhash, until): r.rpush(BANNED_NAME, str(json.dumps({ "hash": userhash, "until": str(until), }))) def check_banned(r, userhash): for raw in r.lrange(BANNED_NAME, 0, -1): m = json.loads(raw.decode('utf-8')) if m['hash'] == userhash: if monotonic() > float(m['until']): r.lrem(BANNED_NAME, 0, raw) print(f"Ban expired for {userhash}, unbanning") return False return True def check_name(username: str) -> bool: if len(username) >= 16: return False return all(map(lambda c: c.isalnum(), username)) def handle_login(msg, r, from_known_sender, known_senders: Iterable): def login_usage(): msg.reply("Usage: `/login [NICK] [PASSWORD]`") parts = msg.content.split() if len(parts) != 3: login_usage() return assert parts[0] == "/login" if from_known_sender: msg.reply("You are already logged in.") return if not check_password(parts[2]): ban_sender(r, msg.sender, monotonic() + WRONG_LOGIN_BAN_DURATION) return if not check_name(parts[1]): msg.reply("Name can only consist of alphanumeric characters.") return r.rpush(USERS_NAME, str(json.dumps({ "hash": msg.sender, "nick": parts[1], }))) msg.reply(f"Logged in as {parts[1]}.") send_to(f"{parts[1]} joined the chat.", map( lambda x: x['hash'], known_senders)) def handle_logout(msg, r, from_known_sender, known_senders: Iterable): if not from_known_sender: return handle_help(msg) for raw in r.lrange(USERS_NAME, 0, -1): parsed = json.loads(raw.decode('utf-8')) if msg.sender == parsed['hash']: r.lrem(USERS_NAME, 0, raw) msg.reply(f"User {parsed['nick']} logged out.") send_to(f"{parsed['nick']} left the chat.", map( lambda x: x['hash'], filter(lambda x: x['hash'] != parsed['hash'], known_senders))) return def handle_whoami(msg, _, from_known_sender, known_senders: Iterable): if not from_known_sender: msg.reply("You not logged in.") return for s in known_senders: if s['hash'] == msg.sender: msg.reply(f"Logged in as {s['nick']}.") return assert False def handle_help(msg, _, __, ___): msg.reply("""Supported commands: /login [NICK] [PASSWORD] - Logs user in. Partyline messages will be sent. /logout - Logs user out. Partyline messages will stop. /whoami - Tells you your nic if you're logged in. /help - Displays this message.""") def send_to(text: str, receiver_hashes: Iterable[str]): for receiver in receiver_hashes: bot.send(receiver, text) COMMANDS = { "/login": handle_login, "/logout": handle_logout, "/whoami": handle_whoami, "/help": handle_help, "/": handle_help } @bot.received def handle_msg(msg): r = Redis(host='localhost', port=6379, db=0) if check_banned(r, msg.sender): print(f"Message from banned user: {msg.sender}") return known_senders = list(map(lambda x: json.loads(x.decode('utf-8')), r.lrange(USERS_NAME, 0, -1))) sender_info = reduce(lambda x, y: y if (y["hash"] == msg.sender) else x, known_senders, None) from_known_sender = sender_info is not None assert isinstance(msg.content, str) for cmd in COMMANDS: if msg.content.startswith(cmd): return COMMANDS[cmd](msg, r, from_known_sender, known_senders) if not from_known_sender: msg.reply("Please identify with `/login [NICK] [PASSWORD]`") print(f"Received a message from unknown user. Ignoring.") else: print(f"Forwarding message from {sender_info['nick']}") receiver_hashes = map(lambda x: x['hash'], filter( lambda x: x['hash'] != msg.sender, known_senders)) send_to(f"<{sender_info['nick']}> {msg.content}", receiver_hashes) bot.run()