Compare commits

..

10 Commits

7 changed files with 167 additions and 89 deletions

7
.build.yml Normal file
View File

@ -0,0 +1,7 @@
image: alpine/3.20
packages:
- python3
tasks:
- test: |-
cd auto-ban-bot
python3 tests.py

2
.gitignore vendored
View File

@ -1 +1,3 @@
config.json config.json
venv/
__pycache__/

View File

@ -6,35 +6,46 @@ import re
import logging import logging
import json import json
def load_config(): def load_config(filename: str = "config.json"):
retval = {} retval = {}
with open("config.json") as config_file: with open(filename) as config_file:
config = json.load(config_file) config = json.load(config_file)
retval["telegramApiKey"] = config["telegramApiKey"] retval["telegramApiKey"] = config["telegramApiKey"]
retval["regexes"] = [] retval["allowedChats"] = config["allowedChats"]
for regex in config["regexes"]: retval["regexes"] = list(map(lambda r: re.compile(r, re.I), config["regexes"]))
retval["regexes"].append(re.compile(regex, re.I))
return retval return retval
async def new_msg(update, context, regexes): async def new_msg(update, context, regexes, allowed_chats):
is_spam = False if update.message is None and update.edited_message is None:
logging.info(f"Got following unknown update: {update}")
return
message = update.message
if update.edited_message is not None:
message = update.edited_message
if message is None:
return
if message.chat.id not in allowed_chats:
return
for regex in regexes: for regex in regexes:
if update.message.text is not None and regex.search(update.message.text) is not None: if message.text is not None and regex.search(message.text) is not None:
is_spam = True logging.info(f"Banning {message.from_user.name} from {message.chat.effective_name} for posting a message matching {regex.pattern}")
break await message.chat.ban_member(message.from_user.id)
if is_spam: await message.delete()
logging.info(f"Banning: {update.message}") return
update.message.chat.ban_member(update.message.from_user.id)
update.message.delete() async def handle_error(update, context):
logging.error("Exception while handling an update:", exc_info=context.error)
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s|%(levelname)s|%(message)s', level=logging.INFO) logging.basicConfig(format='%(asctime)s|%(levelname)s|%(message)s', level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.info("Starting") logging.info("Starting")
config = load_config() config = load_config()
application = Application.builder().token(config["telegramApiKey"]).build() application = Application.builder().token(config["telegramApiKey"]).build()
application.add_handler(MessageHandler(filters.ALL, ( application.add_handler(MessageHandler(filters.ALL, (
lambda update, context: new_msg(update, context, config['regexes']) lambda update, context: new_msg(update, context, config['regexes'], config['allowedChats'])
))) )))
application.add_error_handler(handle_error, block=False)
application.run_polling(allowed_updates=Update.ALL_TYPES) application.run_polling(allowed_updates=Update.ALL_TYPES)

View File

@ -1,7 +1,8 @@
{ {
"telegramApiKey": "PUT YOUR KEY HERE", "telegramApiKey": "PUT YOUR KEY HERE",
"allowedChats": [123, 456],
"regexes": [ "regexes": [
"ready-made telegram accounts", "test123",
"253239090.*473157472" "(?:@|(?:(?:(?:https?://)?t(?:elegram)?)\\.me\\/))(\\w{4,})bot"
] ]
} }

View File

@ -0,0 +1,4 @@
#!/ust/bin/env python3
class Update:
pass

12
fakes/telegram/ext.py Normal file
View File

@ -0,0 +1,12 @@
#!/usr/bin/env python3
class Application:
pass
class MessageHandler:
pass
class filters:
pass

185
tests.py
View File

