FMN_bot/src/fedi_api.py

148 lines
4.1 KiB
Python

from config import instance
import time
import json
import requests
from loguru import logger
# Root URL of the instance's REST API (v1); endpoint paths are appended to it.
instance_point = f"https://{instance}/api/v1"

# The bearer token is read from the ".auth" file in the working directory;
# every newline character is removed from what was read.
with open(".auth", mode='rt') as auth:
    tkn = "".join(auth.read().split('\n'))

# One shared session: every request made through it carries the token
# and advertises gzip support.
s = requests.Session()
s.headers["Authorization"] = f"Bearer {tkn}"
s.headers["Accept-encoding"] = 'gzip'
def get_notifications():
    """Fetch the most recent mention notifications.

    Requests at most 15 notifications of type ``mention`` from the
    ``/notifications`` endpoint. Any failure (connection error, HTTP error
    status, undecodable body) is logged and the request is repeated after
    30 seconds, so the call blocks until the server answers successfully.
    ``KeyboardInterrupt`` and ``SystemExit`` are not intercepted.

    Returns:
        The decoded JSON body of the response.
    """
    # Filters of a GET request belong in the query string; a JSON body on
    # GET may be ignored by the server. "types[]" is the array form of the
    # filter in the query string.
    params = {
        "limit": 15,
        "types[]": ["mention"],
    }
    while True:
        try:
            r = s.get(instance_point + "/notifications", params=params)
            r.raise_for_status()
            return r.json()
        except Exception:
            logger.exception('Error get notificatios')
            time.sleep(30)
            logger.info('Retrying get notificatios...')
def mark_as_read_notification(id_notification):
    """Dismiss one notification on the server.

    Sends a POST to ``/notifications/{id_notification}/dismiss``. Any
    failure is logged and the request is repeated after 30 seconds until it
    succeeds. ``KeyboardInterrupt`` and ``SystemExit`` are not intercepted,
    so the retry loop can still be stopped.

    Args:
        id_notification: Identifier of the notification to dismiss.

    Returns:
        The decoded JSON body of the response.
    """
    while True:
        try:
            r = s.post(instance_point + f"/notifications/{id_notification}/dismiss")
            r.raise_for_status()
            return r.json()
        except Exception:
            logger.exception(f'Error read notification {id_notification}')
            time.sleep(30)
            logger.info(f'Retrying read notification {id_notification}...')
def get_status_context(status_id):
    """Fetch the thread context of a status.

    Sends a GET to ``/statuses/{status_id}/context``. Any failure is logged
    and the request is repeated after 30 seconds until it succeeds.
    ``KeyboardInterrupt`` and ``SystemExit`` are not intercepted, so the
    retry loop can still be stopped.

    Args:
        status_id: Identifier of the status whose context is requested.

    Returns:
        The decoded JSON body of the response.
    """
    while True:
        try:
            r = s.get(instance_point + f"/statuses/{status_id}/context")
            r.raise_for_status()
            return r.json()
        except Exception:
            logger.exception(f'Ошибка получения контекста треда {status_id}')
            time.sleep(30)
            logger.info('Повторный запрос треда...')
def get_status(status_id):
    """Fetch a single status by its identifier.

    Sends a GET to ``/statuses/{status_id}``. Any failure is logged and the
    request is repeated after 30 seconds until it succeeds.
    ``KeyboardInterrupt`` and ``SystemExit`` are not intercepted, so the
    retry loop can still be stopped.

    Args:
        status_id: Identifier of the status to fetch.

    Returns:
        The decoded JSON body of the response.
    """
    while True:
        try:
            r = s.get(instance_point + f"/statuses/{status_id}")
            r.raise_for_status()
            return r.json()
        except Exception:
            logger.exception(f'Error get status {status_id}')
            time.sleep(30)
            logger.info(f'Retrying get status {status_id}')
