from src.fw_api import current_instance, get_radios, post_radio_session, get_track_radio, list_libraries, favorite_track, get_audio_file, hide_content from src.fw_libraries import libraries from src.fw_tags import list_tags from src.utils import download_track, track_info_output from src.mpv_control import player, track_url_to_uuid, player_fw_storage, soft_volume_reduce from src.settings import get_config from pyfzf.pyfzf import FzfPrompt from loguru import logger from shlex import quote import threading import time import sys fzf = FzfPrompt() @logger.catch def list_radios(error_given=None): radios = get_radios() count = radios.get('count') results = radios.get('results') view = [] for i in results: index = results.index(i) id_radio = i.get('id') name = i.get('name') descr = i.get('description') radio_option = name if descr and descr != "": radio_option += f' | {descr}' view.append(f'{index}.{radio_option}') # Radios avalaible only for auth user if current_instance.s.headers.get('Authorization'): view.append('Favourites') view.append('Less listened') view.extend(['Tag', 'Random', 'Libraries', 'Users', 'Recently Added']) header = f'Found {count} radios' if error_given: header += f'\n{error_given}' header = quote(header) selected = fzf.prompt( view, f'--header {header} --read0', delimiter="\0") if selected == []: return else: selected = selected[0].split('.', 1) if 'Favourites' in selected: radio_load(id_radio, 'favorites', name='your favorites tracks') elif 'Tag' in selected: tag = list_tags() radio_load(type_radio='tag', name=f'by tag: {tag}', related_object=tag) elif 'Random' in selected: radio_load(id_radio, 'random', name='totally random') elif 'Libraries' in selected: id_radio, type_radio, name_radio, related_obj = libraries(radio=True) radio_load(id_radio, type_radio, name_radio, related_obj) elif 'Users' in selected: libs = list_libraries()['results'] libraries_listing = [] for lib_i in libs: lib_name = lib_i.get('actor').get('full_username') if lib_name not in libraries_listing: libraries_listing.append(lib_name) libraries_listing.append('Custom') lib_addr = fzf.prompt(libraries_listing)[0] if lib_addr == 'Custom': print('Input remote user library (ex. nick@funkwhale.domain.example: ') lib_addr = input() radio_load(None, 'actor-content', lib_addr, lib_addr) elif 'Recently Added' in selected: radio_load(id_radio, 'recently-added', name='Newest content on the network') elif 'Less listened' in selected: radio_load(id_radio, 'less-listened', name="Less listened tracks") else: id_selected = selected[0] id_radio = results[int(id_selected)].get('id') name_radio = results[int(id_selected)].get('name') radio_load(id_radio, name=name_radio) def radio_generator(radio_session_id): count_t = 0 while radio_session_id != '': time.sleep(1) if not radio_event_gen.wait(0): break count_t += 1 if count_t >= 60: count_t = 0 playlist_remaining = len(player.playlist) - \ player.playlist_current_pos if playlist_remaining <= 2: radio_get_track(radio_session_id) sys.stdout.write('\rRadio generator stopped') sys.stdout.flush() radio_event_gen = threading.Event() @logger.catch def radio_load(id_radio=None, type_radio='custom', name=None, related_object=None): show_like_button = get_config('show_like_button') player.volume = get_config('mpv_volume') requested_radio = { 'custom_radio': id_radio, 'radio_type': type_radio, 'related_object_id': related_object } radio_session_id = post_radio_session(requested_radio).get('id') for i in range(0, 2): try: radio_get_track(radio_session_id, first_run=True) except Exception as E: return list_radios(error_given=f'Error: {E}') radio_event_gen.set() radio_task = threading.Thread( target=radio_generator, args=(radio_session_id,), daemon=True) radio_task.start() player_items_menu = ['Next', 'Prev', 'Pause', 'Download', 'Info'] if show_like_button: player_items_menu.append('Like') player_items_menu.extend(['Hide artist', 'Exit']) while True: try: if player.pause: player_items_menu[2] = 'Play' else: player_items_menu[2] = 'Pause' try: select = fzf.prompt(player_items_menu, quote(f"--header=\'Radio {name} playing...\'"))[0] except: select = 'Exit' if select == 'Next': playlist_remaining = player.playlist_count - player.playlist_current_pos if playlist_remaining <= 2: threading.Thread(target=radio_get_track, args=( radio_session_id,), daemon=True).start() if playlist_remaining > 1: player.playlist_next() else: print('No more tracks, please wait for new...') time.sleep(3) elif select == 'Prev': player.playlist_prev() elif select in ('Pause', 'Play'): if player.pause: player.pause = False else: player.pause = True elif select == 'Hide artist': track = player_fw_storage.storage.get(track_url_to_uuid()) hide_content( {'target': {'id': track.get('artist').get('id'), 'type': 'artist'}}) elif select == 'Download': name_downloaded = download_track(player.stream_open_filename) elif select == 'Info': track = player_fw_storage.storage.get(track_url_to_uuid()) track['direct_url'] = player.stream_open_filename track_info_output(track) elif select == 'Like': favorite_track(player_fw_storage.storage.get( track_url_to_uuid())['id']) elif select == 'Exit': radio_event_gen.clear() soft_volume_reduce() player.playlist_clear() player.stop() player_fw_storage.storage = {} break except Exception as E: radio_event_gen.clear() player.playlist_clear() player.stop() player_fw_storage.storage = {} logger.exception(f'Radio force stopped: {E}') break def radio_get_track(radio_session_id, first_run=False): radio_context = get_track_radio({'session': radio_session_id}) if not radio_context: return if isinstance(radio_context, str): logger.error(radio_context) if radio_context == "Radio doesn't have more candidates": radio_event_gen.clear() if first_run: radio_context = 'This radio may be private or haven\'t tracks' raise IOError(radio_context) return if radio_context.get('error'): logger.error(radio_context.get('error')) return else: track = radio_context.get('track') listen_url = track['listen_url'] player_fw_storage.storage[track_url_to_uuid(listen_url)] = track player.loadfile(get_audio_file( listen_url, listen_url=True), 'append-play')