system-scripts/stasher.py

191 lines
7.2 KiB
Python
Executable File

#!/usr/bin/python
import os
import re
import sys
import datetime
import requests
import subprocess
from lxml import html
from dotenv import load_dotenv
import argparse
# Read the .env file first so the settings below can be supplied by it.
load_dotenv()

# Matches a complete http(s) URL whose host part ends in a dot followed by
# 2-6 lowercase letters; the remainder is limited to URL-safe characters.
url_pattern = r'^http(s?):\/\/[-a-zA-Z0-9@:%._\+~#=]{2,256}\.[a-z]{2,6}\b[-a-zA-Z0-9@:%_\+.~#?&\/\/=]*$'

# Every setting below is None when its environment variable is absent.
# A non-empty STASH_IMPORT_DIR_TEMP takes precedence over STASH_IMPORT_DIR.
STASH_IMPORT_DIR = (
    os.environ.get("STASH_IMPORT_DIR_TEMP")
    or os.environ.get("STASH_IMPORT_DIR")
)
STASH_API_KEY = os.environ.get("STASH_API_KEY")
STASH_HOST = os.environ.get("STASH_HOST")
STASH_PORT = os.environ.get("STASH_PORT")
STASH_YTDLP_FORMAT = os.environ.get("STASH_YTDLP_FORMAT")
STASH_PRINT_PREFIX = os.environ.get("STASH_PRINT_PREFIX")
def find_booru_artist(page_url, timeout=30):
    """Scrape the artist name from a booru post page as a filename-safe slug.

    Args:
        page_url: URL of the post page to fetch.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the script indefinitely.

    Returns:
        The artist name, lower-cased, with every non-alphanumeric character
        (including any kind of whitespace) replaced by ``_``; or ``None``
        when the page cannot be fetched or no artist link is found.
    """
    try:
        response = requests.get(page_url, timeout=timeout)
    except requests.exceptions.RequestException as e:
        # Network failures are reported the same way as HTTP errors instead
        # of propagating a traceback to the caller.
        print(f"Error: Unable to fetch page from {page_url}: {e}")
        return None
    if response.status_code != 200:
        print(f"Error: Unable to fetch page from {page_url}")
        return None

    # Parse the HTML content (``html`` is ``lxml.html``).
    tree = html.fromstring(response.content)

    # Two absolute XPaths are tried in order; presumably they correspond to
    # different booru page layouts -- verify against the target sites.
    matches = (
        tree.xpath("/html/body/div[1]/section/ul/li[1]/a/text()")
        or tree.xpath("/html/body/div[1]/div[2]/div/div/aside/section[2]/div/ul[1]/li/a[2]/text()")
    )
    if not matches:
        print("Warning: Artist name not found on the page.")
        return None

    # Only the first hit is used. Anything that is not a letter or digit
    # becomes "_" so the result is safe to embed in a filename.
    raw_name = matches[0].strip()
    return ''.join(c if c.isalnum() else '_' for c in raw_name).lower()
