mirror of
https://gitea.phreedom.club/localhost_frssoft/funkwlmpv
synced 2024-11-23 18:39:19 +02:00
115 lines
4.1 KiB
Python
115 lines
4.1 KiB
Python
import requests
|
|
import concurrent.futures
|
|
import argparse
|
|
from pathlib import Path
|
|
|
|
# Tracks that passed filtering; filled by filter_tracks() and read when the
# playlist entries are built.
tracks_stor = []

# The 'instances' file lists one instance hostname per line. Blank lines and
# surrounding whitespace (including '\r' from CRLF files) are dropped so they
# never end up as an empty or malformed hostname in a request URL.
with open('instances') as instances_file:
    instances = [line.strip() for line in instances_file if line.strip()]

parser = argparse.ArgumentParser(
    prog='funkwhale playlist',
    description='Create playlist from query or just random playlist tracks from funkwhale instances')
parser.add_argument('-s', '--search',
                    help='search query sent to every instance')
parser.add_argument('-t', '--tag',
                    help='only request tracks carrying this tag')
parser.add_argument('-i', '--instance',
                    help='query only this instance instead of the ones listed in the "instances" file')
parser.add_argument('-r', '--recursion', type=int, default=0,
                    help='set to 1 to also fetch the following result pages (default: 0)')
parser.add_argument('-d', '--depth', type=int, default=5,
                    help='maximum number of additional result pages fetched when -r 1 is given (default: 5)')
args = parser.parse_args()

# An explicitly requested instance replaces the list read from the file.
if args.instance:
    instances = [args.instance]
|
|
|
|
|
|
def create_playlist_file(track_list, path='playlist.m3u8'):
    """Write the playlist entries to an extended M3U file.

    The file starts with the ``#EXTM3U`` header line; every entry is then
    written on a new line, preceded by a newline rather than followed by one,
    so the file has a blank line after the header and no trailing newline.

    Args:
        track_list: Iterable of ready-made entry strings (the caller builds
            them as an ``#EXTINF`` line, a newline and the track URL).
        path: Destination file; defaults to ``playlist.m3u8`` in the current
            working directory. An existing file is overwritten.
    """
    # The .m3u8 extension denotes a UTF-8 playlist, so the encoding is pinned
    # instead of depending on the locale default (which can fail on, or
    # garble, non-ASCII artist/album/track names).
    with open(path, 'w', encoding='utf-8') as file:
        file.write('#EXTM3U\n')
        file.writelines('\n' + entry for entry in track_list)
|
|
|
|
|
|
def filter_tracks(tracks):
    """Keep reachable, non-blocked tracks and append them to ``tracks_stor``.

    Processing happens in two steps:

    1. Every track's ``listen_url`` is probed with a HEAD request (1 second
       timeout, up to 50 probes in parallel); tracks whose probe fails are
       dropped and the number of dropped tracks is printed.
    2. The remaining tracks are checked against the block lists stored in the
       files ``filter_tags``, ``filter_artists`` and ``filter_raw_urls`` in
       the current working directory (one entry per line; the files are
       created empty when missing). A track is skipped when any of its tags,
       its artist name or its listen URL is listed. Comparison is
       case-insensitive.

    Args:
        tracks: List of track dicts. The keys read here are ``listen_url``,
            ``tags`` (iterable of strings) and ``artist`` (dict with a
            ``name`` key).

    Returns:
        None. Accepted tracks are appended to the module-level ``tracks_stor``.
    """
    def remove_unreach_tracks(track):
        """Return 1 when the track's listen URL answers a HEAD request, else 0."""
        try:
            r = requests.head(track['listen_url'], timeout=1)
            r.raise_for_status()
            return 1
        except Exception:
            # Best-effort probe: any failure (network error, HTTP error
            # status, malformed track entry) just marks the track unreachable.
            return 0

    def load_blocklist(filename):
        """Return the non-empty lines of a block-list file, lower-cased."""
        blocklist_path = Path(filename)
        # Make sure the file exists so a missing filter means "block nothing".
        blocklist_path.touch()
        with blocklist_path.open() as blocklist_file:
            # Entries are lower-cased because track values are lower-cased
            # before the lookup; blank lines are ignored so an empty file
            # does not block tracks with an empty artist name.
            return {line.strip().lower() for line in blocklist_file if line.strip()}

    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        # map() yields results in input order, so flags line up with tracks.
        reachable_flags = list(executor.map(remove_unreach_tracks, tracks))
    reachable = [track for track, flag in zip(tracks, reachable_flags) if flag == 1]
    print(f'-{len(tracks) - len(reachable)} unreach tracks')

    block_tags = load_blocklist('filter_tags')
    block_artists = load_blocklist('filter_artists')
    block_raw_urls = load_blocklist('filter_raw_urls')

    for track in reachable:
        # Each tag is tested on its own; comparing the whole tag list against
        # the block list (as a single value) can never match.
        if any(tag.lower() in block_tags for tag in track['tags']):
            continue
        if track['artist']['name'].lower() in block_artists:
            continue
        if track['listen_url'].lower() in block_raw_urls:
            continue
        tracks_stor.append(track)
