nomadnet-website/groupchatbot/bot.py

130 lines
4.2 KiB
Python

from lxmfbot import LXMFBot
from redis import Redis
from time import monotonic
from hashlib import sha256
import json
from functools import reduce
from typing import Iterable
bot = LXMFBot("HSWro Conference Bot")
PASS = '34a77e61000b2ba1f56201332ef64d93f9cdc60e63ab48bc255102586ef7e592'
WRONG_LOGIN_BAN_DURATION = 30
USERS_NAME = "hswroconference:users"
BANNED_NAME = "hswroconference:banned"
def check_password(password: str) -> bool:
gotten = sha256(f"hswrosalt{password}".encode(
'utf-8'), usedforsecurity=True).hexdigest()
return PASS == gotten
def ban_sender(r, userhash, until):
r.rpush(BANNED_NAME, str(json.dumps({
"hash": userhash,
"until": str(until),
})))
def check_banned(r, userhash):
for raw in r.lrange(BANNED_NAME, 0, -1):
m = json.loads(raw.decode('utf-8'))
if m['hash'] == userhash:
if monotonic() > float(m['until']):
r.lrem(BANNED_NAME, 0, raw)
print(f"Ban expired for {userhash}, unbanning")
return False
return True
def check_name(username: str) -> bool:
if len(username) >= 16:
return False
return all(map(lambda c: c.isalnum(), username))
def handle_login(msg, r, from_known_sender, known_senders: Iterable):
def login_usage():
msg.reply("Usage: `/login [NICK] [PASSWORD]`")
parts = msg.content.split()
if len(parts) != 3:
login_usage()
return
assert parts[0] == "/login"
if from_known_sender:
msg.reply("You are already logged in.")
return
if not check_password(parts[2]):
ban_sender(r, msg.sender, monotonic() + WRONG_LOGIN_BAN_DURATION)
return
if not check_name(parts[1]):
msg.reply("Name can only consist of alphanumeric characters.")
return
r.rpush(USERS_NAME, str(json.dumps({
"hash": msg.sender,
"nick": parts[1],
})))
msg.reply(f"Logged in as {parts[1]}.")
send_to(f"{parts[1]} joined the chat.", map(
lambda x: x['hash'], known_senders))
def handle_logout(msg, r, from_known_sender, known_senders: Iterable):
if not from_known_sender:
return handle_help(msg)
for raw in r.lrange(USERS_NAME, 0, -1):
parsed = json.loads(raw.decode('utf-8'))
if msg.sender == parsed['hash']:
r.lrem(USERS_NAME, 0, raw)
msg.reply(f"User {parsed['nick']} logged out.")
send_to(f"{parsed['nick']} left the chat.", map(
lambda x: x['hash'],
filter(lambda x: x['hash'] != parsed['hash'], known_senders)))
return
def handle_help(msg):
msg.reply("""Supported commands:
/login [NICK] [PASSWORD] - Logs user in. Partyline messages will be sent.
/logout - Logs user out. Partyline messages will stop.
/help - Displays this message.""")
def send_to(text: str, receiver_hashes: Iterable[str]):
for receiver in receiver_hashes:
bot.send(receiver, text)
@bot.received
def handle_msg(msg):
r = Redis(host='localhost', port=6379, db=0)
if check_banned(r, msg.sender):
print(f"Message from banned user: {msg.sender}")
return
known_senders = list(map(lambda x: json.loads(x.decode('utf-8')),
r.lrange(USERS_NAME, 0, -1)))
sender_info = reduce(lambda x, y: y if (y["hash"] == msg.sender) else x,
known_senders, None)
from_known_sender = sender_info is not None
assert isinstance(msg.content, str)
if msg.content.startswith("/login"):
return handle_login(msg, r, from_known_sender, known_senders)
if msg.content.startswith("/logout"):
return handle_logout(msg, r, from_known_sender, known_senders)
if msg.content.startswith("/help"):
return handle_help(msg)
if msg.content.startswith("/"):
return handle_help(msg)
if not from_known_sender:
msg.reply("Please identify with `/login [NICK] [PASSWORD]`")
print(f"Received a message from unknown user. Ignoring.")
else:
print(f"Forwarding message from {sender_info['nick']}")
receiver_hashes = map(lambda x: x['hash'], filter(
lambda x: x['hash'] != msg.sender, known_senders))
send_to(f"<{sender_info['nick']}> {msg.content}", receiver_hashes)
bot.run()