Initial commit
This commit is contained in:
commit
0f35acd93e
|
@ -0,0 +1,69 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
from telegram.ext import Updater, MessageHandler
|
||||
from telegram.ext.filters import Filters
|
||||
import re
|
||||
import logging
|
||||
import json
|
||||
|
||||
SPAM_REGEXES = []
|
||||
TELEGRAM_API_KEY = "unknown"
|
||||
|
||||
def load_config():
|
||||
with open("config.json") as config_file:
|
||||
config = json.load(config_file)
|
||||
for regex in config["regexes"]:
|
||||
SPAM_REGEXES.append(re.compile(regex, re.I))
|
||||
return config
|
||||
|
||||
def handle_spam_message(bot, chat, member, message_id):
|
||||
logging.warning("SPAM USER JOINED: {} | {} | {} | {}".format(
|
||||
member.id,
|
||||
member.first_name,
|
||||
member.last_name,
|
||||
member.username
|
||||
))
|
||||
logging.debug("WILL BAN: chat_id: {} | user_id: {} | AND DELETE MSG ID = {}".format(
|
||||
chat.id,
|
||||
member.id,
|
||||
message_id
|
||||
))
|
||||
kick_result = bot.kick_chat_member(chat.id, member.id)
|
||||
if not kick_result:
|
||||
logging.error("Bankick success: {}".format(kick_result))
|
||||
delete_result = bot.delete_message(chat.id, message_id)
|
||||
if not delete_result:
|
||||
logging.error("Message delete success: {}".format(delete_result))
|
||||
|
||||
def hello(bot, update):
|
||||
for user in update.message.new_chat_members:
|
||||
is_spam = False
|
||||
for regex in SPAM_REGEXES:
|
||||
if user.first_name is not None and regex.search(user.first_name) is not None:
|
||||
is_spam = True
|
||||
if user.last_name is not None and regex.search(user.last_name) is not None:
|
||||
is_spam = True
|
||||
if is_spam == True:
|
||||
break
|
||||
logging.info("User joined: {} | {} | {} | {} | {}".format(
|
||||
user.id,
|
||||
user.first_name,
|
||||
user.last_name,
|
||||
user.username,
|
||||
is_spam
|
||||
))
|
||||
if is_spam:
|
||||
handle_spam_message(bot, update.effective_chat, user, update.message.message_id)
|
||||
return True
|
||||
return False
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(format='%(asctime)s|%(levelname)s|%(message)s', level=logging.INFO)
|
||||
logging.info("Starting")
|
||||
config = load_config()
|
||||
updater = Updater(config["telegramApiKey"])
|
||||
|
||||
updater.dispatcher.add_handler(MessageHandler(Filters.status_update.new_chat_members, hello))
|
||||
|
||||
updater.start_polling()
|
||||
updater.idle()
|
|
@ -0,0 +1,7 @@
|
|||
{
|
||||
"telegramApiKey": "PUT YOUR KEY HERE",
|
||||
"regexes": [
|
||||
"ready-made telegram accounts",
|
||||
"253239090.*473157472"
|
||||
]
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
certifi==2018.4.16
|
||||
future==0.16.0
|
||||
pkg-resources==0.0.0
|
||||
python-telegram-bot==10.1.0
|
|
@ -0,0 +1,94 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import unittest
|
||||
from unittest.mock import Mock
|
||||
import autobanbot
|
||||
|
||||
CHAT_ID = "Chat ID"
|
||||
USER_ID = "User ID"
|
||||
MESSAGE_ID = "Message ID"
|
||||
BOT_SPEC = ["kick_chat_member", "delete_message"]
|
||||
|
||||
class Member:
|
||||
def __init__(self, user_id, first_name, last_name, username):
|
||||
self.id = user_id
|
||||
self.first_name = first_name
|
||||
self.last_name = last_name
|
||||
self.username = username
|
||||
|
||||
class Message:
|
||||
def __init__(self, user_id, first_name, last_name, username):
|
||||
self.new_chat_members = [Member(user_id, first_name, last_name, username)]
|
||||
self.message_id = MESSAGE_ID
|
||||
|
||||
class Chat:
|
||||
def __init__(self):
|
||||
self.id = CHAT_ID
|
||||
|
||||
class Update:
|
||||
def __init__(self, user_id, first_name, last_name, username):
|
||||
self.message = Message(user_id, first_name, last_name, username)
|
||||
self.effective_chat = Chat()
|
||||
|
||||
class TestAutoBanBot(unittest.TestCase):
|
||||
def setUp(self):
|
||||
autobanbot.load_config()
|
||||
|
||||
def banned_join(self, user_id, first_name, last_name, username):
|
||||
bot = Mock(spec=BOT_SPEC)
|
||||
self.assertTrue(autobanbot.hello(
|
||||
bot, Update(user_id, first_name, last_name, username)
|
||||
))
|
||||
bot.kick_chat_member.assert_called_once_with(CHAT_ID, USER_ID)
|
||||
bot.delete_message.assert_called_once_with(CHAT_ID, MESSAGE_ID)
|
||||
|
||||
def legit_join(self, user_id, first_name, last_name, username):
|
||||
bot = Mock(spec=BOT_SPEC)
|
||||
self.assertFalse(autobanbot.hello(
|
||||
bot, Update(user_id, first_name, last_name, username)
|
||||
))
|
||||
bot.kick_chat_member.assert_not_called()
|
||||
bot.delete_message.assert_not_called()
|
||||
|
||||
def test_spam_joins(self):
|
||||
updates = [
|
||||
"READY-MADE TELEGRAM ACCOUNTS",
|
||||
"ready-made TeLeGrAm AcCoUnTs",
|
||||
"there are ready-made TeLeGrAm AcCoUnTs for sale!",
|
||||
"253239090 hi hello 473157472"
|
||||
]
|
||||
for update in updates:
|
||||
with self.subTest(first_name=update):
|
||||
self.banned_join(USER_ID, update, "", "")
|
||||
with self.subTest(last_name=update):
|
||||
self.banned_join(USER_ID, "", update, "")
|
||||
with self.subTest(last_name=update, first_name=None):
|
||||
self.banned_join(USER_ID, None, update, "")
|
||||
with self.subTest(last_name=None, first_name=update):
|
||||
self.banned_join(USER_ID, update, None, "")
|
||||
with self.subTest(last_name=update, first_name=update):
|
||||
self.banned_join(USER_ID, update, update, "")
|
||||
|
||||
def test_non_spam_joins(self):
|
||||
updates = [
|
||||
"telegram account",
|
||||
"abc",
|
||||
"ready-made telegram naccounts",
|
||||
"completely legit name",
|
||||
""
|
||||
]
|
||||
for update in updates:
|
||||
with self.subTest(first_name=update):
|
||||
self.legit_join(USER_ID, update, "", "")
|
||||
with self.subTest(last_name=update):
|
||||
self.legit_join(USER_ID, "", update, "")
|
||||
with self.subTest(first_name=update, last_name=None):
|
||||
self.legit_join(USER_ID, update, None, "")
|
||||
with self.subTest(last_name=update, first_name=None):
|
||||
self.legit_join(USER_ID, None, update, "")
|
||||
with self.subTest(last_name=update, first_name=update):
|
||||
self.legit_join(USER_ID, update, update, "")
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue