#!/usr/bin/python
"""Download media into the Stash import directory and trigger a Stash scan.

The script accepts a single URL, a text file containing one URL per line, or
a local file. Direct image/video links are downloaded with ``requests``,
booru post pages are scraped for the media URL and artist name, and
everything else is handed to ``yt-dlp``. Unless ``--no-update`` is given, a
``metadataScan`` mutation is sent to the Stash GraphQL endpoint afterwards.

Configuration is read from the environment (optionally via a ``.env`` file):
``STASH_IMPORT_DIR`` / ``STASH_IMPORT_DIR_TEMP``, ``STASH_API_KEY``,
``STASH_HOST``, ``STASH_PORT``, ``STASH_YTDLP_FORMAT`` and
``STASH_PRINT_PREFIX``.
"""
import argparse
import datetime
import os
import re
import subprocess
import sys

import requests
from dotenv import load_dotenv
from lxml import html

# Load variables from .env file
load_dotenv()

url_pattern = r'^http(s?):\/\/[-a-zA-Z0-9@:%._\+~#=]{2,256}\.[a-z]{2,6}\b[-a-zA-Z0-9@:%_\+.~#?&\/\/=]*$'
# STASH_IMPORT_DIR_TEMP, when set, takes precedence over STASH_IMPORT_DIR.
STASH_IMPORT_DIR = os.getenv("STASH_IMPORT_DIR_TEMP") or os.getenv("STASH_IMPORT_DIR")
STASH_API_KEY = os.getenv("STASH_API_KEY")
STASH_HOST = os.getenv("STASH_HOST")
STASH_PORT = os.getenv("STASH_PORT")
STASH_YTDLP_FORMAT = os.getenv("STASH_YTDLP_FORMAT")
STASH_PRINT_PREFIX = os.getenv("STASH_PRINT_PREFIX")

# Seconds to wait for a server to connect/respond; without a timeout
# ``requests`` blocks indefinitely on an unresponsive host.
REQUEST_TIMEOUT = 30

_MEDIA_EXTENSIONS = "(jpg|JPG|jpeg|JPEG|png|PNG|gif|GIF|mp4|MP4)"
# A URL that ends in one of the known media extensions.
_RGX_FILE = r"^.*\.{0}$".format(_MEDIA_EXTENSIONS)
# Trailing "name.ext" of such a URL. Anchored at the end so that an earlier
# "word.ext"-looking fragment (e.g. inside the host name) is never picked.
_RGX_FILENAME = r"[A-Za-z0-9_]*\.{0}$".format(_MEDIA_EXTENSIONS)
# A booru post page (index.php?...id=N or /posts/N).
_RGX_BOORU = r'https?://[a-z.]+/(index\.php.*id=([0-9]+)|posts/([0-9]+))'
# Image URL embedded in a booru post page.
_RGX_BOORU_V2 = r'(https?://.*/original/([A-Za-z0-9/_]*\.{0})|https?://img[a-z0-9.]+\.[a-z]+\.com/im)'.format(_MEDIA_EXTENSIONS)
# Video URL embedded in a booru post page.
_RGX_BOORU_V2_VID = r'https?://video-cdn[0-9]{1}.*booru.*\.com/images/.*\.mp4'


def _parse_booru_artist(page_content):
    """Extract a normalized artist name from the HTML of a booru post page.

    Args:
        page_content: Raw HTML of the page (bytes or str).

    Returns:
        The artist name lower-cased, with every character that is neither
        alphanumeric nor whitespace replaced by ``_`` and spaces replaced by
        ``_``; ``None`` if neither known XPath location yields a name.
    """
    tree = html.fromstring(page_content)

    # Two page layouts are supported; the first XPath that yields text wins.
    artist_name = (
        tree.xpath("/html/body/div[1]/section/ul/li[1]/a/text()")
        or tree.xpath("/html/body/div[1]/div[2]/div/div/aside/section[2]/div/ul[1]/li/a[2]/text()")
    )
    if not artist_name:
        print("Warning: Artist name not found on the page.")
        return None

    # Clean up and format the artist name
    artist_name = artist_name[0].strip()
    artist_name = ''.join(
        c if c.isalnum() or c.isspace() else '_' for c in artist_name
    ).lower().strip()
    return artist_name.replace(' ', '_')


