'''This is the largest module: autobioeb, autohealing, etc.'''
from s import states, config, avocado_id
from telethon import events, utils, functions
from loguru import logger
from datetime import datetime, timedelta
import asyncio
import time
import random
import re
def find_infect_theme(text):
    """Detect which message theme a bio-attack report is written in.

    Every theme is a 4-tuple of regular expressions in the order
    ``(infected, bio_expr, infected_days, pathogen_remaining)``. A theme is
    accepted only when all four of its patterns match ``text``.

    Two passes are made over the theme table. The first uses the themes as
    written. The second cuts the head off the ``infected`` pattern (for
    messages where the first lab is not visible) and substitutes a link
    built from ``avocado_id`` as the attacker.

    Args:
        text: message text to inspect.

    Returns:
        A ``(matches, theme)`` pair: ``matches`` is the ``re.findall``
        result of the ``infected`` pattern and ``theme`` is the matching
        4-tuple. When no theme matches, ``([], None)`` is returned so the
        caller never receives matches without a theme to parse them with.
    """
    # NOTE: the theme table is hard to maintain; ideas for improvement are
    # welcome. Do not merge everything into one huge alternation like
    # a|b|c|d, because that makes the re.findall results a mess.
    default_bioexpr_theme = r"Прибыль: ([0-9\.\,k]+)"
    default_infected_days_theme = r' на ([0-9\ ]+) д.*'
    default_pathogen_remaining_theme = r'Осталось: ([0-9\ ]+)'
    # The themes are tried one by one, so a long table makes detection
    # slower; this is considered acceptable.
    bio_attack_themes = (
        # current order in theme:
        # ('infected', 'bio_expr', 'infected days', 'pathogen remaining')
        # UA theme
        (r'.* йобнув.+',
         r"([0-9\.\,k]+) біо-ресурса",
         default_infected_days_theme,
         default_pathogen_remaining_theme),
        # RU theme
        (r'.* подверг.+',
         default_bioexpr_theme,
         default_infected_days_theme,
         default_pathogen_remaining_theme),
        # EN theme
        (r'.* infected.+',
         r"([0-9\.\,k]+) pcs\.",
         r' for ([0-9\ ]+) d.*',
         r'Remaining: ([0-9\ ]+)'),
        # AZ theme
        (r'.* сикди.+',
         r"верир: ([0-9\.\,k]+)",
         default_infected_days_theme,
         default_pathogen_remaining_theme),
        # "ПК гик" theme
        (r'.* насрал.+',
         r"потеряет: ([0-9\.\,k]+)",
         default_infected_days_theme,
         default_pathogen_remaining_theme),
        # "Новогодняя" theme
        (r'.* подверг заморозке.+',
         default_bioexpr_theme,
         default_infected_days_theme,
         default_pathogen_remaining_theme),
        # "Сексуальная индустрия" theme
        (r'.*.+выебал.+',
         r"кончила ([0-9\.\,k]+)",
         r' ещё ([0-9\ ]+) д.*',
         default_pathogen_remaining_theme),
        # "Аферисты в сетях" theme
        (r'.* атаковал.+',
         r"приносит: ([0-9\.\,k]+)",
         default_infected_days_theme,
         default_pathogen_remaining_theme),
        # "Квадробер" theme
        (r'.*.+бешенству.+',
         r"корма: ([0-9\.\,k]+)",
         default_infected_days_theme,
         default_pathogen_remaining_theme),
        # UA theme [via trust]
        (r'.* за допомогою довіреності зазнала зараження.+',
         r"([0-9\.\,k]+) біо-ресурса",
         default_infected_days_theme,
         default_pathogen_remaining_theme),
        # RU theme [via trust]
        (r'.* при помощи доверенности подвергла заражению.+',
         default_bioexpr_theme,
         default_infected_days_theme,
         default_pathogen_remaining_theme),
        # EN theme [via trust]
        (r'.* by authorization infected.+',
         r"([0-9\.\,k]+) pcs\.",
         r' for ([0-9\ ]+) d.*',
         r'Remaining: ([0-9\ ]+)'),
        # unknown theme name [via trust]
        (r'.* при помощи анонимуса атаковала.+',
         r'приносит: ([0-9\.\,k]+)',
         default_infected_days_theme,
         default_pathogen_remaining_theme),
    )

    def details_match(theme):
        """Tell whether the three detail patterns of ``theme`` all match."""
        return all(re.search(pattern, text) for pattern in theme[1:])

    # First pass: themes exactly as written.
    for theme_index, theme in enumerate(bio_attack_themes):
        logger.debug(f'trying theme {theme_index}...')
        r = re.findall(theme[0], text)
        if r and details_match(theme):
            logger.debug(f'found theme {theme_index}')
            return r, theme

    # Second pass: drop the head of the 'infected' pattern.
    for theme_index, theme in enumerate(bio_attack_themes):
        logger.debug(f'trying theme {theme_index} [without first lab]...')
        # NOTE(review): the separator below is an empty string in this copy
        # of the file, and str.split('') raises "ValueError: empty
        # separator". The literal looks truncated; restore the intended
        # separator from the repository before relying on this pass.
        tail_pattern = theme[0].split('', 1)[1]
        r = re.findall(tail_pattern, text)
        if r and details_match(theme):
            logger.debug(f'found theme {theme_index}, but invisible first lab it will be replaced to avocado id')
            r = [(f'tg://openmessage?user_id={avocado_id}', r[0])]
            return r, theme

    # Nothing matched. Never hand back leftovers from the last iteration:
    # a non-empty list paired with None would make the caller index None.
    return [], None
