Compare commits
10 Commits
51f34324fb
...
9a52cdf3cf
Author | SHA1 | Date |
---|---|---|
Michał Rudowicz | 9a52cdf3cf | |
Michał Rudowicz | 284311e9d5 | |
Michał Rudowicz | a7299d65ad | |
Michał Rudowicz | 2d4bf992de | |
Michał Rudowicz | 1d111f003f | |
Michał Rudowicz | c9783eefe0 | |
Michał Rudowicz | e343b54dcf | |
Michał Rudowicz | 779292196e | |
Michał Rudowicz | 4e2a2d3f28 | |
Michał Rudowicz | 0197c70d80 |
|
@ -0,0 +1,7 @@
|
||||||
|
image: alpine/3.20
|
||||||
|
packages:
|
||||||
|
- python3
|
||||||
|
tasks:
|
||||||
|
- test: |-
|
||||||
|
cd auto-ban-bot
|
||||||
|
python3 tests.py
|
|
@ -1 +1,3 @@
|
||||||
config.json
|
config.json
|
||||||
|
venv/
|
||||||
|
__pycache__/
|
||||||
|
|
|
@ -6,35 +6,46 @@ import re
|
||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
|
|
||||||
def load_config():
|
def load_config(filename: str = "config.json"):
|
||||||
retval = {}
|
retval = {}
|
||||||
with open("config.json") as config_file:
|
with open(filename) as config_file:
|
||||||
config = json.load(config_file)
|
config = json.load(config_file)
|
||||||
retval["telegramApiKey"] = config["telegramApiKey"]
|
retval["telegramApiKey"] = config["telegramApiKey"]
|
||||||
retval["regexes"] = []
|
retval["allowedChats"] = config["allowedChats"]
|
||||||
for regex in config["regexes"]:
|
retval["regexes"] = list(map(lambda r: re.compile(r, re.I), config["regexes"]))
|
||||||
retval["regexes"].append(re.compile(regex, re.I))
|
|
||||||
return retval
|
return retval
|
||||||
|
|
||||||
async def new_msg(update, context, regexes):
|
async def new_msg(update, context, regexes, allowed_chats):
|
||||||
is_spam = False
|
if update.message is None and update.edited_message is None:
|
||||||
|
logging.info(f"Got following unknown update: {update}")
|
||||||
|
return
|
||||||
|
message = update.message
|
||||||
|
if update.edited_message is not None:
|
||||||
|
message = update.edited_message
|
||||||
|
if message is None:
|
||||||
|
return
|
||||||
|
if message.chat.id not in allowed_chats:
|
||||||
|
return
|
||||||
for regex in regexes:
|
for regex in regexes:
|
||||||
if update.message.text is not None and regex.search(update.message.text) is not None:
|
if message.text is not None and regex.search(message.text) is not None:
|
||||||
is_spam = True
|
logging.info(f"Banning {message.from_user.name} from {message.chat.effective_name} for posting a message matching {regex.pattern}")
|
||||||
break
|
await message.chat.ban_member(message.from_user.id)
|
||||||
if is_spam:
|
await message.delete()
|
||||||
logging.info(f"Banning: {update.message}")
|
return
|
||||||
update.message.chat.ban_member(update.message.from_user.id)
|
|
||||||
update.message.delete()
|
async def handle_error(update, context):
|
||||||
|
logging.error("Exception while handling an update:", exc_info=context.error)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
logging.basicConfig(format='%(asctime)s|%(levelname)s|%(message)s', level=logging.INFO)
|
logging.basicConfig(format='%(asctime)s|%(levelname)s|%(message)s', level=logging.INFO)
|
||||||
|
logging.getLogger("httpx").setLevel(logging.WARNING)
|
||||||
logging.info("Starting")
|
logging.info("Starting")
|
||||||
config = load_config()
|
config = load_config()
|
||||||
application = Application.builder().token(config["telegramApiKey"]).build()
|
application = Application.builder().token(config["telegramApiKey"]).build()
|
||||||
|
|
||||||
application.add_handler(MessageHandler(filters.ALL, (
|
application.add_handler(MessageHandler(filters.ALL, (
|
||||||
lambda update, context: new_msg(update, context, config['regexes'])
|
lambda update, context: new_msg(update, context, config['regexes'], config['allowedChats'])
|
||||||
)))
|
)))
|
||||||
|
application.add_error_handler(handle_error, block=False)
|
||||||
|
|
||||||
application.run_polling(allowed_updates=Update.ALL_TYPES)
|
application.run_polling(allowed_updates=Update.ALL_TYPES)
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
{
|
{
|
||||||
"telegramApiKey": "PUT YOUR KEY HERE",
|
"telegramApiKey": "PUT YOUR KEY HERE",
|
||||||
|
"allowedChats": [123, 456],
|
||||||
"regexes": [
|
"regexes": [
|
||||||
"ready-made telegram accounts",
|
"test123",
|
||||||
"253239090.*473157472"
|
"(?:@|(?:(?:(?:https?://)?t(?:elegram)?)\\.me\\/))(\\w{4,})bot"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
#!/ust/bin/env python3
|
||||||
|
|
||||||
|
class Update:
|
||||||
|
pass
|
|
@ -0,0 +1,12 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
class Application:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class MessageHandler:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class filters:
|
||||||
|
pass
|
185
tests.py
185
tests.py
|
@ -1,93 +1,134 @@
|
||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from unittest.mock import Mock
|
from unittest.mock import AsyncMock
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from dataclasses import dataclass
|
||||||
|
fakes_dir = os.path.join(os.path.dirname(__file__), 'fakes')
|
||||||
|
assert(os.path.exists(fakes_dir))
|
||||||
|
sys.path.insert(0, fakes_dir)
|
||||||
import autobanbot
|
import autobanbot
|
||||||
|
|
||||||
CHAT_ID = "Chat ID"
|
|
||||||
USER_ID = "User ID"
|
|
||||||
MESSAGE_ID = "Message ID"
|
|
||||||
BOT_SPEC = ["kick_chat_member", "delete_message"]
|
|
||||||
|
|
||||||
class Member:
|
ALLOWED_CHAT_ID = 123
|
||||||
def __init__(self, user_id, first_name, last_name, username):
|
DISALLOWED_CHAT_ID = 789
|
||||||
self.id = user_id
|
|
||||||
self.first_name = first_name
|
|
||||||
self.last_name = last_name
|
|
||||||
self.username = username
|
|
||||||
|
|
||||||
class Message:
|
|
||||||
def __init__(self, user_id, first_name, last_name, username):
|
|
||||||
self.new_chat_members = [Member(user_id, first_name, last_name, username)]
|
|
||||||
self.message_id = MESSAGE_ID
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
class Chat:
|
class Chat:
|
||||||
|
id: int = 0
|
||||||
|
ban_member = AsyncMock()
|
||||||
|
effective_name = "effective_chat_name"
|
||||||
|
|
||||||
|
|
||||||
|
class User:
|
||||||
|
id_count = 0
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.id = CHAT_ID
|
self.id = User.id_count
|
||||||
|
self.name = "User name"
|
||||||
|
User.id_count += 1
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Message:
|
||||||
|
text = "Message text"
|
||||||
|
delete = AsyncMock()
|
||||||
|
chat = Chat()
|
||||||
|
from_user = User()
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
class Update:
|
class Update:
|
||||||
def __init__(self, user_id, first_name, last_name, username):
|
message = Message()
|
||||||
self.message = Message(user_id, first_name, last_name, username)
|
edited_message = None
|
||||||
self.effective_chat = Chat()
|
|
||||||
|
|
||||||
class TestAutoBanBot(unittest.TestCase):
|
|
||||||
|
def make_update(msg: str, chat_id: int) -> Update:
|
||||||
|
update = Update()
|
||||||
|
update.message.text = msg
|
||||||
|
update.message.chat.id = chat_id
|
||||||
|
return update
|
||||||
|
|
||||||
|
def make_edited_message(msg: str, chat_id: int) -> Update:
|
||||||
|
update = Update()
|
||||||
|
update.message = None
|
||||||
|
update.edited_message = Message()
|
||||||
|
update.edited_message.text = msg
|
||||||
|
update.edited_message.chat.id = chat_id
|
||||||
|
return update
|
||||||
|
|
||||||
|
|
||||||
|
class TestAutoBanBot(unittest.IsolatedAsyncioTestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.regexes = autobanbot.load_config()["regexes"]
|
config = autobanbot.load_config("config.json.example")
|
||||||
|
self.regexes = config['regexes']
|
||||||
|
self.allowed_chats = config['allowedChats']
|
||||||
|
|
||||||
def banned_join(self, user_id, first_name, last_name, username):
|
async def test_not_bannable(self):
|
||||||
bot = Mock(spec=BOT_SPEC)
|
messages = ["not bannable message", "siematest_123elo", "i am not a bot", "t.me, i'm not a bot"]
|
||||||
self.assertTrue(autobanbot.hello(
|
for msg in messages:
|
||||||
bot, Update(user_id, first_name, last_name, username), self.regexes
|
with self.subTest("new message", message=msg):
|
||||||
))
|
update = make_update(msg, ALLOWED_CHAT_ID)
|
||||||
bot.kick_chat_member.assert_called_once_with(CHAT_ID, USER_ID)
|
self.assertIsNone(update.edited_message)
|
||||||
bot.delete_message.assert_called_once_with(CHAT_ID, MESSAGE_ID)
|
update.message.delete.reset_mock()
|
||||||
|
update.message.chat.ban_member.reset_mock()
|
||||||
|
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
|
||||||
|
update.message.delete.assert_not_called()
|
||||||
|
update.message.chat.ban_member.assert_not_called()
|
||||||
|
with self.subTest("message edit", message=msg):
|
||||||
|
update = make_edited_message(msg, ALLOWED_CHAT_ID)
|
||||||
|
self.assertIsNone(update.message)
|
||||||
|
update.edited_message.delete.reset_mock()
|
||||||
|
update.edited_message.chat.ban_member.reset_mock()
|
||||||
|
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
|
||||||
|
update.edited_message.delete.assert_not_called()
|
||||||
|
update.edited_message.chat.ban_member.assert_not_called()
|
||||||
|
|
||||||
def legit_join(self, user_id, first_name, last_name, username):
|
async def test_bannable(self):
|
||||||
bot = Mock(spec=BOT_SPEC)
|
messages = [
|
||||||
self.assertFalse(autobanbot.hello(
|
"hitest123hello",
|
||||||
bot, Update(user_id, first_name, last_name, username), self.regexes
|
"HiTeSt123HeLlO",
|
||||||
))
|
"t.me/whatever_bot?not bannable message",
|
||||||
bot.kick_chat_member.assert_not_called()
|
"https://www.t.me/ohhaaaaaaaibot/siematest123elo",
|
||||||
bot.delete_message.assert_not_called()
|
"@Hello_BoT"
|
||||||
|
|
||||||
def test_spam_joins(self):
|
|
||||||
updates = [
|
|
||||||
"READY-MADE TELEGRAM ACCOUNTS",
|
|
||||||
"ready-made TeLeGrAm AcCoUnTs",
|
|
||||||
"there are ready-made TeLeGrAm AcCoUnTs for sale!",
|
|
||||||
"253239090 hi hello 473157472"
|
|
||||||
]
|
]
|
||||||
for update in updates:
|
for msg in messages:
|
||||||
with self.subTest(first_name=update):
|
with self.subTest("Allowed chat causes a ban", message=msg):
|
||||||
self.banned_join(USER_ID, update, "", "")
|
update = make_update(msg, ALLOWED_CHAT_ID)
|
||||||
with self.subTest(last_name=update):
|
self.assertIsNone(update.edited_message)
|
||||||
self.banned_join(USER_ID, "", update, "")
|
update.message.delete.reset_mock()
|
||||||
with self.subTest(last_name=update, first_name=None):
|
update.message.chat.ban_member.reset_mock()
|
||||||
self.banned_join(USER_ID, None, update, "")
|
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
|
||||||
with self.subTest(last_name=None, first_name=update):
|
update.message.delete.assert_called_once()
|
||||||
self.banned_join(USER_ID, update, None, "")
|
update.message.chat.ban_member.assert_called_once_with(update.message.from_user.id)
|
||||||
with self.subTest(last_name=update, first_name=update):
|
with self.subTest("Unknown chat is ignored", message=msg):
|
||||||
self.banned_join(USER_ID, update, update, "")
|
update = make_update(msg, DISALLOWED_CHAT_ID)
|
||||||
|
self.assertIsNone(update.edited_message)
|
||||||
|
update.message.delete.reset_mock()
|
||||||
|
update.message.chat.ban_member.reset_mock()
|
||||||
|
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
|
||||||
|
update.message.delete.assert_not_called()
|
||||||
|
update.message.chat.ban_member.assert_not_called()
|
||||||
|
with self.subTest("Edit in allowed chat causes a ban", message=msg):
|
||||||
|
update = make_edited_message(msg, ALLOWED_CHAT_ID)
|
||||||
|
self.assertIsNone(update.message)
|
||||||
|
update.edited_message.delete.reset_mock()
|
||||||
|
update.edited_message.chat.ban_member.reset_mock()
|
||||||
|
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
|
||||||
|
update.edited_message.delete.assert_called_once()
|
||||||
|
update.edited_message.chat.ban_member.assert_called_once_with(update.edited_message.from_user.id)
|
||||||
|
with self.subTest("Edit in unknown chat is ignored", message=msg):
|
||||||
|
update = make_edited_message(msg, DISALLOWED_CHAT_ID)
|
||||||
|
self.assertIsNone(update.message)
|
||||||
|
update.edited_message.delete.reset_mock()
|
||||||
|
update.edited_message.chat.ban_member.reset_mock()
|
||||||
|
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
|
||||||
|
update.edited_message.delete.assert_not_called()
|
||||||
|
update.edited_message.chat.ban_member.assert_not_called()
|
||||||
|
|
||||||
def test_non_spam_joins(self):
|
|
||||||
updates = [
|
|
||||||
"telegram account",
|
|
||||||
"abc",
|
|
||||||
"ready-made telegram naccounts",
|
|
||||||
"completely legit name",
|
|
||||||
""
|
|
||||||
]
|
|
||||||
for update in updates:
|
|
||||||
with self.subTest(first_name=update):
|
|
||||||
self.legit_join(USER_ID, update, "", "")
|
|
||||||
with self.subTest(last_name=update):
|
|
||||||
self.legit_join(USER_ID, "", update, "")
|
|
||||||
with self.subTest(first_name=update, last_name=None):
|
|
||||||
self.legit_join(USER_ID, update, None, "")
|
|
||||||
with self.subTest(last_name=update, first_name=None):
|
|
||||||
self.legit_join(USER_ID, None, update, "")
|
|
||||||
with self.subTest(last_name=update, first_name=update):
|
|
||||||
self.legit_join(USER_ID, update, update, "")
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Loading…
Reference in New Issue