'''This is most huge module for autobioeb, autohealing and etc...''' from s import states, config from telethon import events, utils, functions from loguru import logger from datetime import datetime, timedelta import asyncio import time import random import re async def eb(client, c, conn, con, d, get_id, my_id, message_q): @client.on(events.NewMessage( pattern='.*йобнув.*|.*подверг(ла)?.*|.*infected.*|.*сикди.*|.*насрал.*|.*выебал.*|.*за допомогою довіреності.*|.*by authorization infected.*|.*при помощи анонимуса атаковала.*', from_users=(6333102398, 'me'))) @logger.catch async def podverg_a(event): logger.debug('bio attack detected') # хто там кого йобнув(ла) m = event.message cinfo = await m.get_chat() chat_name = cinfo.title logger.debug(f"in chat '{chat_name}'") states.stats_most_infect_spam_chats[chat_name] += 1 t = m.raw_text # NOTE: theme hell... any ideas for improvment required # but not use huge regular expression like|that|fuckin|way|a|aaaa|aaaaaaaa # because it makes re.findall like mess... default_bioexpr_theme = r"Прибыль: ([0-9\.\,k]+)" default_infected_days_theme = r' на ([0-9\ ]+) д.*' default_pathogen_remaining_theme = r'Осталось: ([0-9\ ]+)' bio_attack_themes = ( # I guess if too many themes it will be slow, but acceptable, because python slow as is. # current order in theme: # ('infected', 'bio_expr', 'infected days', 'pathogen remaining') # UA theme (r'.* йобнув.+', r"([0-9\.\,k]+) біо-ресурса", default_infected_days_theme, default_pathogen_remaining_theme), # RU theme (r'.* подверг.+', default_bioexpr_theme, default_infected_days_theme, default_pathogen_remaining_theme), # EN theme (r'.* infected.+', r"([0-9\.\,k]+) pcs\.", r' for ([0-9\ ]+) d.*', r'Remaining: ([0-9\ ]+)'), # AZ theme (r'.* сикди.+', r"верир: ([0-9\.\,k]+)", default_infected_days_theme, default_pathogen_remaining_theme), # "ПК гик" theme (r'.* насрал.+', r"потеряет: ([0-9\.\,k]+)", default_infected_days_theme, default_pathogen_remaining_theme), # "Новогодняя" theme (r'.* подверг заморозке.+', default_bioexpr_theme, default_infected_days_theme, default_pathogen_remaining_theme), # "Сексуальная индустрия" theme (r'.* выебал.+', r"кончила ([0-9\.\,k]+)", r' ещё ([0-9\ ]+) д.*', default_pathogen_remaining_theme), # UA theme [via trust] (r'.* за допомогою довіреності зазнала зараження.+', r"([0-9\.\,k]+) біо-ресурса", default_infected_days_theme, default_pathogen_remaining_theme), # RU theme [via trust] (r'.* при помощи доверенности подвергла заражению.+', default_bioexpr_theme, default_infected_days_theme, default_pathogen_remaining_theme), # EN theme [via trust] (r'.* by authorization infected.+', r"([0-9\.\,k]+) pcs\.", r' for ([0-9\ ]+) d.*', r'Remaining: ([0-9\ ]+)'), # idk what is theme [via trust] (r'.* при помощи анонимуса атаковала.+', r'приносит: ([0-9\.\,k]+)', default_infected_days_theme, default_pathogen_remaining_theme), ) if len(m.entities) > 1: h = utils.sanitize_parse_mode( 'html').unparse(t, m.entities) # HTML for theme in bio_attack_themes: trying_theme_index = bio_attack_themes.index(theme) logger.debug(f'trying theme {trying_theme_index}...') r = re.findall(theme[0], h) if r: logger.debug(f'found theme {trying_theme_index}') break if r == []: logger.warning( 'theme not found, showing original message: ' + m.text) logger.debug(str(r)) if r: u1url = r[0][0] u2url = r[0][1] u1id = await get_id(u1url) u2id = await get_id(u2url) bio_excludes = [x[0] for x in c.execute( 'select user_id from avocado_exclude').fetchall()] # print(f'{u1url} [@{u1id}] подверг(ла) {u2url} [@{u2id}]')#показать when = int(datetime.timestamp(m.date)) days = int(re.findall(bio_attack_themes[trying_theme_index][2], t)[ 0].replace(' ', '')) experience = re.findall( bio_attack_themes[trying_theme_index][1], t)[0].strip() if ',' in experience: experience = re.sub(r',', r'.', experience) if 'k' in experience: exp_int = int( float(re.sub('k', '', experience)) * 1000) else: exp_int = int(experience) pathogen_remaining = int(re.findall( bio_attack_themes[trying_theme_index][3], t)[0]) if pathogen_remaining <= states.auto_bioeb_pathogen_threshold and u1id == my_id: states.auto_bioeb_sleep_interval = states.auto_bioeb_max_interval logger.warning( f'Interval bioeb changed (slow down): {states.auto_bioeb_sleep_interval}') elif u1id == my_id: states.auto_bioeb_sleep_interval = states.auto_bioeb_min_interval logger.debug( f'Interval bioeb changed (more fast): {states.auto_bioeb_sleep_interval}') a = datetime.utcfromtimestamp( when)+timedelta(days=int(days), hours=3) do_int = datetime.timestamp(a) do_txt = str(a.strftime("%d.%m.%y")) if u1id > 0 and u2id > 0: if config.db_sqlite3 and u1id == my_id: try: c.execute("INSERT INTO avocado(user_id,when_int,bio_int,expr_int,expr_str) VALUES (?, ?, ?, ?, ?)", ( int(u2id), int(when), int(exp_int), int(datetime.timestamp(a)), str(a.strftime("%d.%m.%y")))) conn.commit() logger.debug( '[new] success writen my bio attack') except: try: c.execute("UPDATE avocado SET when_int = :wh, bio_int = :xpi, expr_int = :end, expr_str = :do WHERE user_id = :z AND when_int <= :wh;", { "wh": int(when), "xpi": int(exp_int), "end": int(datetime.timestamp(a)), "do": str(a.strftime("%d.%m.%y")), "z": int(u2id)}) conn.commit() logger.debug( '[upd] success updated my bio attack') except Exception as Err: logger.exception(f'err: {Err} avocado') states.last_reply_bioeb_avocado = time.time() if config.db_sqlite3 and u1id != my_id and u2id not in bio_excludes: try: c.execute("INSERT INTO avocado(user_id,when_int,bio_int,expr_int) VALUES (?, ?, ?, ?)", ( int(u2id), int(when), int(exp_int), 0)) conn.commit() logger.debug('[new] success writen bio attack') except: # NOTE: this maybe useful if you want sort database by bio-experience, but as S1S13AF7 said this # can be like: in database you have +10k today, tomorrow it changed to +1... # so... idk what next... c.execute("UPDATE avocado SET when_int = :wh, bio_int = :xpi WHERE user_id = :z AND when_int < :wh AND expr_int < :wh", { "wh": int(when), "xpi": int(exp_int), "z": int(u2id)}) conn.commit() logger.debug( '[upd] success updated bio attack') if config.db_pymysql: try: # from_infect who_id user_id profit until_infect until_str d.execute("INSERT INTO `tg_bio_attack` (`who_id`, `user_id`, `from_infect`, `profit`, `until_infect`, `until_str`) VALUES (%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE from_infect=VALUES (from_infect),profit=VALUES (profit),until_infect=VALUES (until_infect),until_str = VALUES (until_str);", (int( u1id), int(u2id), int(when), str(experience), int(datetime.timestamp(a)), str(a.strftime("%d.%m.%y")))) con.commit() print( f"\nINSERT INTO .... ON DUPLICATE KEY UPDATE # [@{u1id}] => [@{u2id}]\n") except Exception as Err: logger.exception(f'err: {Err} (tg_bio_attack)') # pass try: # user_id when profit d.execute("INSERT INTO `tg_bio_users` (`user_id`, `when_int`, `profit`) VALUES (%s,%s,%s) ON DUPLICATE KEY UPDATE when_int=VALUES (when_int),profit=VALUES (profit);", (int( u2id), int(when), str(experience))) con.commit() except Exception as Err: logger.exception(f'err: {Err} (tg_bio_users)') # pass if u1id == my_id: logger.success( f'''me подверг(ла) {u2url} [@{u2id}] +{experience}, d: {days}''') else: logger.info( f'''{u1url} [@{u1id}] подверг(ла) {u2url} [@{u2id}] +{experience}, d: {days}''') if u2id in bio_excludes: logger.debug(f'{u2id} not added: excluded') #################################################################### @client.on(events.NewMessage(outgoing=True, pattern=r'\.biofuck$')) async def cmd_bf(event): # крч акуратно з цим,вдруг шо я нічо if states.auto_bioeb_stop is False: await event.edit('biofucking already runned!') return m = event.message when = int(datetime.timestamp(m.date)) msg = '🤷' # якщо нема кого то жри рандом. def get_some_patients(limit=1000): count = int(c.execute( f"SELECT COUNT(*) FROM `avocado` WHERE expr_int <= {when} ORDER BY expr_int,when_int ASC LIMIT {limit}").fetchone()[0]) patients = list(c.execute( f"SELECT * FROM `avocado` WHERE expr_int <= {when} ORDER BY expr_int,when_int ASC LIMIT {limit}").fetchall()) bio_excludes = [x[0] for x in c.execute( 'select user_id from avocado_exclude').fetchall()] for p in patients: if p[0] in bio_excludes: logger.warning( f'skipping patient {p[0]}, excluded from bioebinng') patients.remove(p) return count, patients count, e_info = get_some_patients() # more random for random and reduce risk get very immun target after restart random.shuffle(e_info) if count < 2: nema = '🤷 рандом хавай.' await event.edit(nema) # ред logger.warning(nema) else: pong = '✅ погнали...' states.auto_bioeb_stop = False await event.edit(pong) # ред logger.info( f'є {count} потенційних пацієнтів. спробуєм їх сожрать') while states.auto_bioeb_stop is False: # скільки спим: random rs = float(random.uniform( states.auto_bioeb_sleep_interval[0], states.auto_bioeb_sleep_interval[1])) eb = f'Биоеб {e_info[0][0]}' # повідомлення. m = await event.reply(eb) e_info.pop(0) remaining_in_stack = len(e_info) logger.info( f'remaining patiences in current stack: {remaining_in_stack}') random.shuffle(e_info) states.last_sent_bioeb = int(datetime.timestamp(m.date)) if states.last_reply_bioeb_avocado == 0: # reduce negative ping states.last_reply_bioeb_avocado = int( datetime.timestamp(m.date)) await asyncio.sleep(3.3) await client.delete_messages(event.chat_id, m.id) delta_avocado = states.last_reply_bioeb_avocado - states.last_sent_bioeb if delta_avocado < 0: delta_avocado = delta_avocado * -1 logger.debug( f'latency avocado reply: {delta_avocado} secs') if delta_avocado > states.avocado_reply_timeout: interval_with_lag = rs + \ random.uniform(9.18299148, 40.9201412499) logger.debug( f'bioeb sleep [increased, because avocado have lag]: {interval_with_lag}s') await asyncio.sleep(interval_with_lag) else: logger.debug(f'bioeb sleep: {rs}s') await asyncio.sleep(rs) if len(e_info) <= 0: count, e_info = get_some_patients() if count < 2: event.reply('Закончились, рандом хавай') logger.warning('you are eaten all') break random.shuffle(e_info) e_count = len(e_info) logger.success( f'db refresh: {count} patiences; in stack: {e_count}') states.auto_bioeb_stop = True logger.warning('auto bioeb stopped') await event.reply('stopped') #################################################################### @client.on(events.NewMessage(outgoing=True, pattern=r'\.biofuck stop$')) async def stop_bioeb(event): states.auto_bioeb_stop = True await event.edit('Trying stop...') # ред @client.on(events.NewMessage(outgoing=True, pattern=r'\.bioexclude')) async def add_bioeb_exclude(event): reason = event.text.split(' ', 1)[1] or None reply = await client.get_messages(event.peer_id, ids=event.reply_to.reply_to_msg_id) if not reply.entities: await event.edit('ids not found') return t = reply.raw_text h = utils.sanitize_parse_mode( 'html').unparse(t, reply.entities) # HTML r = re.findall( r'', h) insertion_status = [] for link in r: user_id = await get_id(link) try: c.execute( "INSERT INTO avocado_exclude(user_id, reason) VALUES (?, ?)", (user_id, reason)) insertion_status.append(f'{user_id}: ok') except: insertion_status.append(f'{user_id}: exists') conn.commit() insertion_status = '\n'.join(insertion_status) await event.edit(f'{insertion_status}\nreason: {reason}') @client.on(events.NewMessage(outgoing=True, pattern=r'\.bioebmass')) async def bioeb_mass(event): arg = event.text.split(' ', 1) if len(arg) > 1: try: arg = int(arg[1]) except: await event.edit('Argument should be integer from 1 to 10') reply = await client.get_messages(event.peer_id, ids=event.reply_to.reply_to_msg_id) when = int(datetime.timestamp(event.date)) t = reply.raw_text h = utils.sanitize_parse_mode( 'html').unparse(t, reply.entities) # HTML r_as_list = [] r = re.findall( r'|(@\d+)', h) for x in r: r_as_list.extend(x) r = r_as_list if r == []: await event.edit('nothing to do: ids not found') return def filter_bioeb(victims_ids): bio_excludes = [x[0] for x in c.execute( 'SELECT user_id FROM avocado_exclude').fetchall()] filted_victims = [] for v in victims_ids: if v in bio_excludes: logger.warning( f'skipping patient {v}, excluded from bioebinng') elif c.execute(f'SELECT user_id FROM avocado WHERE expr_int >= {when} and user_id == {v}').fetchone(): logger.warning(f'skipping patient {v}, already eaten') else: filted_victims.append(v) return list(set(filted_victims)) bioebbing_ids = [] for i in r: if i == '': continue if i.startswith('@'): bioebbing_ids.append(int(i.replace('@', ''))) else: bioebbing_ids.append(await get_id(i)) bioebbing_ids = filter_bioeb(bioebbing_ids) bioebbing_len = len(bioebbing_ids) if bioebbing_len == 0: await event.edit('already eaten or excluded') return await event.edit(f'trying eat {bioebbing_len} patients...') for patient in bioebbing_ids: await asyncio.sleep(random.uniform(1.234, 4.222)) if arg: await event.respond(f'биоеб {arg} {patient}') else: await event.respond(f'биоеб {patient}') @client.on(events.NewMessage(outgoing=True, pattern=r'\.biocheck$')) async def set_default_check_chat(event): states.where_send_check_avocado = event.peer_id await event.edit('Checks will be send here') @client.on(events.NewMessage(pattern='.+Служба безопасности лаборатории', from_users=(707693258, 5137994780, 5226378684, 5443619563, 5434504334))) # Организатор заражения: нада биоебнуть? async def iris_sb(event): # iris off bio 31.12.24 m = event.message t = m.raw_text if config.a_404_patient and len(m.entities) > 1 and states.where_send_check_avocado: h = utils.sanitize_parse_mode( 'html').unparse(t, m.entities) # HTML r = re.findall( r'Организатор заражения: ', h) user_url = r[0] # user_id = await get_id(user_url) if r: await asyncio.sleep(random.uniform(1, 2)) logger.info(f'auto checking iris -> avocado: {user_url}') m = await client.send_message(states.where_send_check_avocado, f'.ч {user_url}') await asyncio.sleep(random.uniform(1, 5)) await client.delete_messages(m.chat_id, m.id) #################################################################### @client.on(events.NewMessage(pattern='⏱?🚫 Жертва', from_users=(6333102398,))) async def infection_not_found(event): m = event.message if config.a_404_patient and m.mentioned: await asyncio.sleep(random.uniform(1.0001, 2.22394)) result = await client(functions.messages.GetBotCallbackAnswerRequest( # src https://tl.telethon.dev/methods/messages/get_bot_callback_answer.html peer=m.peer_id, msg_id=m.id, game=False, # idk why it works only when it false... 0_o data=m.reply_markup.rows[0].buttons[0].data )) logger.info('trying eat patient') if result.message: logger.info(f'avocado says: {result.message}') @client.on(events.NewMessage(pattern='🌡 У вас горячка вызванная', from_users=(6333102398,))) async def need_h(event): m = event.message # reply = await client.get_messages(m.peer_id, ids=m.reply_to.reply_to_msg_id) # logger.debug(reply) if config.a_h and m.mentioned: # нада хил ah = await message_q( # отправляет сообщение боту "Хил", 6333102398, mark_read=True, delete=False, ) states.stats_medkit += 1 states.last_reply_bioeb_avocado = int( datetime.timestamp(event.date)) logger.debug(ah.text) logger.warning('Used medkit') elif m.mentioned: # alternative method: just waiting, this reduce bio-res usage states.auto_bioeb_sleep_interval = (3600, 3600) states.last_reply_bioeb_avocado = int( datetime.timestamp(event.date)) logger.warning( 'Waiting for infection release... [For skip just bioeb somebody]')