async def eb(client, c, conn, con, d, get_id, my_id, message_q):
    """Register the handlers for bio attacks, auto-bioeb and auto-healing.

    All handlers are closures over the arguments below and are attached to
    ``client`` with ``client.on``.

    Args:
        client: Telethon client the handlers are registered on.
        c: cursor used for the ``avocado`` / ``avocado_exclude`` tables
            (queried with ``?`` and ``:name`` placeholders).
        conn: connection committed after writes made through ``c``.
        con: connection used for the ``tg_bio_*`` tables when
            ``config.db_pymysql`` is set.
        d: cursor used for the ``tg_bio_*`` tables (``%s`` placeholders).
        get_id: coroutine function that turns a profile link into a user id.
        my_id: user id of the account running this userbot.
        message_q: coroutine function used to send a message to a peer.
    """

    def load_bio_excludes():
        """Return the set of user ids listed in ``avocado_exclude``."""
        return {row[0] for row in c.execute(
            'select user_id from avocado_exclude').fetchall()}

    # NOTE(review): find_infect_theme also knows the 'атаковал' and
    # 'бешенству' themes, which this trigger pattern does not mention;
    # confirm whether those messages are expected to reach this handler.
    @client.on(events.NewMessage(
        pattern='.*йобнув.*|.*подверг(ла)?.*|.*infected.*|.*сикди.*|.*насрал.*|.*выебал.*|.*за допомогою довіреності.*|.*by authorization infected.*|.*при помощи анонимуса атаковала.*',
        from_users=(avocado_id, 'me')))
    @logger.catch
    async def podverg_a(event):
        """Parse an attack report and store it in the configured databases."""
        logger.debug('bio attack detected')
        # who attacked whom
        m = event.message
        cinfo = await m.get_chat()
        # Private chats have no ``title``; fall back to the display name.
        chat_name = getattr(cinfo, 'title', None) or utils.get_display_name(cinfo)
        logger.debug(f"in chat '{chat_name}'")
        states.stats_most_infect_spam_chats[chat_name] += 1
        t = m.raw_text
        # ``entities`` is None for messages without formatting.
        if not m.entities or len(m.entities) <= 1:
            return
        h = utils.sanitize_parse_mode('html').unparse(t, m.entities)  # HTML
        r, bio_attack_theme = find_infect_theme(h)
        if not r:
            logger.warning(
                'theme not found or lost part of message, showing original message: ' + m.text)
        logger.debug(str(r))
        if not r or bio_attack_theme is None:
            return

        u1url = r[0][0]
        u2url = r[0][1]
        u1id = await get_id(u1url)
        u2id = await get_id(u2url)
        bio_excludes = load_bio_excludes()
        when = int(datetime.timestamp(m.date))
        days = int(re.findall(bio_attack_theme[2], t)[0].replace(' ', ''))
        experience = re.findall(
            bio_attack_theme[1], t)[0].strip().replace(',', '.')
        if 'k' in experience:
            exp_int = int(float(experience.replace('k', '')) * 1000)
        else:
            # float() first so that a fractional value such as "1.5"
            # does not raise ValueError.
            exp_int = int(float(experience))
        # The pattern allows spaces between digits, same as for ``days``.
        pathogen_remaining = int(re.findall(
            bio_attack_theme[3], t)[0].replace(' ', ''))

        if u1id == my_id:
            if pathogen_remaining <= states.auto_bioeb_pathogen_threshold:
                states.auto_bioeb_sleep_interval = states.auto_bioeb_max_interval
                logger.warning(
                    f'Interval bioeb changed (slow down): {states.auto_bioeb_sleep_interval}')
            else:
                states.auto_bioeb_sleep_interval = states.auto_bioeb_min_interval
                logger.debug(
                    f'Interval bioeb changed (more fast): {states.auto_bioeb_sleep_interval}')

        # NOTE(review): the extra 3 hours are presumably a UTC+3 offset —
        # confirm.
        until = datetime.utcfromtimestamp(
            when) + timedelta(days=days, hours=3)
        until_int = int(datetime.timestamp(until))
        until_str = str(until.strftime("%d.%m.%y"))

        if not (u1id > 0 and u2id > 0):
            return

        if config.db_sqlite3 and u1id == my_id:
            try:
                c.execute("INSERT INTO avocado(user_id,when_int,bio_int,expr_int,expr_str) VALUES (?, ?, ?, ?, ?)", (
                    int(u2id), int(when), int(exp_int), until_int, until_str))
                conn.commit()
                logger.debug('[new] success writen my bio attack')
            except Exception:
                # The insert failed (presumably the row already exists);
                # fall back to updating it.
                try:
                    c.execute("UPDATE avocado SET when_int = :wh, bio_int = :xpi, expr_int = :end, expr_str = :do WHERE user_id = :z AND when_int <= :wh;", {
                        "wh": int(when), "xpi": int(exp_int), "end": until_int, "do": until_str, "z": int(u2id)})
                    conn.commit()
                    logger.debug('[upd] success updated my bio attack')
                except Exception as Err:
                    logger.exception(f'err: {Err} avocado')
            states.last_reply_bioeb_avocado = time.time()

        if config.db_sqlite3 and u1id != my_id and u2id not in bio_excludes:
            try:
                c.execute("INSERT INTO avocado(user_id,when_int,bio_int,expr_int) VALUES (?, ?, ?, ?)", (
                    int(u2id), int(when), int(exp_int), 0))
                conn.commit()
                logger.debug('[new] success writen bio attack')
            except Exception:
                # NOTE: this may be useful for sorting the database by
                # bio-experience, but as S1S13AF7 pointed out the stored
                # value can go stale: +10k today may become +1 tomorrow.
                c.execute("UPDATE avocado SET when_int = :wh, bio_int = :xpi WHERE user_id = :z AND when_int < :wh AND expr_int < :wh", {
                    "wh": int(when), "xpi": int(exp_int), "z": int(u2id)})
                conn.commit()
                logger.debug('[upd] success updated bio attack')

        if config.db_pymysql:
            try:
                # from_infect who_id user_id profit until_infect until_str
                d.execute("INSERT INTO `tg_bio_attack` (`who_id`, `user_id`, `from_infect`, `profit`, `until_infect`, `until_str`) VALUES (%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE from_infect=VALUES (from_infect),profit=VALUES (profit),until_infect=VALUES (until_infect),until_str = VALUES (until_str);", (int(
                    u1id), int(u2id), int(when), str(experience), until_int, until_str))
                con.commit()
                print(
                    f"\nINSERT INTO .... ON DUPLICATE KEY UPDATE # [@{u1id}] => [@{u2id}]\n")
            except Exception as Err:
                logger.exception(f'err: {Err} (tg_bio_attack)')
            try:
                # user_id when profit
                d.execute("INSERT INTO `tg_bio_users` (`user_id`, `when_int`, `profit`) VALUES (%s,%s,%s) ON DUPLICATE KEY UPDATE when_int=VALUES (when_int),profit=VALUES (profit);", (int(
                    u2id), int(when), str(experience)))
                con.commit()
            except Exception as Err:
                logger.exception(f'err: {Err} (tg_bio_users)')

        if u1id == my_id:
            logger.success(
                f'''me подверг(ла) {u2url} [@{u2id}] +{experience}, d: {days}''')
        else:
            logger.info(
                f'''{u1url} [@{u1id}] подверг(ла) {u2url} [@{u2id}] +{experience}, d: {days}''')
        if u2id in bio_excludes:
            logger.debug(f'{u2id} not added: excluded')

    ####################################################################
    @client.on(events.NewMessage(outgoing=True, pattern=r'\.biofuck$'))
    async def cmd_bf(event):
        """Start the auto-bioeb loop. Be careful with this command."""
        if states.auto_bioeb_stop is False:
            await event.edit('biofucking already runned!')
            return
        when = int(datetime.timestamp(event.message.date))

        def get_some_patients(limit=1000):
            """Return ``(count, patients)`` for rows with expr_int <= when.

            ``count`` is the number of matching rows before exclusions;
            ``patients`` are the rows (at most ``limit``) whose user id is
            not listed in ``avocado_exclude``.
            """
            count = int(c.execute(
                "SELECT COUNT(*) FROM `avocado` WHERE expr_int <= ? ORDER BY expr_int,when_int ASC LIMIT ?",
                (when, limit)).fetchone()[0])
            rows = c.execute(
                "SELECT * FROM `avocado` WHERE expr_int <= ? ORDER BY expr_int,when_int ASC LIMIT ?",
                (when, limit)).fetchall()
            bio_excludes = load_bio_excludes()
            # Build a new list: removing from a list while iterating over
            # it skips the element that follows every removed one.
            patients = []
            for p in rows:
                if p[0] in bio_excludes:
                    logger.warning(
                        f'skipping patient {p[0]}, excluded from bioebinng')
                else:
                    patients.append(p)
            return count, patients

        count, e_info = get_some_patients()
        # more random for random and reduce risk get very immun target after restart
        random.shuffle(e_info)
        # ``e_info`` can be empty even with a large ``count`` when every
        # candidate is excluded.
        if count < 2 or not e_info:
            nema = '🤷 рандом хавай.'
            await event.edit(nema)
            logger.warning(nema)
            return

        pong = '✅ погнали...'
        states.auto_bioeb_stop = False
        await event.edit(pong)
        logger.info(
            f'є {count} потенційних пацієнтів. спробуєм їх сожрать')
        while states.auto_bioeb_stop is False:
            # how long to sleep: random within the current interval
            rs = float(random.uniform(
                states.auto_bioeb_sleep_interval[0], states.auto_bioeb_sleep_interval[1]))
            patient = e_info.pop(0)
            sent = await event.reply(f'Биоеб {patient[0]}')
            remaining_in_stack = len(e_info)
            logger.info(
                f'remaining patiences in current stack: {remaining_in_stack}')
            random.shuffle(e_info)
            states.last_sent_bioeb = int(datetime.timestamp(sent.date))
            if states.last_reply_bioeb_avocado == 0:  # reduce negative ping
                states.last_reply_bioeb_avocado = int(
                    datetime.timestamp(sent.date))
            await asyncio.sleep(3.3)
            await client.delete_messages(event.chat_id, sent.id)
            delta_avocado = abs(
                states.last_reply_bioeb_avocado - states.last_sent_bioeb)
            logger.debug(
                f'latency avocado reply: {delta_avocado} secs')
            if delta_avocado > states.avocado_reply_timeout:
                interval_with_lag = rs + \
                    random.uniform(9.18299148, 40.9201412499)
                logger.debug(
                    f'bioeb sleep [increased, because avocado have lag]: {interval_with_lag}s')
                await asyncio.sleep(interval_with_lag)
            else:
                logger.debug(f'bioeb sleep: {rs}s')
                await asyncio.sleep(rs)
            if not e_info:
                count, e_info = get_some_patients()
                if count < 2 or not e_info:
                    await event.reply('Закончились, рандом хавай')
                    logger.warning('you are eaten all')
                    break
                random.shuffle(e_info)
                e_count = len(e_info)
                logger.success(
                    f'db refresh: {count} patiences; in stack: {e_count}')
        states.auto_bioeb_stop = True
        logger.warning('auto bioeb stopped')
        await event.reply('stopped')

    ####################################################################
    @client.on(events.NewMessage(outgoing=True, pattern=r'\.biofuck stop$'))
    async def stop_bioeb(event):
        """Ask the auto-bioeb loop to stop after its current iteration."""
        states.auto_bioeb_stop = True
        await event.edit('Trying stop...')

    @client.on(events.NewMessage(outgoing=True, pattern=r'\.bioexclude'))
    async def add_bioeb_exclude(event):
        """Add the users linked in the replied-to message to the excludes."""
        parts = event.text.split(' ', 1)
        # The reason is optional; an absent or empty one is stored as None.
        reason = (parts[1] if len(parts) > 1 else None) or None
        reply = None
        if event.reply_to:
            reply = await client.get_messages(event.peer_id, ids=event.reply_to.reply_to_msg_id)
        if reply is None or not reply.entities:
            await event.edit('ids not found')
            return
        t = reply.raw_text
        h = utils.sanitize_parse_mode('html').unparse(t, reply.entities)  # HTML
        # NOTE(review): the pattern below is empty in this copy of the file
        # and looks truncated (presumably it should capture profile
        # links); restore it from the repository.
        r = re.findall(
            r'', h)
        insertion_status = []
        for link in r:
            user_id = await get_id(link)
            try:
                c.execute(
                    "INSERT INTO avocado_exclude(user_id, reason) VALUES (?, ?)", (user_id, reason))
                insertion_status.append(f'{user_id}: ok')
            except Exception:
                insertion_status.append(f'{user_id}: exists')
        conn.commit()
        insertion_status = '\n'.join(insertion_status)
        await event.edit(f'{insertion_status}\nreason: {reason}')

    @client.on(events.NewMessage(outgoing=True, pattern=r'\.bioebmass'))
    async def bioeb_mass(event):
        """Send a bioeb command for every user found in the replied message."""
        parts = event.text.split(' ', 1)
        arg = None
        if len(parts) > 1:
            try:
                arg = int(parts[1])
            except ValueError:
                await event.edit('Argument should be integer from 1 to 10')
                # Stop here; otherwise the unparsed argument would be
                # sent as part of the commands below.
                return
        reply = None
        if event.reply_to:
            reply = await client.get_messages(event.peer_id, ids=event.reply_to.reply_to_msg_id)
        if reply is None:
            await event.edit('nothing to do: ids not found')
            return
        when = int(datetime.timestamp(event.date))
        t = reply.raw_text
        h = utils.sanitize_parse_mode('html').unparse(t, reply.entities)  # HTML
        # NOTE(review): the first alternative of this pattern is empty in
        # this copy of the file and looks truncated (presumably a profile
        # link capture); restore it from the repository.
        matches = re.findall(
            r'|(@\d+)', h)
        r = []
        for found in matches:
            # findall yields tuples with several groups and plain strings
            # with a single one; flatten both without splitting strings
            # into characters.
            r.extend((found,) if isinstance(found, str) else found)
        if not r:
            await event.edit('nothing to do: ids not found')
            return

        def filter_bioeb(victims_ids):
            """Drop excluded and still-infected ids; return unique ids."""
            bio_excludes = load_bio_excludes()
            filted_victims = []
            for v in victims_ids:
                if v in bio_excludes:
                    logger.warning(
                        f'skipping patient {v}, excluded from bioebinng')
                elif c.execute(
                        'SELECT user_id FROM avocado WHERE expr_int >= ? and user_id == ?',
                        (when, v)).fetchone():
                    logger.warning(f'skipping patient {v}, already eaten')
                else:
                    filted_victims.append(v)
            return list(set(filted_victims))

        bioebbing_ids = []
        for i in r:
            if i == '':
                continue
            if i.startswith('@'):
                bioebbing_ids.append(int(i.replace('@', '')))
            else:
                bioebbing_ids.append(await get_id(i))
        bioebbing_ids = filter_bioeb(bioebbing_ids)
        bioebbing_len = len(bioebbing_ids)
        if bioebbing_len == 0:
            await event.edit('already eaten or excluded')
            return
        await event.edit(f'trying eat {bioebbing_len} patients...')
        for patient in bioebbing_ids:
            await asyncio.sleep(random.uniform(1.234, 4.222))
            if arg:
                await event.respond(f'биоеб {arg} {patient}')
            else:
                await event.respond(f'биоеб {patient}')

    @client.on(events.NewMessage(outgoing=True, pattern=r'\.biocheck$'))
    async def set_default_check_chat(event):
        """Remember the current chat as the destination for checks."""
        states.where_send_check_avocado = event.peer_id
        await event.edit('Checks will be send here')

    # Infection organizer reported: check whether to attack back.
    @client.on(events.NewMessage(pattern='.+Служба безопасности лаборатории',
                                 from_users=(707693258, 5137994780, 5226378684, 5443619563, 5434504334)))
    async def iris_sb(event):
        """Forward the infection organizer to the check chat."""
        # iris off bio 31.12.24
        m = event.message
        t = m.raw_text
        if not (config.a_404_patient and m.entities and len(m.entities) > 1
                and states.where_send_check_avocado):
            return
        h = utils.sanitize_parse_mode('html').unparse(t, m.entities)  # HTML
        # NOTE(review): this pattern has no capture group in this copy of
        # the file and looks truncated; restore it from the repository.
        r = re.findall(
            r'Организатор заражения: ', h)
        if not r:
            # Check before indexing: an empty result used to raise
            # IndexError.
            return
        user_url = r[0]
        await asyncio.sleep(random.uniform(1, 2))
        logger.info(f'auto checking iris -> avocado: {user_url}')
        m = await client.send_message(states.where_send_check_avocado, f'.ч {user_url}')
        await asyncio.sleep(random.uniform(1, 5))
        await client.delete_messages(m.chat_id, m.id)

    ####################################################################
    @client.on(events.NewMessage(pattern='⏱?🚫 Жертва', from_users=(avocado_id,)))
    async def infection_not_found(event):
        """Press the first inline button of the bot's message."""
        m = event.message
        if not (config.a_404_patient and m.mentioned):
            return
        rows = getattr(m.reply_markup, 'rows', None)
        if not rows or not rows[0].buttons:
            logger.warning('no inline button to press, skipping')
            return
        await asyncio.sleep(random.uniform(1.0001, 2.22394))
        # src https://tl.telethon.dev/methods/messages/get_bot_callback_answer.html
        result = await client(functions.messages.GetBotCallbackAnswerRequest(
            peer=m.peer_id,
            msg_id=m.id,
            game=False,  # observed to work only when this is False
            data=rows[0].buttons[0].data
        ))
        logger.info('trying eat patient')
        if result.message:
            logger.info(f'avocado says: {result.message}')

    @client.on(events.NewMessage(pattern='🌡 У вас горячка вызванная', from_users=(avocado_id,)))
    async def need_h(event):
        """React to a fever notice: heal, or slow the auto-bioeb loop down."""
        m = event.message
        if config.a_h and m.mentioned:
            # healing is needed: send the heal command to the bot
            ah = await message_q(
                "Хил",
                avocado_id,
                mark_read=True,
                delete=False,
            )
            states.stats_medkit += 1
            states.last_reply_bioeb_avocado = int(
                datetime.timestamp(event.date))
            logger.debug(ah.text)
            logger.warning('Used medkit')
        elif m.mentioned:
            # alternative method: just waiting, this reduce bio-res usage
            states.auto_bioeb_sleep_interval = (3600, 3600)
            states.last_reply_bioeb_avocado = int(
                datetime.timestamp(event.date))
            logger.warning(
                'Waiting for infection release... [For skip just bioeb somebody]')

    ####################################################################
    @client.on(events.NewMessage(pattern='👺 Юзер не знайдений!'))
    async def avocado_404(event):
        """Delete a user reported as not found from the databases."""
        m = event.message
        if m.sender_id != 6333102398 or not event.reply_to:
            return
        reply = await client.get_messages(event.peer_id, ids=event.reply_to.reply_to_msg_id)
        if reply is None:
            return
        t = reply.raw_text
        if reply.entities:
            t = utils.sanitize_parse_mode('html').unparse(t, reply.entities)
        r = re.findall(r'([0-9]{6,10})', t)
        if not r:
            return
        user_id = int(r[0])
        # The flags live on ``config``; the bare names were undefined.
        if config.db_pymysql:
            # ``user_id`` is an int, so interpolating it is safe here.
            try:
                con.query(f"DELETE FROM `tg_bio_attack` WHERE `user_id` = {user_id};")
            except Exception as Err:
                logger.exception(f'err: {Err} (tg_bio_attack)')
            try:
                con.query(f"DELETE FROM `tg_bio_users` WHERE `user_id` = {user_id};")
            except Exception as Err:
                logger.exception(f'err: {Err} (tg_bio_users)')
        if config.db_sqlite3:
            try:
                c.execute("DELETE FROM avocado WHERE user_id = ?", (user_id,))
                conn.commit()  # rm 404 id
            except Exception as Err:
                print(f'err: {Err} in DELETE FROM avocado WHERE `user_id` = {user_id}')
    ####################################################################