def find_booru_artist(page_url):
    """Fetch a booru post page and return its normalized artist name.

    Args:
        page_url: URL of the booru post page.

    Returns:
        The normalized artist name, or ``None`` if the page could not be
        fetched (request error or non-200 status) or no artist was found.
    """
    try:
        response = requests.get(page_url, timeout=REQUEST_TIMEOUT)
    except requests.exceptions.RequestException as e:
        print(f"Error: Unable to fetch page from {page_url} ({e})")
        return None
    if response.status_code != 200:
        print(f"Error: Unable to fetch page from {page_url}")
        return None
    return _parse_booru_artist(response.content)


def update_stash():
    """Ask Stash to scan for new items via its GraphQL ``metadataScan`` mutation.

    Exits the process with status 1 if the request fails or the server
    answers with a non-success HTTP status.
    """
    print("Running scan for new items in Stash...")
    url = f"http://{STASH_HOST}:{STASH_PORT}/graphql"
    headers = {
        "ApiKey": STASH_API_KEY,
        "Content-Type": "application/json",
    }
    data = '{"query": "mutation { metadataScan (input:{useFileMetadata: false})}" }'

    try:
        response = requests.post(url, headers=headers, data=data, timeout=REQUEST_TIMEOUT)
    except requests.exceptions.RequestException as e:
        print(f"Update error: {e}")
        sys.exit(1)

    if response.ok:
        print("Update successful!")
    else:
        print(f"Update failed with status code: {response.status_code}")
        print(response.text)
        sys.exit(1)


def _download_direct(file_url, download_dir, artist, overwrite):
    """Stream a direct media URL into ``download_dir``.

    The target name is the trailing ``name.ext`` of the URL, prefixed with
    ``<artist>__`` when an artist is known.

    Returns:
        ``True`` if the file was saved; ``False`` on a non-200 response, a
        request error, or when the target exists and ``overwrite`` is false.
    """
    filename = ((artist + "__") if artist else "") + re.search(_RGX_FILENAME, file_url).group(0)
    download_path = os.path.join(download_dir, filename)

    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(file_url, stream=True, timeout=REQUEST_TIMEOUT) as response:
            if response.status_code == 403:
                print("Error: HTTP 403 Forbidden - Access to the file is forbidden.")
                return False
            if response.status_code != 200:
                print(f"Error: Failed to download the file. Status code: {response.status_code}")
                return False

            if os.path.isfile(download_path) and not overwrite:
                print("Destination file already exists:", filename)
                return False

            with open(download_path, 'wb') as f:
                for chunk in response.iter_content(8192):
                    f.write(chunk)
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        return False

    print("Saved as:", filename)
    return True


def _download_with_ytdlp(file_url, download_dir, ytdlp_format):
    """Download ``file_url`` with the external ``yt-dlp`` command.

    Returns:
        ``True`` if ``yt-dlp`` exited successfully, ``False`` otherwise
        (including when no output template is configured or the command
        cannot be started).
    """
    if not ytdlp_format:
        print("Error: No yt-dlp output template configured (STASH_YTDLP_FORMAT).")
        return False

    download_path = os.path.join(download_dir, ytdlp_format)
    # "--" ends option parsing so a URL starting with "-" is never read as a flag.
    command = ['yt-dlp', '-o', download_path, '--restrict-filenames', '--', file_url]
    try:
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Failed to run yt-dlp command. Error: {e}")
        return False
    return True