def update_stash(timeout=30):
    """Ask the Stash server to scan for new items.

    Posts a ``metadataScan`` GraphQL mutation to the ``/graphql`` endpoint
    built from ``STASH_HOST`` and ``STASH_PORT``, sending ``STASH_API_KEY``
    in the ``ApiKey`` header.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Raises:
        SystemExit: With status 1 when the request cannot be completed or
            the server answers with a non-success HTTP status.
    """
    print("Running scan for new items in Stash...")
    url = f"http://{STASH_HOST}:{STASH_PORT}/graphql"
    headers = {
        "ApiKey": STASH_API_KEY,
        "Content-Type": "application/json",
    }
    data = '{"query": "mutation { metadataScan (input:{useFileMetadata: false})}" }'

    # Only the network call itself can raise RequestException, so it is the
    # only statement inside the try block.
    try:
        response = requests.post(url, headers=headers, data=data, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(f"Update error: {e}")
        # sys.exit rather than the site-provided exit(), which is meant for
        # interactive sessions and is not guaranteed to exist.
        sys.exit(1)

    if not response.ok:
        print(f"Update failed with status code: {response.status_code}")
        print(response.text)
        sys.exit(1)

    # NOTE(review): GraphQL servers can report failures in an "errors" field
    # of an HTTP 200 response; the body is not inspected here -- confirm
    # whether Stash does this.
    print("Update successful!")
# Extensions that are fetched directly instead of being handed to yt-dlp.
_MEDIA_EXTENSIONS = "(jpg|JPG|jpeg|JPEG|png|PNG|gif|GIF|mp4|MP4)"
# The whole URL ends in one of the media extensions.
_RGX_DIRECT_FILE = re.compile(r"^.*\.{0}$".format(_MEDIA_EXTENSIONS))
# Anchored at the end so the name is taken from the tail of the URL and never
# from an extension-like fragment earlier in it (e.g. inside the host name).
_RGX_FILENAME = re.compile(r"[A-Za-z0-9_]*\.{0}$".format(_MEDIA_EXTENSIONS))
# A booru post page: either "index.php...id=<n>" or "posts/<n>".
_RGX_BOORU_PAGE = re.compile(r'https?://[a-z.]+/(index\.php.*id=([0-9]+)|posts/([0-9]+))')
# NOTE(review): the second alternative captures only up to "/im", so such a
# match yields a truncated URL that falls through to yt-dlp -- confirm this
# is intended.
_RGX_BOORU_IMAGE = re.compile(
    r'(https?://.*/original/([A-Za-z0-9/_]*\.{0})|https?://img[a-z0-9.]+\.[a-z]+\.com/im)'.format(_MEDIA_EXTENSIONS)
)
_RGX_BOORU_VIDEO = re.compile(r'https?://video-cdn[0-9]{1}.*booru.*\.com/images/.*\.mp4')


def _resolve_booru_media_url(page_url, timeout):
    """Return the first image (else video) URL found in a booru page, or None."""
    try:
        # One fetch serves both the image and the video search.
        page_text = requests.get(page_url, timeout=timeout).text
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        return None
    image = _RGX_BOORU_IMAGE.search(page_text)
    if image:
        return image.group(1).strip()
    video = _RGX_BOORU_VIDEO.search(page_text)
    return video.group(0).strip() if video else None


def _save_direct_file(file_url, download_dir, artist, timeout):
    """Stream a direct media URL into download_dir; True only if a file was written."""
    filename = (f"{artist}__" if artist else "") + _RGX_FILENAME.search(file_url).group(0)
    download_path = os.path.join(download_dir, filename)

    # The overwrite switch is read straight from the command line because the
    # function signature carries no such flag.
    overwrite = any(flag in sys.argv for flag in ("--overwrite", "-o"))
    # Checked before contacting the server so an existing file costs no request.
    if os.path.isfile(download_path) and not overwrite:
        print("Destination file already exists:", filename)
        return False

    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(file_url, stream=True, timeout=timeout) as response:
            if response.status_code == 403:
                print("Error: HTTP 403 Forbidden - Access to the file is forbidden.")
                return False
            if response.status_code != 200:
                print(f"Error: Failed to download the file. Status code: {response.status_code}")
                return False
            with open(download_path, 'wb') as f:
                for chunk in response.iter_content(8192):
                    f.write(chunk)
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        return False

    print("Saved as:", filename)
    return True


def _run_ytdlp(file_url, download_dir, ytdlp_format):
    """Hand a URL to the yt-dlp command line tool; True when it exits successfully."""
    download_path = os.path.join(download_dir, ytdlp_format)
    # "--" ends option parsing so a URL that starts with "-" cannot be
    # interpreted as a yt-dlp option.
    command = ['yt-dlp', '-o', download_path, '--restrict-filenames', '--', file_url]
    try:
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers the yt-dlp executable being missing or not runnable.
        print(f"Failed to run yt-dlp command. Error: {e}")
        return False
    return True


def download_file(file_url, download_dir, ytdlp_format, timeout=30):
    """Download one URL into download_dir.

    Booru post pages are first resolved to the media URL they embed, and the
    scraped artist name is prefixed to the saved filename. URLs ending in a
    known media extension are streamed directly; everything else is passed
    to yt-dlp.

    Args:
        file_url: Post page URL, direct media URL, or any URL for yt-dlp.
        download_dir: Directory the result is written to.
        ytdlp_format: yt-dlp output template, joined onto download_dir.
        timeout: Seconds to wait on each HTTP request made by this function.

    Returns:
        True when a file was saved or yt-dlp succeeded, False otherwise
        (including when the destination already exists and overwriting was
        not requested, or when a post page contains no recognisable media).
    """
    artist = None
    if _RGX_BOORU_PAGE.match(file_url):
        page_url = file_url
        artist = find_booru_artist(page_url)
        file_url = _resolve_booru_media_url(page_url, timeout)
        if not file_url:
            # Must be falsy: callers test the result with "if not ...".
            print("Error: No downloadable media found on", page_url)
            return False

    print(STASH_PRINT_PREFIX, file_url)
    if _RGX_DIRECT_FILE.match(file_url):
        return _save_direct_file(file_url, download_dir, artist, timeout)
    return _run_ytdlp(file_url, download_dir, ytdlp_format)
def is_path_or_url(arg):
    """Return ``arg`` if it is an existing path or looks like a URL, else ``None``.

    The filesystem check runs first, so an existing path is accepted without
    consulting ``url_pattern`` at all.
    """
    accepted = os.path.exists(arg) or re.match(url_pattern, arg)
    return arg if accepted else None
def _import_local_path(path):
    """Import a local path: a text file is read as a URL list, anything else is rsynced."""
    # "file -0" prints "<name>\0<description>"; only the description is needed.
    description = subprocess.check_output(["file", "-0", path]).decode().split("\x00")[1]
    if "text" not in description:
        subprocess.run(["rsync", path, STASH_IMPORT_DIR], check=True)
        return

    # Read the list once, closing the file, and drop blank lines so they are
    # neither counted nor passed on as empty URLs.
    with open(path) as source_file:
        urls = [line.strip() for line in source_file if line.strip()]
    print(f"Reading list of {len(urls)} URL(s)")
    for url in urls:
        # Individual failures do not abort the rest of the list.
        download_file(url, STASH_IMPORT_DIR, STASH_YTDLP_FORMAT)


def main(argv=None):
    """Command line entry point: download the given URLs/paths, then rescan Stash.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Raises:
        SystemExit: Status 1 on conflicting flags, when no valid URL or path
            is given, or when a URL given on the command line fails to
            download; status 0 after an ``--update``-only run.
    """
    parser = argparse.ArgumentParser(description="Download files or update stash.")
    # "*" so the value is always a list: zero arguments yields [] rather than
    # None, and several URLs/paths may be given in one invocation.
    parser.add_argument("url_or_path", metavar="URL_or_path", nargs="*", help="URL or file path to download")
    parser.add_argument("-o", "--overwrite", action="store_true", help="Overwrite existing files if present")
    parser.add_argument("-u", "--update", action="store_true", help="Update stash")
    parser.add_argument("-n", "--no-update", action="store_true", help="Do not update stash")
    args = parser.parse_args(argv)

    if args.update and args.no_update:
        print("Conflicting arguments: --update and --no-update cannot be used together.")
        sys.exit(1)
    if args.update:
        update_stash()
        sys.exit(0)

    valid_args = []
    for candidate in args.url_or_path:
        if is_path_or_url(candidate) is None:
            print("Skipping invalid URL or file path:", candidate)
        else:
            valid_args.append(candidate)
    if not valid_args:
        print("Valid URL or file path required")
        sys.exit(1)

    for target in valid_args:
        if re.match(url_pattern, target):
            # A URL given directly on the command line is fatal when it fails.
            if not download_file(target, STASH_IMPORT_DIR, STASH_YTDLP_FORMAT):
                print("Stopped")
                sys.exit(1)
        else:
            _import_local_path(target)

    # Update stash
    if not args.no_update:
        update_stash()


if __name__ == "__main__":
    main()