'''This is most huge module for autobioeb, autohealing and etc...''' from s import states, config, avocado_id from telethon import events, utils, functions from loguru import logger from datetime import datetime, timedelta import asyncio import time import random import re def find_infect_theme(text): # NOTE: theme hell... any ideas for improvment required # but not use huge regular expression like|that|fuckin|way|a|aaaa|aaaaaaaa # because it makes re.findall like mess... default_bioexpr_theme = r"Прибыль: ([0-9\.\,k]+)" default_infected_days_theme = r' на ([0-9\ ]+) д.*' default_pathogen_remaining_theme = r'Осталось: ([0-9\ ]+)' bio_attack_themes = ( # I guess if too many themes it will be slow, but acceptable, because python slow as is. # current order in theme: # ('infected', 'bio_expr', 'infected days', 'pathogen remaining') # UA theme (r'.* йобнув.+', r"([0-9\.\,k]+) біо-ресурса", default_infected_days_theme, default_pathogen_remaining_theme), # RU theme (r'.* подверг.+', default_bioexpr_theme, default_infected_days_theme, default_pathogen_remaining_theme), # EN theme (r'.* infected.+', r"([0-9\.\,k]+) pcs\.", r' for ([0-9\ ]+) d.*', r'Remaining: ([0-9\ ]+)'), # AZ theme (r'.* сикди.+', r"верир: ([0-9\.\,k]+)", default_infected_days_theme, default_pathogen_remaining_theme), # "ПК гик" theme (r'.* насрал.+', r"потеряет: ([0-9\.\,k]+)", default_infected_days_theme, default_pathogen_remaining_theme), # "Новогодняя" theme (r'.* подверг заморозке.+', default_bioexpr_theme, default_infected_days_theme, default_pathogen_remaining_theme), # "Сексуальная индустрия" theme (r'.*.+выебал.+', r"кончила ([0-9\.\,k]+)", r' ещё ([0-9\ ]+) д.*', default_pathogen_remaining_theme), # "Аферисты в сетях" theme (r'.* атаковал.+', r"приносит: ([0-9\.\,k]+)", default_infected_days_theme, default_pathogen_remaining_theme), # "Квадробер" theme (r'.*.+бешенству.+', r"корма: ([0-9\.\,k]+)", default_infected_days_theme, default_pathogen_remaining_theme), # UA theme [via trust] (r'.* за допомогою довіреності зазнала зараження.+', r"([0-9\.\,k]+) біо-ресурса", default_infected_days_theme, default_pathogen_remaining_theme), # RU theme [via trust] (r'.* при помощи доверенности подвергла заражению.+', default_bioexpr_theme, default_infected_days_theme, default_pathogen_remaining_theme), # EN theme [via trust] (r'.* by authorization infected.+', r"([0-9\.\,k]+) pcs\.", r' for ([0-9\ ]+) d.*', r'Remaining: ([0-9\ ]+)'), # idk what is theme [via trust] (r'.* при помощи анонимуса атаковала.+', r'приносит: ([0-9\.\,k]+)', default_infected_days_theme, default_pathogen_remaining_theme), ) for theme in bio_attack_themes: trying_theme_index = bio_attack_themes.index(theme) logger.debug(f'trying theme {trying_theme_index}...') r, bioexp, days, remaining = re.findall(theme[0], text), re.findall(theme[1], text), re.findall(theme[2], text), re.findall(theme[3], text) if r and bioexp and days and remaining: logger.debug(f'found theme {trying_theme_index}') return r, bio_attack_themes[trying_theme_index] for theme in bio_attack_themes: trying_theme_index = bio_attack_themes.index(theme) logger.debug(f'trying theme {trying_theme_index} [without first lab]...') r, bioexp, days, remaining = re.findall(theme[0].split('', 1)[1], text), re.findall(theme[1], text), re.findall(theme[2], text), re.findall(theme[3], text) if r and bioexp and days and remaining: logger.debug(f'found theme {trying_theme_index}, but invisible first lab it will be replaced to avocado id') r = [(f'tg://openmessage?user_id={avocado_id}', r[0])] return r, bio_attack_themes[trying_theme_index] return r, None async def eb(client, c, conn, con, d, get_id, my_id, message_q): @client.on(events.NewMessage( pattern='.*йобнув.*|.*подверг(ла)?.*|.*infected.*|.*сикди.*|.*насрал.*|.*выебал.*|.*за допомогою довіреності.*|.*by authorization infected.*|.*при помощи анонимуса атаковала.*', from_users=(avocado_id, 'me'))) @logger.catch async def podverg_a(event): logger.debug('bio attack detected') # хто там кого йобнув(ла) m = event.message cinfo = await m.get_chat() chat_name = cinfo.title logger.debug(f"in chat '{chat_name}'") states.stats_most_infect_spam_chats[chat_name] += 1 t = m.raw_text if len(m.entities) > 1: h = utils.sanitize_parse_mode( 'html').unparse(t, m.entities) # HTML r, bio_attack_theme = find_infect_theme(h) if r == []: logger.warning( 'theme not found or lost part of message, showing original message: ' + m.text) logger.debug(str(r)) if r: u1url = r[0][0] u2url = r[0][1] u1id = await get_id(u1url) u2id = await get_id(u2url) bio_excludes = [x[0] for x in c.execute( 'select user_id from avocado_exclude').fetchall()] when = int(datetime.timestamp(m.date)) days = int(re.findall(bio_attack_theme[2], t)[ 0].replace(' ', '')) experience = re.findall( bio_attack_theme[1], t)[0].strip() if ',' in experience: experience = re.sub(r',', r'.', experience) if 'k' in experience: exp_int = int( float(re.sub('k', '', experience)) * 1000) else: exp_int = int(experience) pathogen_remaining = int(re.findall( bio_attack_theme[3], t)[0]) if pathogen_remaining <= states.auto_bioeb_pathogen_threshold and u1id == my_id: states.auto_bioeb_sleep_interval = states.auto_bioeb_max_interval logger.warning( f'Interval bioeb changed (slow down): {states.auto_bioeb_sleep_interval}') elif u1id == my_id: states.auto_bioeb_sleep_interval = states.auto_bioeb_min_interval logger.debug( f'Interval bioeb changed (more fast): {states.auto_bioeb_sleep_interval}') a = datetime.utcfromtimestamp( when)+timedelta(days=int(days), hours=3) do_int = datetime.timestamp(a) do_txt = str(a.strftime("%d.%m.%y")) if u1id > 0 and u2id > 0: if config.db_sqlite3 and u1id == my_id: try: c.execute("INSERT INTO avocado(user_id,when_int,bio_int,expr_int,expr_str) VALUES (?, ?, ?, ?, ?)", ( int(u2id), int(when), int(exp_int), int(datetime.timestamp(a)), str(a.strftime("%d.%m.%y")))) conn.commit() logger.debug( '[new] success writen my bio attack') except: try: c.execute("UPDATE avocado SET when_int = :wh, bio_int = :xpi, expr_int = :end, expr_str = :do WHERE user_id = :z AND when_int <= :wh;", { "wh": int(when), "xpi": int(exp_int), "end": int(datetime.timestamp(a)), "do": str(a.strftime("%d.%m.%y")), "z": int(u2id)}) conn.commit() logger.debug( '[upd] success updated my bio attack') except Exception as Err: logger.exception(f'err: {Err} avocado') states.last_reply_bioeb_avocado = time.time() if config.db_sqlite3 and u1id != my_id and u2id not in bio_excludes: try: c.execute("INSERT INTO avocado(user_id,when_int,bio_int,expr_int) VALUES (?, ?, ?, ?)", ( int(u2id), int(when), int(exp_int), 0)) conn.commit() logger.debug('[new] success writen bio attack') except: # NOTE: this maybe useful if you want sort database by bio-experience, but as S1S13AF7 said this # can be like: in database you have +10k today, tomorrow it changed to +1... # so... idk what next... c.execute("UPDATE avocado SET when_int = :wh, bio_int = :xpi WHERE user_id = :z AND when_int < :wh AND expr_int < :wh", { "wh": int(when), "xpi": int(exp_int), "z": int(u2id)}) conn.commit() logger.debug( '[upd] success updated bio attack') if config.db_pymysql: try: # from_infect who_id user_id profit until_infect until_str d.execute("INSERT INTO `tg_bio_attack` (`who_id`, `user_id`, `from_infect`, `profit`, `until_infect`, `until_str`) VALUES (%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE from_infect=VALUES (from_infect),profit=VALUES (profit),until_infect=VALUES (until_infect),until_str = VALUES (until_str);", (int( u1id), int(u2id), int(when), str(experience), int(datetime.timestamp(a)), str(a.strftime("%d.%m.%y")))) con.commit() print( f"\nINSERT INTO .... ON DUPLICATE KEY UPDATE # [@{u1id}] => [@{u2id}]\n") except Exception as Err: logger.exception(f'err: {Err} (tg_bio_attack)') # pass try: # user_id when profit d.execute("INSERT INTO `tg_bio_users` (`user_id`, `when_int`, `profit`) VALUES (%s,%s,%s) ON DUPLICATE KEY UPDATE when_int=VALUES (when_int),profit=VALUES (profit);", (int( u2id), int(when), str(experience))) con.commit() except Exception as Err: logger.exception(f'err: {Err} (tg_bio_users)') # pass if u1id == my_id: logger.success( f'''me подверг(ла) {u2url} [@{u2id}] +{experience}, d: {days}''') else: logger.info( f'''{u1url} [@{u1id}] подверг(ла) {u2url} [@{u2id}] +{experience}, d: {days}''') if u2id in bio_excludes: logger.debug(f'{u2id} not added: excluded') #################################################################### @client.on(events.NewMessage(outgoing=True, pattern=r'\.biofuck$')) async def cmd_bf(event): # крч акуратно з цим,вдруг шо я нічо if states.auto_bioeb_stop is False: await event.edit('biofucking already runned!') return m = event.message when = int(datetime.timestamp(m.date)) msg = '🤷' # якщо нема кого то жри рандом. def get_some_patients(limit=1000): count = int(c.execute( f"SELECT COUNT(*) FROM `avocado` WHERE expr_int <= {when} ORDER BY expr_int,when_int ASC LIMIT {limit}").fetchone()[0]) patients = list(c.execute( f"SELECT * FROM `avocado` WHERE expr_int <= {when} ORDER BY expr_int,when_int ASC LIMIT {limit}").fetchall()) bio_excludes = [x[0] for x in c.execute( 'select user_id from avocado_exclude').fetchall()] for p in patients: if p[0] in bio_excludes: logger.warning( f'skipping patient {p[0]}, excluded from bioebinng') patients.remove(p) return count, patients count, e_info = get_some_patients() # more random for random and reduce risk get very immun target after restart random.shuffle(e_info) if count < 2: nema = '🤷 рандом хавай.' await event.edit(nema) # ред logger.warning(nema) else: pong = '✅ погнали...' states.auto_bioeb_stop = False await event.edit(pong) # ред logger.info( f'є {count} потенційних пацієнтів. спробуєм їх сожрать') while states.auto_bioeb_stop is False: # скільки спим: random rs = float(random.uniform( states.auto_bioeb_sleep_interval[0], states.auto_bioeb_sleep_interval[1])) eb = f'Биоеб {e_info[0][0]}' # повідомлення. m = await event.reply(eb) e_info.pop(0) remaining_in_stack = len(e_info) logger.info( f'remaining patiences in current stack: {remaining_in_stack}') random.shuffle(e_info) states.last_sent_bioeb = int(datetime.timestamp(m.date)) if states.last_reply_bioeb_avocado == 0: # reduce negative ping states.last_reply_bioeb_avocado = int( datetime.timestamp(m.date)) await asyncio.sleep(3.3) await client.delete_messages(event.chat_id, m.id) delta_avocado = states.last_reply_bioeb_avocado - states.last_sent_bioeb if delta_avocado < 0: delta_avocado = delta_avocado * -1 logger.debug( f'latency avocado reply: {delta_avocado} secs') if delta_avocado > states.avocado_reply_timeout: interval_with_lag = rs + \ random.uniform(9.18299148, 40.9201412499) logger.debug( f'bioeb sleep [increased, because avocado have lag]: {interval_with_lag}s') await asyncio.sleep(interval_with_lag) else: logger.debug(f'bioeb sleep: {rs}s') await asyncio.sleep(rs) if len(e_info) <= 0: count, e_info = get_some_patients() if count < 2: event.reply('Закончились, рандом хавай') logger.warning('you are eaten all') break random.shuffle(e_info) e_count = len(e_info) logger.success( f'db refresh: {count} patiences; in stack: {e_count}') states.auto_bioeb_stop = True logger.warning('auto bioeb stopped') await event.reply('stopped') #################################################################### @client.on(events.NewMessage(outgoing=True, pattern=r'\.biofuck stop$')) async def stop_bioeb(event): states.auto_bioeb_stop = True await event.edit('Trying stop...') # ред @client.on(events.NewMessage(outgoing=True, pattern=r'\.bioexclude')) async def add_bioeb_exclude(event): reason = event.text.split(' ', 1)[1] or None reply = await client.get_messages(event.peer_id, ids=event.reply_to.reply_to_msg_id) if not reply.entities: await event.edit('ids not found') return t = reply.raw_text h = utils.sanitize_parse_mode( 'html').unparse(t, reply.entities) # HTML r = re.findall( r'', h) insertion_status = [] for link in r: user_id = await get_id(link) try: c.execute( "INSERT INTO avocado_exclude(user_id, reason) VALUES (?, ?)", (user_id, reason)) insertion_status.append(f'{user_id}: ok') except: insertion_status.append(f'{user_id}: exists') conn.commit() insertion_status = '\n'.join(insertion_status) await event.edit(f'{insertion_status}\nreason: {reason}') @client.on(events.NewMessage(outgoing=True, pattern=r'\.bioebmass')) async def bioeb_mass(event): arg = event.text.split(' ', 1) if len(arg) > 1: try: arg = int(arg[1]) except: await event.edit('Argument should be integer from 1 to 10') else: arg = None reply = await client.get_messages(event.peer_id, ids=event.reply_to.reply_to_msg_id) when = int(datetime.timestamp(event.date)) t = reply.raw_text h = utils.sanitize_parse_mode( 'html').unparse(t, reply.entities) # HTML r_as_list = [] r = re.findall( r'|(@\d+)', h) for x in r: r_as_list.extend(x) r = r_as_list if r == []: await event.edit('nothing to do: ids not found') return def filter_bioeb(victims_ids): bio_excludes = [x[0] for x in c.execute( 'SELECT user_id FROM avocado_exclude').fetchall()] filted_victims = [] for v in victims_ids: if v in bio_excludes: logger.warning( f'skipping patient {v}, excluded from bioebinng') elif c.execute(f'SELECT user_id FROM avocado WHERE expr_int >= {when} and user_id == {v}').fetchone(): logger.warning(f'skipping patient {v}, already eaten') else: filted_victims.append(v) return list(set(filted_victims)) bioebbing_ids = [] for i in r: if i == '': continue if i.startswith('@'): bioebbing_ids.append(int(i.replace('@', ''))) else: bioebbing_ids.append(await get_id(i)) bioebbing_ids = filter_bioeb(bioebbing_ids) bioebbing_len = len(bioebbing_ids) if bioebbing_len == 0: await event.edit('already eaten or excluded') return await event.edit(f'trying eat {bioebbing_len} patients...') for patient in bioebbing_ids: await asyncio.sleep(random.uniform(1.234, 4.222)) if arg: await event.respond(f'биоеб {arg} {patient}') else: await event.respond(f'биоеб {patient}') @client.on(events.NewMessage(outgoing=True, pattern=r'\.biocheck$')) async def set_default_check_chat(event): states.where_send_check_avocado = event.peer_id await event.edit('Checks will be send here') @client.on(events.NewMessage(pattern='.+Служба безопасности лаборатории', from_users=(707693258, 5137994780, 5226378684, 5443619563, 5434504334))) # Организатор заражения: нада биоебнуть? async def iris_sb(event): # iris off bio 31.12.24 m = event.message t = m.raw_text if config.a_404_patient and len(m.entities) > 1 and states.where_send_check_avocado: h = utils.sanitize_parse_mode( 'html').unparse(t, m.entities) # HTML r = re.findall( r'Организатор заражения: ', h) user_url = r[0] # user_id = await get_id(user_url) if r: await asyncio.sleep(random.uniform(1, 2)) logger.info(f'auto checking iris -> avocado: {user_url}') m = await client.send_message(states.where_send_check_avocado, f'.ч {user_url}') await asyncio.sleep(random.uniform(1, 5)) await client.delete_messages(m.chat_id, m.id) #################################################################### @client.on(events.NewMessage(pattern='⏱?🚫 Жертва', from_users=(avocado_id,))) async def infection_not_found(event): m = event.message if config.a_404_patient and m.mentioned: await asyncio.sleep(random.uniform(1.0001, 2.22394)) result = await client(functions.messages.GetBotCallbackAnswerRequest( # src https://tl.telethon.dev/methods/messages/get_bot_callback_answer.html peer=m.peer_id, msg_id=m.id, game=False, # idk why it works only when it false... 0_o data=m.reply_markup.rows[0].buttons[0].data )) logger.info('trying eat patient') if result.message: logger.info(f'avocado says: {result.message}') @client.on(events.NewMessage(pattern='🌡 У вас горячка вызванная', from_users=(avocado_id,))) async def need_h(event): m = event.message # reply = await client.get_messages(m.peer_id, ids=m.reply_to.reply_to_msg_id) # logger.debug(reply) if config.a_h and m.mentioned: # нада хил ah = await message_q( # отправляет сообщение боту "Хил", avocado_id, mark_read=True, delete=False, ) states.stats_medkit += 1 states.last_reply_bioeb_avocado = int( datetime.timestamp(event.date)) logger.debug(ah.text) logger.warning('Used medkit') elif m.mentioned: # alternative method: just waiting, this reduce bio-res usage states.auto_bioeb_sleep_interval = (3600, 3600) states.last_reply_bioeb_avocado = int( datetime.timestamp(event.date)) logger.warning( 'Waiting for infection release... [For skip just bioeb somebody]') #################################################################### @client.on(events.NewMessage(pattern='👺 Юзер не знайдений!', from_users=(avocado_id,))) async def avocado_404(event): if event.reply_to: reply = await client.get_messages(event.peer_id, ids=event.reply_to.reply_to_msg_id) t = reply.raw_text if reply.entities: t = utils.sanitize_parse_mode('html').unparse(t, reply.entities) r = re.findall(r'([0-9]{6,10})', t) if r: id = int(r[0]) if config.db_pymysql: try: con.query(f"DELETE FROM `tg_bio_attack` WHERE `user_id` = {id};") except: pass try: con.query(f"DELETE FROM `tg_bio_users` WHERE `user_id` = {id};") except: pass if config.db_sqlite3: try: c.execute("DELETE FROM avocado WHERE user_id = %d" % int(id)) conn.commit() # rm 404 id except Exception as Err: logger.exception(f'err: {Err} in DELETE FROM avocado WHERE `user_id` = {id}') ####################################################################