def download_file(file_url, download_dir, ytdlp_format, overwrite=None):
    """Download one URL into ``download_dir``.

    Booru post pages are fetched once and scraped for the artist name and the
    embedded image (or, failing that, video) URL. URLs ending in a known media
    extension are streamed directly; everything else is passed to ``yt-dlp``.

    Args:
        file_url: URL of a media file, a booru post page, or anything
            ``yt-dlp`` may understand.
        download_dir: Directory the download is written to.
        ytdlp_format: ``yt-dlp`` output template, joined onto ``download_dir``.
        overwrite: Whether an existing target file may be replaced. ``None``
            (the default) falls back to looking for ``--overwrite`` / ``-o``
            in ``sys.argv``, which is what callers relied on before this
            parameter existed.

    Returns:
        ``True`` on success, ``False`` on any failure.
    """
    if overwrite is None:
        overwrite = any(arg in sys.argv for arg in ("--overwrite", "-o"))

    artist = None
    if re.match(_RGX_BOORU, file_url):
        booru_url = file_url
        try:
            page = requests.get(booru_url, timeout=REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as e:
            print(f"Error: {e}")
            return False

        if page.status_code == 200:
            artist = _parse_booru_artist(page.content)
        else:
            print(f"Error: Unable to fetch page from {booru_url}")

        page_text = page.text
        image_match = re.search(_RGX_BOORU_V2, page_text)
        if image_match:
            file_url = image_match.group(1).strip()
        else:
            # No image link on the page: try the video CDN pattern instead.
            video_match = re.search(_RGX_BOORU_V2_VID, page_text)
            if not video_match:
                print(f"Error: No downloadable media found on {booru_url}")
                return False
            file_url = video_match.group(0).strip()

    print(STASH_PRINT_PREFIX, file_url)
    if re.match(_RGX_FILE, file_url):
        return _download_direct(file_url, download_dir, artist, overwrite)
    return _download_with_ytdlp(file_url, download_dir, ytdlp_format)


def is_path_or_url(arg):
    """Return ``arg`` if it is an existing path or matches ``url_pattern``.

    Args:
        arg: Candidate command-line argument (a string).

    Returns:
        ``arg`` unchanged when it is valid, otherwise ``None``.
    """
    # Check if the argument is a valid file path
    if os.path.exists(arg):
        return arg

    # Use regular expression to check if the argument matches a URL pattern
    if re.match(url_pattern, arg):
        return arg

    # If no path or URL argument is found, return None
    return None


def main():
    """Parse the command line, download the requested items and update Stash."""
    parser = argparse.ArgumentParser(description="Download files or update stash.")
    parser.add_argument("url_or_path", metavar="URL_or_path", nargs="?", help="URL or file path to download")
    parser.add_argument("-o", "--overwrite", action="store_true", help="Overwrite existing files if present")
    parser.add_argument("-u", "--update", action="store_true", help="Update stash")
    parser.add_argument("-n", "--no-update", action="store_true", help="Do not update stash")

    args = parser.parse_args()

    if args.update and args.no_update:
        print("Conflicting arguments: --update and --no-update cannot be used together.")
        sys.exit(1)

    if args.update:
        update_stash()
        sys.exit(0)

    # nargs="?" yields a single string (or None when omitted); normalize to a
    # list so the validation below covers both shapes.
    url_or_path = args.url_or_path
    candidates = url_or_path if isinstance(url_or_path, list) else [url_or_path]
    valid_args = [
        arg for arg in candidates
        if arg is not None and is_path_or_url(arg) is not None
    ]

    if not valid_args:
        print("Valid URL or file path required")
        sys.exit(1)

    if not STASH_IMPORT_DIR:
        print("STASH_IMPORT_DIR is not set")
        sys.exit(1)

    for valid_url in valid_args:
        if re.match(url_pattern, valid_url):
            # Single URL: a failed download aborts the run.
            if not download_file(valid_url, STASH_IMPORT_DIR, STASH_YTDLP_FORMAT, overwrite=args.overwrite):
                print("Stopped")
                sys.exit(1)
        else:
            # `file -0` prints "<name>\0<description>"; keep the description.
            is_file = subprocess.check_output(["file", "-0", "--", valid_url]).decode().split("\x00")[1]
            if "text" in is_file:
                # Download as multiple URLs from the provided source file
                with open(valid_url) as source_file:
                    urls = [line.strip() for line in source_file if line.strip()]
                print(f"Reading list of {len(urls)} URL(s)")
                for url in urls:
                    # Best effort: one failing URL does not stop the batch.
                    download_file(url, STASH_IMPORT_DIR, STASH_YTDLP_FORMAT, overwrite=args.overwrite)
            else:
                # Any other local file is copied into the import directory.
                subprocess.run(["rsync", "--", valid_url, STASH_IMPORT_DIR], check=True)

    # Update stash
    if not args.no_update:
        update_stash()


if __name__ == "__main__":
    main()