From 284311e9d59a7546545fd8a492f6e372566c3de1 Mon Sep 17 00:00:00 2001 From: Michal Rudowicz Date: Sat, 15 Jun 2024 10:03:15 +0200 Subject: [PATCH] Allowlist of chats where the bot should work --- autobanbot.py | 13 +++++++++---- config.json.example | 1 + tests.py | 32 +++++++++++++++++++++----------- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/autobanbot.py b/autobanbot.py index eb919ef..8019ece 100755 --- a/autobanbot.py +++ b/autobanbot.py @@ -11,10 +11,17 @@ def load_config(filename: str = "config.json"): with open(filename) as config_file: config = json.load(config_file) retval["telegramApiKey"] = config["telegramApiKey"] + retval["allowedChats"] = config["allowedChats"] retval["regexes"] = list(map(lambda r: re.compile(r, re.I), config["regexes"])) return retval -async def new_msg(update, context, regexes): +async def new_msg(update, context, regexes, allowed_chats): + if update.message is None: + logging.info(f"Got following update: {update}") + return + if update.message.chat.id not in allowed_chats: + logging.info(f"Got update from not allowed chat: {update.message.chat}. Ignoring") + return is_spam = False for regex in regexes: if update.message is not None and \ @@ -22,8 +29,6 @@ async def new_msg(update, context, regexes): regex.search(update.message.text) is not None: is_spam = True break - if update.message is None: - logging.info(f"Got following update: {update}") if is_spam: logging.info(f"Banning {update.message.from_user.name} from {update.message.chat.effective_name}") await update.message.chat.ban_member(update.message.from_user.id) @@ -41,7 +46,7 @@ if __name__ == '__main__': application = Application.builder().token(config["telegramApiKey"]).build() application.add_handler(MessageHandler(filters.ALL, ( - lambda update, context: new_msg(update, context, config['regexes']) + lambda update, context: new_msg(update, context, config['regexes'], config['allowedChats']) ))) application.add_error_handler(handle_error, block=False) diff --git a/config.json.example b/config.json.example index 21c1430..c02db83 100644 --- a/config.json.example +++ b/config.json.example @@ -1,5 +1,6 @@ { "telegramApiKey": "PUT YOUR KEY HERE", + "allowedChats": [123, 456], "regexes": [ "test123", "(?:@|(?:(?:(?:https?://)?t(?:elegram)?)\\.me\\/))(\\w{4,})bot" diff --git a/tests.py b/tests.py index 177e376..87e4580 100644 --- a/tests.py +++ b/tests.py @@ -11,14 +11,14 @@ assert(os.path.exists(fakes_dir)) sys.path.insert(0, fakes_dir) import autobanbot -CHAT_ID = "Chat ID" -USER_ID = "User ID" -MESSAGE_ID = "Message ID" -BOT_SPEC = ["kick_chat_member", "delete_message"] + +ALLOWED_CHAT_ID = 123 +DISALLOWED_CHAT_ID = 789 @dataclass class Chat: + id: int = 0 ban_member = AsyncMock() effective_name = "effective_chat_name" @@ -45,24 +45,27 @@ class Update: message = Message() -def make_update(msg: str) -> Update: +def make_update(msg: str, chat_id: int) -> Update: update = Update() update.message.text = msg + update.message.chat.id = chat_id return update class TestAutoBanBot(unittest.IsolatedAsyncioTestCase): def setUp(self): - self.regexes = autobanbot.load_config("config.json.example")['regexes'] + config = autobanbot.load_config("config.json.example") + self.regexes = config['regexes'] + self.allowed_chats = config['allowedChats'] async def test_not_bannable(self): messages = ["not bannable message", "siematest_123elo", "i am not a bot", "t.me, i'm not a bot"] for msg in messages: with self.subTest(msg=msg): - update = make_update(msg) + update = make_update(msg, ALLOWED_CHAT_ID) update.message.delete.reset_mock() update.message.chat.ban_member.reset_mock() - await autobanbot.new_msg(update, None, self.regexes) + await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats) update.message.delete.assert_not_called() update.message.chat.ban_member.assert_not_called() @@ -75,13 +78,20 @@ class TestAutoBanBot(unittest.IsolatedAsyncioTestCase): "@Hello_BoT" ] for msg in messages: - with self.subTest(msg=msg): - update = make_update(msg) + with self.subTest("Allowed chat causes a ban", message=msg): + update = make_update(msg, ALLOWED_CHAT_ID) update.message.delete.reset_mock() update.message.chat.ban_member.reset_mock() - await autobanbot.new_msg(update, None, self.regexes) + await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats) update.message.delete.assert_called_once() update.message.chat.ban_member.assert_called_once_with(update.message.from_user.id) + with self.subTest("Unknown chat is ignored", message=msg): + update = make_update(msg, DISALLOWED_CHAT_ID) + update.message.delete.reset_mock() + update.message.chat.ban_member.reset_mock() + await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats) + update.message.delete.assert_not_called() + update.message.chat.ban_member.assert_not_called() if __name__ == '__main__':