From 0f35acd93ec343b7338fb82bc1e9e17aee986d3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Rudowicz?= Date: Tue, 24 Jul 2018 15:31:04 +0200 Subject: [PATCH] Initial commit --- autobanbot.py | 69 +++++++++++++++++++++++++++++++++ config.json.example | 7 ++++ requirements.txt | 4 ++ tests.py | 94 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 174 insertions(+) create mode 100755 autobanbot.py create mode 100644 config.json.example create mode 100644 requirements.txt create mode 100644 tests.py diff --git a/autobanbot.py b/autobanbot.py new file mode 100755 index 0000000..c2a1137 --- /dev/null +++ b/autobanbot.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python + +from telegram.ext import Updater, MessageHandler +from telegram.ext.filters import Filters +import re +import logging +import json + +SPAM_REGEXES = [] +TELEGRAM_API_KEY = "unknown" + +def load_config(): + with open("config.json") as config_file: + config = json.load(config_file) + for regex in config["regexes"]: + SPAM_REGEXES.append(re.compile(regex, re.I)) + return config + +def handle_spam_message(bot, chat, member, message_id): + logging.warning("SPAM USER JOINED: {} | {} | {} | {}".format( + member.id, + member.first_name, + member.last_name, + member.username + )) + logging.debug("WILL BAN: chat_id: {} | user_id: {} | AND DELETE MSG ID = {}".format( + chat.id, + member.id, + message_id + )) + kick_result = bot.kick_chat_member(chat.id, member.id) + if not kick_result: + logging.error("Bankick success: {}".format(kick_result)) + delete_result = bot.delete_message(chat.id, message_id) + if not delete_result: + logging.error("Message delete success: {}".format(delete_result)) + +def hello(bot, update): + for user in update.message.new_chat_members: + is_spam = False + for regex in SPAM_REGEXES: + if user.first_name is not None and regex.search(user.first_name) is not None: + is_spam = True + if user.last_name is not None and regex.search(user.last_name) is not None: + is_spam = True + if is_spam == True: + break + logging.info("User joined: {} | {} | {} | {} | {}".format( + user.id, + user.first_name, + user.last_name, + user.username, + is_spam + )) + if is_spam: + handle_spam_message(bot, update.effective_chat, user, update.message.message_id) + return True + return False + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s|%(levelname)s|%(message)s', level=logging.INFO) + logging.info("Starting") + config = load_config() + updater = Updater(config["telegramApiKey"]) + + updater.dispatcher.add_handler(MessageHandler(Filters.status_update.new_chat_members, hello)) + + updater.start_polling() + updater.idle() diff --git a/config.json.example b/config.json.example new file mode 100644 index 0000000..6ec3966 --- /dev/null +++ b/config.json.example @@ -0,0 +1,7 @@ +{ + "telegramApiKey": "PUT YOUR KEY HERE", + "regexes": [ + "ready-made telegram accounts", + "253239090.*473157472" + ] +} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a5b7dc5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +certifi==2018.4.16 +future==0.16.0 +pkg-resources==0.0.0 +python-telegram-bot==10.1.0 diff --git a/tests.py b/tests.py new file mode 100644 index 0000000..1f6c896 --- /dev/null +++ b/tests.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python + +import unittest +from unittest.mock import Mock +import autobanbot + +CHAT_ID = "Chat ID" +USER_ID = "User ID" +MESSAGE_ID = "Message ID" +BOT_SPEC = ["kick_chat_member", "delete_message"] + +class Member: + def __init__(self, user_id, first_name, last_name, username): + self.id = user_id + self.first_name = first_name + self.last_name = last_name + self.username = username + +class Message: + def __init__(self, user_id, first_name, last_name, username): + self.new_chat_members = [Member(user_id, first_name, last_name, username)] + self.message_id = MESSAGE_ID + +class Chat: + def __init__(self): + self.id = CHAT_ID + +class Update: + def __init__(self, user_id, first_name, last_name, username): + self.message = Message(user_id, first_name, last_name, username) + self.effective_chat = Chat() + +class TestAutoBanBot(unittest.TestCase): + def setUp(self): + autobanbot.load_config() + + def banned_join(self, user_id, first_name, last_name, username): + bot = Mock(spec=BOT_SPEC) + self.assertTrue(autobanbot.hello( + bot, Update(user_id, first_name, last_name, username) + )) + bot.kick_chat_member.assert_called_once_with(CHAT_ID, USER_ID) + bot.delete_message.assert_called_once_with(CHAT_ID, MESSAGE_ID) + + def legit_join(self, user_id, first_name, last_name, username): + bot = Mock(spec=BOT_SPEC) + self.assertFalse(autobanbot.hello( + bot, Update(user_id, first_name, last_name, username) + )) + bot.kick_chat_member.assert_not_called() + bot.delete_message.assert_not_called() + + def test_spam_joins(self): + updates = [ + "READY-MADE TELEGRAM ACCOUNTS", + "ready-made TeLeGrAm AcCoUnTs", + "there are ready-made TeLeGrAm AcCoUnTs for sale!", + "253239090 hi hello 473157472" + ] + for update in updates: + with self.subTest(first_name=update): + self.banned_join(USER_ID, update, "", "") + with self.subTest(last_name=update): + self.banned_join(USER_ID, "", update, "") + with self.subTest(last_name=update, first_name=None): + self.banned_join(USER_ID, None, update, "") + with self.subTest(last_name=None, first_name=update): + self.banned_join(USER_ID, update, None, "") + with self.subTest(last_name=update, first_name=update): + self.banned_join(USER_ID, update, update, "") + + def test_non_spam_joins(self): + updates = [ + "telegram account", + "abc", + "ready-made telegram naccounts", + "completely legit name", + "" + ] + for update in updates: + with self.subTest(first_name=update): + self.legit_join(USER_ID, update, "", "") + with self.subTest(last_name=update): + self.legit_join(USER_ID, "", update, "") + with self.subTest(first_name=update, last_name=None): + self.legit_join(USER_ID, update, None, "") + with self.subTest(last_name=update, first_name=None): + self.legit_join(USER_ID, None, update, "") + with self.subTest(last_name=update, first_name=update): + self.legit_join(USER_ID, update, update, "") + +if __name__ == '__main__': + unittest.main() +