import src.fw_api from src.utils import download_track from src.settings import get_config from loguru import logger from pyfzf.pyfzf import FzfPrompt import mpv import time import sys fzf = FzfPrompt() player = mpv.MPV() player.ytdl = False # Prevent attempts load track with yt-dlp player.volume = get_config('mpv_volume') player.prefetch_playlist = get_config('prefetch_playlist') show_like_button = get_config('show_like_button') track_activity_history = get_config('track_activity_history') class player_fw_storage: storage = {} @logger.catch def track_url_to_uuid(listen_url=None): '''Attempt get uuid from track listen url or current playing url''' if listen_url: uuid = listen_url.split(r'/')[-2] else: uuid = player.stream_open_filename.split(r'/')[-2] return uuid if track_activity_history: @player.property_observer('time-pos') @logger.catch def time_observer(_name, value): # Here, _value is either None if nothing is playing or a float containing # fractional seconds since the beginning of the file. if value and player.http_header_fields != [] and player.pause is False: if value >= 30.0 and value <= 30.1: # detect 30 secs for reporting listen activity track = player_fw_storage.storage.get(track_url_to_uuid()) track_id = track.get('id') if track_id: src.fw_api.record_track_in_history(track_id) else: logger.error("Can't write track to history: No track id") time.sleep(1) @player.property_observer('filtered-metadata') @logger.catch def osd_observer(_name, value): '''Sumulate osd playing message in console''' if value: osd_message = [] for i in value.items(): if i[0] in ('Artist', 'Album', 'Title'): osd_message.append(i[1]) sys.stdout.write('\r ') sys.stdout.write('\r'+' — '.join(osd_message)) sys.stdout.flush() def set_http_header(headers=[]): player.http_header_fields = headers @logger.catch def player_menu(header='', storage={}): player_fw_storage.storage.update(storage) player.volume = get_config("mpv_volume") while True: try: player_items_menu = ['Next', 'Prev', 'Pause', 'Download', 'Info'] if player.pause: player_items_menu[2] = 'Play' else: player_items_menu[2] = 'Pause' if show_like_button: player_items_menu.append('Like') player_items_menu.extend(['Hide artist', 'Exit']) select = fzf.prompt(player_items_menu, f"--header=\'{header}\'")[0] if select == 'Next': try: player.playlist_next() except: print('No more next tracks') elif select == 'Prev': player.playlist_prev() elif select in ('Pause', 'Play'): if player.pause: player.pause = False else: player.pause = True elif select == 'Download': name_downloaded = download_track(player.stream_open_filename) elif select == 'Info': track = player_fw_storage.storage.get(track_url_to_uuid()) for i in track.keys(): if i in ('album', 'artist'): name_aa = track.get(i).get('name') if not name_aa: name_aa = track.get(i).get('title') print(i + ': ' + name_aa) key = track.get(i) if key and isinstance(key, str): print(i + ': ' + key) print('Direct link: ' + player.stream_open_filename) input() elif select == 'Like': src.fw_api.favorite_track( player_fw_storage.storage.get(track_url_to_uuid())['id']) elif select == 'Hide artist': track = player_fw_storage.storage.get(track_url_to_uuid()) src.fw_api.hide_content( {'target': {'id': track.get('artist').get('id'), 'type': 'artist'}}) elif select == 'Exit': player.playlist_clear() player.stop() player_fw_storage.storage = {} break except KeyboardInterrupt: break def play_track(track, multi=False): listen_url = src.fw_api.get_audio_file(track['listen_url'], True) player_fw_storage.storage[track_url_to_uuid(listen_url)] = track if multi: player.loadfile(listen_url, 'append-play') else: player.loadfile(listen_url, 'append-play') track_name = track.get('title') player_menu(f"{track_name} playing...", player_fw_storage.storage)