def post_status(text, reply_to_status_id=None, poll_options=None, poll_expires=345600, attachments=None):
    """Publish a status, optionally as a reply, with a poll and/or media.

    The status is posted as unlisted plain text with language "ru". Any
    failure is logged and the request is repeated after 5 seconds until it
    succeeds. ``KeyboardInterrupt`` and ``SystemExit`` are not intercepted,
    so the retry loop can still be stopped.

    Args:
        text: Body of the status.
        reply_to_status_id: Identifier of the status being replied to, or
            None for a standalone status.
        poll_options: Poll choices; when not None a multiple-choice poll is
            attached.
        poll_expires: Value sent as the poll's ``expires_in``.
        attachments: Media identifiers sent as ``media_ids``; omitted from
            the request when empty or None.

    Returns:
        The decoded JSON body of the response.
    """
    import uuid  # local import: only needed for the idempotency key below

    poll = None
    if poll_options is not None:
        poll = {
            "options": poll_options,
            "expires_in": poll_expires,
            "multiple": True,
        }
    params = {
        "status": text,
        "in_reply_to_id": reply_to_status_id,
        "visibility": "unlisted",
        "content_type": "text/plain",
        "language": "ru",
        "poll": poll,
    }
    if attachments:
        params['media_ids'] = attachments

    # The same key is reused for every attempt of this call, so a retry
    # after an ambiguous failure (e.g. the status was created but the
    # response was lost or could not be decoded) does not publish a duplicate.
    headers = {"Idempotency-Key": str(uuid.uuid4())}
    while True:
        try:
            r = s.post(instance_point + "/statuses", json=params, headers=headers)
            r.raise_for_status()
            return r.json()
        except Exception:
            logger.exception('Error send status, retrying...')
            time.sleep(5)
def upload_attachment(file_path):
    """Upload a file as a media attachment and return its identifier.

    The file is sent to the ``/media`` endpoint together with a fixed
    description. Any failure of the request is logged and the upload is
    repeated after 5 seconds until it succeeds. ``KeyboardInterrupt`` and
    ``SystemExit`` are not intercepted. A failure to open the file is
    raised to the caller immediately and is not retried.

    Args:
        file_path: Path of the file to upload.

    Returns:
        The ``id`` field of the decoded JSON response.
    """
    params = {
        "description": "Fediverse Movie Night\nВоскресенье, 21:00\nLIVE ON XXIV Production",
    }
    # The context manager guarantees the handle is closed on return or error.
    with open(file_path, mode='rb') as media:
        while True:
            # A previous attempt may have consumed the stream; rewind so a
            # retry sends the whole file instead of an empty/truncated body.
            media.seek(0)
            try:
                r = s.post(instance_point + "/media", data=params, files={"file": media})
                r.raise_for_status()
                return r.json()['id']
            except Exception:
                logger.exception(f'Error uploading {file_path} attachment')
                time.sleep(5)
                logger.info(f'Retrying upload {file_path}...')
def mute_user(acct_id: str, acct: str, duration=None):
    """Mute an account.

    Sends a POST to ``/accounts/{acct_id}/mute`` with ``duration`` as form
    data. Any failure is logged and the request is repeated after 5 seconds
    until it succeeds. ``KeyboardInterrupt`` and ``SystemExit`` are not
    intercepted, so the retry loop can still be stopped.

    Args:
        acct_id: Identifier of the account, used in the request URL.
        acct: Account name; used only in log messages.
        duration: Value sent as the ``duration`` form field (the success
            log message labels it as seconds).

    Returns:
        None.
    """
    params = {
        "duration": duration,
    }
    while True:
        try:
            r = s.post(instance_point + '/accounts' + f"/{acct_id}/mute", params)
            r.raise_for_status()
            logger.info(f'Пользователь {acct} был заглушен на {duration} secs')
            return
        except Exception:
            logger.exception(f'Ошибка глушения {acct}')
            time.sleep(5)
            logger.info(f'Повторное глушение {acct}...')