82 lines
2.5 KiB
Python
82 lines
2.5 KiB
Python
from lxmfbot import LXMFBot
|
|
from redis import Redis
|
|
from time import monotonic
|
|
from hashlib import sha256
|
|
import json
|
|
from functools import reduce
|
|
|
|
bot = LXMFBot("HSWro Conference Bot")
|
|
PASS = '34a77e61000b2ba1f56201332ef64d93f9cdc60e63ab48bc255102586ef7e592'
|
|
WRONG_LOGIN_BAN_DURATION = 60
|
|
|
|
|
|
def check_password(password: str) -> bool:
|
|
gotten = sha256(f"hswrosalt{password}".encode(
|
|
'utf-8'), usedforsecurity=True).hexdigest()
|
|
return PASS == gotten
|
|
|
|
|
|
def ban_sender(r, userhash, until):
|
|
r.rpush("hswroconference:banned", str(json.dumps({
|
|
"hash": userhash,
|
|
"until": str(until),
|
|
})))
|
|
|
|
|
|
def check_banned(r, userhash):
|
|
for raw in r.lrange("hswroconference:banned", 0, -1):
|
|
m = json.loads(raw.decode('utf-8'))
|
|
if m['hash'] == userhash:
|
|
if monotonic() > float(m['until']):
|
|
r.lrem("hswroconference:banned", 0, raw)
|
|
print(f"Ban expired for {userhash}, unbanning")
|
|
return False
|
|
return True
|
|
|
|
|
|
def handle_login(msg, r, from_known_sender):
|
|
def login_usage():
|
|
msg.reply("Usage: `/login [NICK] [PASSWORD]`")
|
|
parts = msg.content.split()
|
|
if len(parts) != 3:
|
|
login_usage()
|
|
return
|
|
assert parts[0] == "/login"
|
|
if not check_password(parts[2]):
|
|
ban_sender(r, msg.sender, monotonic() + WRONG_LOGIN_BAN_DURATION)
|
|
return
|
|
r.rpush("hswroconference:users", str(json.dumps({
|
|
"hash": msg.sender,
|
|
"nick": parts[1],
|
|
})))
|
|
msg.reply(f"Logged in as {parts[1]}.")
|
|
|
|
|
|
@bot.received
|
|
def echo_msg(msg):
|
|
r = Redis(host='localhost', port=6379, db=0)
|
|
if check_banned(r, msg.sender):
|
|
print(f"Message from banned user: {msg.sender}")
|
|
return
|
|
known_senders = list(map(lambda x: json.loads(x.decode('utf-8')),
|
|
r.lrange("hswroconference:users", 0, -1)))
|
|
sender_info = reduce(lambda x, y: y if (y["hash"] == msg.sender) else x,
|
|
known_senders, None)
|
|
from_known_sender = sender_info is not None
|
|
assert isinstance(msg.content, str)
|
|
if msg.content.startswith("/login"):
|
|
handle_login(msg, r, from_known_sender)
|
|
return
|
|
|
|
if not from_known_sender:
|
|
msg.reply("Please identify with `/login [NICK] [PASSWORD]`")
|
|
else:
|
|
print(f"Forwarding message from {sender_info['nick']}")
|
|
for r in known_senders:
|
|
if r["hash"] == msg.sender:
|
|
continue
|
|
bot.send(r["hash"], f"<{sender_info['nick']}> {msg.content}")
|
|
|
|
|
|
bot.run()
|