|
|
|
|
|
|
|
|
def search_tracks_on_instance(instance, tag='', query='', recursion=args.recursion):
    """Fetch playable local tracks from one instance in random order.

    Requests ``https://<instance>/api/v1/tracks`` and, when ``recursion`` is
    1, follows the ``next`` links of the paginated response for at most
    ``args.depth`` additional pages, merging their ``results`` into the first
    response.

    Args:
        instance: Hostname of the instance (without scheme).
        tag: Value sent as the ``tag`` request parameter.
        query: Value sent as the ``q`` request parameter.
        recursion: Follow pagination only when this equals 1. The default is
            the ``--recursion`` command line value captured at definition time.

    Returns:
        The decoded JSON response dict. Every entry of ``results`` has its
        ``listen_url`` prefixed with ``https://<instance>``.

    Raises:
        Whatever the first request or its JSON decoding raises, and
        ``KeyError`` when the response lacks ``count``/``results``; failures
        on follow-up pages are printed and end pagination instead.
    """
    tracks = requests.get(
        f'https://{instance}/api/v1/tracks',
        params={'tag': tag, 'q': query,
                'local': True, 'playable': True,
                'ordering': 'random'},
        timeout=10).json()
    count = tracks['count']
    print(f'found {count} tracks on {instance}')

    if recursion == 1:
        # Bounded loop: at most args.depth extra pages are requested.
        for _ in range(args.depth):
            next_url = tracks['next']
            if not next_url:
                break
            try:
                new_tracks = requests.get(next_url, timeout=10).json()
                page_results = new_tracks['results']
                following_url = new_tracks['next']
            except Exception as E:
                print(E)
                # Stop paginating: retrying the same URL after a failure
                # would loop forever while the instance stays broken.
                break
            tracks['results'] += page_results
            tracks['next'] = following_url

    # The code treats listen_url as a path relative to the instance, so the
    # scheme and host are prepended to make it directly playable.
    for track in tracks['results']:
        track['listen_url'] = f'https://{instance}' + track['listen_url']
    return tracks
|
|
|
|
|
|
# Query all instances in parallel; each future resolves to the response dict
# returned by search_tracks_on_instance().
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
    res = [executor.submit(search_tracks_on_instance, instance, args.tag, args.search) for instance in instances]
    concurrent.futures.wait(res)

# Filter the results instance by instance. A failing instance (request error,
# unexpected response) is reported and skipped so the others still contribute.
for future in res:
    try:
        tracks = future.result()
        filter_tracks(tracks['results'])
    except Exception as E:
        print(E)

# Build one "#EXTINF" entry per accepted track and write the playlist.
playlist_files = []
for track in tracks_stor:
    artist, title, play_url = track['artist']['name'], track['title'], track['listen_url']
    # NOTE(review): 'album' is presumably null for tracks that belong to no
    # album - confirm against the API. Indexing it unconditionally would raise
    # and abort the run before the playlist is written, so a missing album is
    # simply left out of the entry title.
    album = (track.get('album') or {}).get('title')
    if album:
        label = f'{artist} - {album} - {title}'
    else:
        label = f'{artist} - {title}'
    playlist_files.append(f'#EXTINF:-1,{label}\n{play_url}')
create_playlist_file(playlist_files)
|