FMN_bot/src/listener_mention.py

80 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from src.fedi_api import get_notifications, mark_as_read_notification, post_status, upload_attachment
from src.fmn_states_db import add_state, get_state
from config import admins_bot, limit_movies_per_user, limit_all_movies_poll, hour_poll_posting, fmn_next_watching_hour
import threading, time
from datetime import datetime
from dateutil.parser import parse as dateutilparse
from dateutil.relativedelta import relativedelta, TU, SU
def get_control_mention():
while True:
time_now = datetime.now()
now_week = time_now.weekday()
if now_week not in (0, 6):
time.sleep(30)
continue
if get_state('last_thread_id'):
time.sleep(30)
continue
notif = get_notifications()
for i in notif:
if i['type'] != "mention":
continue
seen = i['pleroma']['is_seen']
acct_mention = i['account']['acct']
reply_to_id = i['status']['in_reply_to_id']
if acct_mention in admins_bot and seen == False and reply_to_id == None and now_week in (0, 6):
st_id = i['status']['id']
st_date = i['status']['created_at']
thread_created_at = dateutilparse(st_date)
delta = relativedelta(hour=hour_poll_posting, minute=0, second=0, weekday=TU(1))
stop_thread_scan = thread_created_at + delta
movies_accept_time = stop_thread_scan.strftime('%H:%M %d.%m.%Y MSK')
stop_thread_scan = time.mktime(time.struct_time(stop_thread_scan.timetuple()))
if now_week == 6: # Фикс стыков двух недель. Если вс, то расчитываем на следующую неделю
next_week = 2
else:
next_week = 1
next_movie_watching_delta = relativedelta(hour=fmn_next_watching_hour, minute=0, second=0, weekday=SU(next_week))
next_movie_watching = time_now + next_movie_watching_delta
max_mute_time = time.mktime(time.struct_time(next_movie_watching.timetuple())) # Глушение до следующего сеанса FMN.
next_movie_watching = next_movie_watching.strftime('%d.%m.%Y')
post_status(start_collect_movies_text(movies_accept_time, next_movie_watching), st_id, attachments=[upload_attachment('src/FMN.png')])
time.sleep(0.2)
mark_as_read_notification(i['id'])
add_state('max_mute_time', int(max_mute_time))
add_state('stop_thread_scan', int(stop_thread_scan))
add_state('last_thread_id', st_id)
break
time.sleep(30)
def start_collect_movies_text(movies_accept_time=str, next_movie_watching=str):
text = f'''
Начинаем прием заявок на следующий вечерний киносеанс, запланированный на {next_movie_watching} в 21:00 по Москве.
Напоминаем правила:
- Мы принимаем на просмотр полнометражные художественные фильмы;
- Прием варианта осуществляется путем публикации ссылки на этот фильм на IMDB или Кинопоиске в этом треде;
- Нам не подходят: сериалы, короткометражные и документальные фильмы;
- Максимальное количество вариантов, предложенных одним человеком не должно превышать {limit_movies_per_user};
- Всего может быть собрано до {limit_all_movies_poll} фильмов;
- Заявки принимаются до крайнего срока, после чего будет объявлено голосование по собранным вариантам.
Крайний срок подачи заявки - {movies_accept_time}.
Желаем удачи.
'''.replace('\t', '')
return text
def run_scan_notif():
scan_notif = threading.Thread(target=get_control_mention, daemon=True)
scan_notif.start()