191 lines
7.2 KiB
Python
Executable File
191 lines
7.2 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import datetime
|
|
|
|
import requests
|
|
import subprocess
|
|
from lxml import html
|
|
from dotenv import load_dotenv
|
|
import argparse
|
|
|
|
# Load variables from .env file
|
|
load_dotenv()
|
|
|
|
url_pattern = r'^http(s?):\/\/[-a-zA-Z0-9@:%._\+~#=]{2,256}\.[a-z]{2,6}\b[-a-zA-Z0-9@:%_\+.~#?&\/\/=]*$'
|
|
|
|
STASH_IMPORT_DIR = os.getenv("STASH_IMPORT_DIR_TEMP") or os.getenv("STASH_IMPORT_DIR")
|
|
STASH_API_KEY = os.getenv("STASH_API_KEY")
|
|
STASH_HOST = os.getenv("STASH_HOST")
|
|
STASH_PORT = os.getenv("STASH_PORT")
|
|
STASH_YTDLP_FORMAT = os.getenv("STASH_YTDLP_FORMAT")
|
|
STASH_PRINT_PREFIX = os.getenv("STASH_PRINT_PREFIX")
|
|
|
|
def find_booru_artist(page_url):
|
|
response = requests.get(page_url)
|
|
|
|
if response.status_code != 200:
|
|
print(f"Error: Unable to fetch page from {page_url}")
|
|
return None
|
|
|
|
# Parse the HTML content
|
|
tree = html.fromstring(response.content)
|
|
|
|
# Extract the artist name using XPath
|
|
artist_name = tree.xpath("/html/body/div[1]/section/ul/li[1]/a/text()") or tree.xpath("/html/body/div[1]/div[2]/div/div/aside/section[2]/div/ul[1]/li/a[2]/text()")
|
|
if not artist_name:
|
|
print("Warning: Artist name not found on the page.")
|
|
return None
|
|
|
|
# Clean up and format the artist name
|
|
artist_name = artist_name[0].strip()
|
|
artist_name = ''.join(c if c.isalnum() or c.isspace() else '_' for c in artist_name).lower().strip()
|
|
artist_name = artist_name.replace(' ', '_')
|
|
|
|
return artist_name
|
|
|
|
def update_stash():
|
|
print("Running scan for new items in Stash...")
|
|
url = f"http://{STASH_HOST}:{STASH_PORT}/graphql"
|
|
headers = {
|
|
"ApiKey": STASH_API_KEY,
|
|
"Content-Type": "application/json",
|
|
}
|
|
data = '{"query": "mutation { metadataScan (input:{useFileMetadata: false})}" }'
|
|
try:
|
|
response = requests.post(url, headers=headers, data=data)
|
|
if response.ok:
|
|
print("Update successful!")
|
|
else:
|
|
print(f"Update failed with status code: {response.status_code}")
|
|
print(response.text)
|
|
exit(1)
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Update error: {e}")
|
|
exit(1)
|
|
|
|
def download_file(file_url, download_dir, ytdlp_format):
|
|
extensions = "(jpg|JPG|jpeg|JPEG|png|PNG|gif|GIF|mp4|MP4)"
|
|
rgx_file = r"^.*\.{0}$".format(extensions)
|
|
rgx_filename = r"[A-Za-z0-9_]*\.{0}".format(extensions)
|
|
rgx_booru = r'https?://[a-z.]+/(index\.php.*id=([0-9]+)|posts/([0-9]+))'
|
|
rgx_booru_v1 = r'(https?://.*/original/([A-Za-z0-9/_]*\.{0})|https?://img[a-z0-9.]+\.[a-z]+\.com/(.*?)))'.format(extensions)
|
|
rgx_booru_v2 = r'(https?://.*/original/([A-Za-z0-9/_]*\.{0})|https?://img[a-z0-9.]+\.[a-z]+\.com/im)'.format(extensions)
|
|
rgx_booru_v2_vid = r'https?://video-cdn[0-9]{1}.*booru.*\.com/images/.*\.mp4'
|
|
|
|
artist = None
|
|
if re.match(rgx_booru, file_url):
|
|
artist = find_booru_artist(file_url)
|
|
booru_url = file_url
|
|
file_url = re.search(rgx_booru_v2, requests.get(booru_url).text)
|
|
file_url = file_url.group(1).strip() if file_url else None
|
|
if not file_url: #try video
|
|
file_url = re.search(rgx_booru_v2_vid, requests.get(booru_url).text)
|
|
file_url = file_url.group(0).strip() if file_url else None
|
|
|
|
if not file_url:
|
|
return 1
|
|
|
|
if re.match(rgx_file, file_url):
|
|
print(STASH_PRINT_PREFIX, file_url)
|
|
#if artist:
|
|
#print("Artist is:", artist)
|
|
try:
|
|
response = requests.get(file_url, stream=True)
|
|
if response.status_code == 200:
|
|
filename = ((artist + "__") if artist else "") + re.search(rgx_filename, file_url).group(0)
|
|
download_path = os.path.join(download_dir, filename)
|
|
|
|
exists = os.path.isfile(download_path)
|
|
|
|
if exists and not any(arg in sys.argv for arg in ("--overwrite", "-o")):
|
|
print("Destination file already exists:", filename)
|
|
return False
|
|
|
|
with open(download_path, 'wb') as f:
|
|
for chunk in response.iter_content(8192):
|
|
f.write(chunk)
|
|
print("Saved as:", filename)
|
|
return True
|
|
elif response.status_code == 403:
|
|
print("Error: HTTP 403 Forbidden - Access to the file is forbidden.")
|
|
return False
|
|
else:
|
|
print(f"Error: Failed to download the file. Status code: {response.status_code}")
|
|
return False
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Error: {e}")
|
|
return False
|
|
else:
|
|
print(STASH_PRINT_PREFIX, file_url)
|
|
download_path = os.path.join(download_dir, ytdlp_format)
|
|
command = ['yt-dlp', file_url, '-o', download_path, '--restrict-filenames']
|
|
try:
|
|
subprocess.run(command, check=True)
|
|
return True
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Failed to run yt-dlp command. Error: {e}")
|
|
return False
|
|
|
|
def is_path_or_url(arg): #chatgpt
|
|
global url_pattern
|
|
|
|
# Check if the argument is a valid file path
|
|
if os.path.exists(arg):
|
|
return arg
|
|
|
|
# Use regular expression to check if the argument matches a URL pattern
|
|
if re.match(url_pattern, arg):
|
|
return arg
|
|
|
|
# If no path or URL argument is found, return None
|
|
return None
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Download files or update stash.")
|
|
parser.add_argument("url_or_path", metavar="URL_or_path", nargs="?", help="URL or file path to download")
|
|
parser.add_argument("-o", "--overwrite", action="store_true", help="Overwrite existing files if present")
|
|
parser.add_argument("-u", "--update", action="store_true", help="Update stash")
|
|
parser.add_argument("-n", "--no-update", action="store_true", help="Do not update stash")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.update and args.no_update:
|
|
print("Conflicting arguments: --update and --no-update cannot be used together.")
|
|
exit(1)
|
|
|
|
if args.update:
|
|
update_stash()
|
|
exit(0)
|
|
|
|
url_or_path = args.url_or_path
|
|
|
|
valid_args = [arg for arg in args.url_or_path if is_path_or_url(arg) is not None] if isinstance(args.url_or_path, list) else [args.url_or_path]
|
|
|
|
if valid_args is None or len(valid_args) == 0:
|
|
print("Valid URL or file path required")
|
|
exit(1)
|
|
|
|
for valid_url in valid_args:
|
|
if re.match(url_pattern, valid_url):
|
|
# Download using yt-dlp
|
|
if not download_file(valid_url, STASH_IMPORT_DIR, STASH_YTDLP_FORMAT):
|
|
print("Stopped")
|
|
exit(1)
|
|
else:
|
|
is_file = subprocess.check_output(["file", "-0", valid_url]).decode().split("\x00")[1]
|
|
if "text" in is_file:
|
|
# Download as multiple URLs from the provided source file
|
|
print(f"Reading list of {sum(1 for _ in open(valid_url))} URL(s)")
|
|
with open(valid_url) as source_file:
|
|
for url in source_file:
|
|
download_file(url.strip(), STASH_IMPORT_DIR, STASH_YTDLP_FORMAT)
|
|
else:
|
|
subprocess.run(["rsync", valid_url, STASH_IMPORT_DIR], check=True)
|
|
|
|
# Update stash
|
|
if not args.no_update:
|
|
update_stash()
|