from config import instance import time import json import requests from loguru import logger instance_point = f"https://{instance}/api/v1" with open(".auth", mode='rt') as auth: tkn = auth.read().replace('\n', '') headers= { "Authorization": "Bearer " + tkn } def get_notifications(): params = { "limit": 15, "types": ["mention"] } success = 0 while success == 0: try: r = requests.get(instance_point + "/notifications", json=params, headers=headers) r.raise_for_status() success = 1 return r.json() except: logger.exception('Error get notificatios') time.sleep(30) logger.info('Retrying get notificatios...') def mark_as_read_notification(id_notification): success = 0 while success == 0: try: r = requests.post(instance_point + f"/notifications/{id_notification}/dismiss", headers=headers) r.raise_for_status() success = 1 return r.json() except: logger.exception(f'Error read notification {id_notification}') time.sleep(30) logger.info(f'Retrying read notification {id_notification}...') def get_status_context(status_id): success = 0 while success == 0: try: r = requests.get(instance_point + f"/statuses/{status_id}/context", headers=headers) r.raise_for_status() success = 1 return r.json() except: logger.exception(f'Ошибка получения контекста треда {status_id}') time.sleep(30) logger.info('Повторный запрос треда...') def get_status(status_id): success = 0 while success == 0: try: r = requests.get(instance_point + f"/statuses/{status_id}", headers=headers) r.raise_for_status() success = 1 return r.json() except: logger.exception(f'Error get status {status_id}') time.sleep(30) logger.info(f'Retrying get status {status_id}') def post_status(text, reply_to_status_id=None, poll_options=None, poll_expires=345600, attachments=None): poll = None if poll_options is not None: poll = { "options": poll_options, "expires_in": poll_expires, "multiple": True } params = { "status": text, "in_reply_to_id": reply_to_status_id, "visibility": "unlisted", "content_type": "text/plain", "language": "ru", "poll": poll } if attachments: params['media_ids'] = attachments success = 0 while success == 0: try: r = requests.post(instance_point + "/statuses", json=params, headers=headers) r.raise_for_status() success = 1 return r.json() except: logger.exception('Error send status, retrying...') time.sleep(5) def upload_attachment(file_path): file = { "file": open(file_path, mode='rb') } params = { "description": "Fediverse Movie Night\nВоскресенье, 21:00\nLIVE ON XXIV Production", } success = 0 while success == 0: try: r = requests.post(instance_point + "/media", params, files=file, headers=headers) r.raise_for_status() success = 1 return r.json()['id'] except: logger.exception(f'Error uploading {file_path} attachment') time.sleep(5) logger.info(f'Retrying upload {file_path}...') def mute_user(acct_id=str, acct=str, duration=None): params = { "duration": duration } success = 0 while success == 0: try: r = requests.post(instance_point + '/accounts' + f"/{acct_id}/mute", params, headers=headers) r.raise_for_status() logger.info(f'Пользователь {acct} был заглушен на {duration} secs') success = 1 except: logger.exception(f'Ошибка глушения {acct}') time.sleep(5) logger.info(f'Повторное глушение {acct}...')