Allowlist of chats where the bot should work

This commit is contained in:
Michał Rudowicz 2024-06-15 10:03:15 +02:00
parent a7299d65ad
commit 284311e9d5
3 changed files with 31 additions and 15 deletions

View File

@ -11,10 +11,17 @@ def load_config(filename: str = "config.json"):
with open(filename) as config_file: with open(filename) as config_file:
config = json.load(config_file) config = json.load(config_file)
retval["telegramApiKey"] = config["telegramApiKey"] retval["telegramApiKey"] = config["telegramApiKey"]
retval["allowedChats"] = config["allowedChats"]
retval["regexes"] = list(map(lambda r: re.compile(r, re.I), config["regexes"])) retval["regexes"] = list(map(lambda r: re.compile(r, re.I), config["regexes"]))
return retval return retval
async def new_msg(update, context, regexes): async def new_msg(update, context, regexes, allowed_chats):
if update.message is None:
logging.info(f"Got following update: {update}")
return
if update.message.chat.id not in allowed_chats:
logging.info(f"Got update from not allowed chat: {update.message.chat}. Ignoring")
return
is_spam = False is_spam = False
for regex in regexes: for regex in regexes:
if update.message is not None and \ if update.message is not None and \
@ -22,8 +29,6 @@ async def new_msg(update, context, regexes):
regex.search(update.message.text) is not None: regex.search(update.message.text) is not None:
is_spam = True is_spam = True
break break
if update.message is None:
logging.info(f"Got following update: {update}")
if is_spam: if is_spam:
logging.info(f"Banning {update.message.from_user.name} from {update.message.chat.effective_name}") logging.info(f"Banning {update.message.from_user.name} from {update.message.chat.effective_name}")
await update.message.chat.ban_member(update.message.from_user.id) await update.message.chat.ban_member(update.message.from_user.id)
@ -41,7 +46,7 @@ if __name__ == '__main__':
application = Application.builder().token(config["telegramApiKey"]).build() application = Application.builder().token(config["telegramApiKey"]).build()
application.add_handler(MessageHandler(filters.ALL, ( application.add_handler(MessageHandler(filters.ALL, (
lambda update, context: new_msg(update, context, config['regexes']) lambda update, context: new_msg(update, context, config['regexes'], config['allowedChats'])
))) )))
application.add_error_handler(handle_error, block=False) application.add_error_handler(handle_error, block=False)

View File

@ -1,5 +1,6 @@
{ {
"telegramApiKey": "PUT YOUR KEY HERE", "telegramApiKey": "PUT YOUR KEY HERE",
"allowedChats": [123, 456],
"regexes": [ "regexes": [
"test123", "test123",
"(?:@|(?:(?:(?:https?://)?t(?:elegram)?)\\.me\\/))(\\w{4,})bot" "(?:@|(?:(?:(?:https?://)?t(?:elegram)?)\\.me\\/))(\\w{4,})bot"

View File

@ -11,14 +11,14 @@ assert(os.path.exists(fakes_dir))
sys.path.insert(0, fakes_dir) sys.path.insert(0, fakes_dir)
import autobanbot import autobanbot
CHAT_ID = "Chat ID"
USER_ID = "User ID" ALLOWED_CHAT_ID = 123
MESSAGE_ID = "Message ID" DISALLOWED_CHAT_ID = 789
BOT_SPEC = ["kick_chat_member", "delete_message"]
@dataclass @dataclass
class Chat: class Chat:
id: int = 0
ban_member = AsyncMock() ban_member = AsyncMock()
effective_name = "effective_chat_name" effective_name = "effective_chat_name"
@ -45,24 +45,27 @@ class Update:
message = Message() message = Message()
def make_update(msg: str) -> Update: def make_update(msg: str, chat_id: int) -> Update:
update = Update() update = Update()
update.message.text = msg update.message.text = msg
update.message.chat.id = chat_id
return update return update
class TestAutoBanBot(unittest.IsolatedAsyncioTestCase): class TestAutoBanBot(unittest.IsolatedAsyncioTestCase):
def setUp(self): def setUp(self):
self.regexes = autobanbot.load_config("config.json.example")['regexes'] config = autobanbot.load_config("config.json.example")
self.regexes = config['regexes']
self.allowed_chats = config['allowedChats']
async def test_not_bannable(self): async def test_not_bannable(self):
messages = ["not bannable message", "siematest_123elo", "i am not a bot", "t.me, i'm not a bot"] messages = ["not bannable message", "siematest_123elo", "i am not a bot", "t.me, i'm not a bot"]
for msg in messages: for msg in messages:
with self.subTest(msg=msg): with self.subTest(msg=msg):
update = make_update(msg) update = make_update(msg, ALLOWED_CHAT_ID)
update.message.delete.reset_mock() update.message.delete.reset_mock()
update.message.chat.ban_member.reset_mock() update.message.chat.ban_member.reset_mock()
await autobanbot.new_msg(update, None, self.regexes) await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
update.message.delete.assert_not_called() update.message.delete.assert_not_called()
update.message.chat.ban_member.assert_not_called() update.message.chat.ban_member.assert_not_called()
@ -75,13 +78,20 @@ class TestAutoBanBot(unittest.IsolatedAsyncioTestCase):
"@Hello_BoT" "@Hello_BoT"
] ]
for msg in messages: for msg in messages:
with self.subTest(msg=msg): with self.subTest("Allowed chat causes a ban", message=msg):
update = make_update(msg) update = make_update(msg, ALLOWED_CHAT_ID)
update.message.delete.reset_mock() update.message.delete.reset_mock()
update.message.chat.ban_member.reset_mock() update.message.chat.ban_member.reset_mock()
await autobanbot.new_msg(update, None, self.regexes) await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
update.message.delete.assert_called_once() update.message.delete.assert_called_once()
update.message.chat.ban_member.assert_called_once_with(update.message.from_user.id) update.message.chat.ban_member.assert_called_once_with(update.message.from_user.id)
with self.subTest("Unknown chat is ignored", message=msg):
update = make_update(msg, DISALLOWED_CHAT_ID)
update.message.delete.reset_mock()
update.message.chat.ban_member.reset_mock()
await autobanbot.new_msg(update, None, self.regexes, self.allowed_chats)
update.message.delete.assert_not_called()
update.message.chat.ban_member.assert_not_called()
if __name__ == '__main__': if __name__ == '__main__':