@ -1,93 +1,134 @@
#!/usr/bin/env python #!/usr/bin/env python
import unittest import unittest
from unittest.mock import Mock from unittest.mock import AsyncMock
import os
import re
import sys
from dataclasses import dataclass
fakes_dir = os.path.join(os.path.dirname(__file__), 'fakes')
assert(os.path.exists(fakes_dir))
sys.path.insert(0, fakes_dir)
import autobanbot import autobanbot
CHAT_ID = "Chat ID"
USER_ID = "User ID"
MESSAGE_ID = "Message ID"
BOT_SPEC = ["kick_chat_member", "delete_message"]
class Member: ALLOWED_CHAT_ID = 123
def __init__(self, user_id, first_name, last_name, username): DISALLOWED_CHAT_ID = 789
self.id = user_id
self.first_name = first_name
self.last_name = last_name
self.username = username
class Message:
def __init__(self, user_id, first_name, last_name, username):
self.new_chat_members = [Member(user_id, first_name, last_name, username)]
self.message_id = MESSAGE_ID
@dataclass
class Chat: class Chat:
id: int = 0
ban_member = AsyncMock()
effective_name = "effective_chat_name"
class User:
id_count = 0
def __init__(self): def __init__(self):
self.id = CHAT_ID self.id = User.id_count
self.name = "User name"
User.id_count += 1
@dataclass
class Message:
text = "Message text"
delete = AsyncMock()
chat = Chat()
from_user = User()
@dataclass
class Update: class Update:
def __init__(self, user_id, first_name, last_name, username): message = Message()
self.message = Message(user_id, first_name, last_name, username) edited_message = None
self.effective_chat = Chat()
class TestAutoBanBot(unittest.TestCase):
def make_update(msg: str, chat_id: int) -> Update:
update = Update()
update.message.text = msg
update.message.chat.id = chat_id
return update
def make_edited_message(msg: str, chat_id: int) -> Update:
update = Update()
update.message = None
update.edited_message = Message()
update.edited_message.text = msg
update.edited_message.chat.id = chat_id
return update
class TestAutoBanBot(unittest.IsolatedAsyncioTestCase):
def setUp(self): def setUp(self):
self.regexes = autobanbot.load_config()["regexes"] config = autobanbot.load_config("config.json.example")
self.regexes = config['regexes']
self.allowed_chats = config['allowedChats']
def banned_join(self, user_id, first_name, last_name, username): async def test_not_bannable(self):
bot = Mock(spec=BOT_SPEC) messages = ["not bannable message", "siematest_123elo", "i am not a bot", "t.me, i'm not a bot"]
self.assertTrue(autobanbot.hello( for msg in messages:
bot, Update(user_id, first_name, last_name, username), self.regexes with self.subTest("new message", message=msg):
)) update = make_update(msg, ALLOWED_CHAT_ID)
bot.kick_chat_member.assert_called_once_with(CHAT_ID, USER_ID) self.assertIsNone(update.edited_message)
bot.delete_message.assert_called_once_with(CHAT_ID, MESSAGE_ID) update.message.delete.reset_mock()
update.message.chat.ban_member.reset_mock()
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
update.message.delete.assert_not_called()
update.message.chat.ban_member.assert_not_called()
with self.subTest("message edit", message=msg):
update = make_edited_message(msg, ALLOWED_CHAT_ID)
self.assertIsNone(update.message)
update.edited_message.delete.reset_mock()
update.edited_message.chat.ban_member.reset_mock()
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
update.edited_message.delete.assert_not_called()
update.edited_message.chat.ban_member.assert_not_called()
def legit_join(self, user_id, first_name, last_name, username): async def test_bannable(self):
bot = Mock(spec=BOT_SPEC) messages = [
self.assertFalse(autobanbot.hello( "hitest123hello",
bot, Update(user_id, first_name, last_name, username), self.regexes "HiTeSt123HeLlO",
)) "t.me/whatever_bot?not bannable message",
bot.kick_chat_member.assert_not_called() "https://www.t.me/ohhaaaaaaaibot/siematest123elo",
bot.delete_message.assert_not_called() "@Hello_BoT"
def test_spam_joins(self):
updates = [
"READY-MADE TELEGRAM ACCOUNTS",
"ready-made TeLeGrAm AcCoUnTs",
"there are ready-made TeLeGrAm AcCoUnTs for sale!",
"253239090 hi hello 473157472"
] ]
for update in updates: for msg in messages:
with self.subTest(first_name=update): with self.subTest("Allowed chat causes a ban", message=msg):
self.banned_join(USER_ID, update, "", "") update = make_update(msg, ALLOWED_CHAT_ID)
with self.subTest(last_name=update): self.assertIsNone(update.edited_message)
self.banned_join(USER_ID, "", update, "") update.message.delete.reset_mock()
with self.subTest(last_name=update, first_name=None): update.message.chat.ban_member.reset_mock()
self.banned_join(USER_ID, None, update, "") await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
with self.subTest(last_name=None, first_name=update): update.message.delete.assert_called_once()
self.banned_join(USER_ID, update, None, "") update.message.chat.ban_member.assert_called_once_with(update.message.from_user.id)
with self.subTest(last_name=update, first_name=update): with self.subTest("Unknown chat is ignored", message=msg):
self.banned_join(USER_ID, update, update, "") update = make_update(msg, DISALLOWED_CHAT_ID)
self.assertIsNone(update.edited_message)
update.message.delete.reset_mock()
update.message.chat.ban_member.reset_mock()
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
update.message.delete.assert_not_called()
update.message.chat.ban_member.assert_not_called()
with self.subTest("Edit in allowed chat causes a ban", message=msg):
update = make_edited_message(msg, ALLOWED_CHAT_ID)
self.assertIsNone(update.message)
update.edited_message.delete.reset_mock()
update.edited_message.chat.ban_member.reset_mock()
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
update.edited_message.delete.assert_called_once()
update.edited_message.chat.ban_member.assert_called_once_with(update.edited_message.from_user.id)
with self.subTest("Edit in unknown chat is ignored", message=msg):
update = make_edited_message(msg, DISALLOWED_CHAT_ID)
self.assertIsNone(update.message)
update.edited_message.delete.reset_mock()
update.edited_message.chat.ban_member.reset_mock()
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
update.edited_message.delete.assert_not_called()
update.edited_message.chat.ban_member.assert_not_called()
def test_non_spam_joins(self):
updates = [
"telegram account",
"abc",
"ready-made telegram naccounts",
"completely legit name",
""
]
for update in updates:
with self.subTest(first_name=update):
self.legit_join(USER_ID, update, "", "")
with self.subTest(last_name=update):
self.legit_join(USER_ID, "", update, "")
with self.subTest(first_name=update, last_name=None):
self.legit_join(USER_ID, update, None, "")
with self.subTest(last_name=update, first_name=None):
self.legit_join(USER_ID, None, update, "")
with self.subTest(last_name=update, first_name=update):
self.legit_join(USER_ID, update, update, "")
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()