Add moderation TUI
This ended up way fancier than I imagined.
This commit is contained in:
parent
dcea8bffe1
commit
eebd5d8c6d
42
README.rst
42
README.rst
|
@ -48,6 +48,48 @@ Before running the service for the first time and every time you update it
|
|||
from this git repository, run ``FLASK_APP=fhost flask db upgrade``.
|
||||
|
||||
|
||||
Moderation UI
|
||||
-------------
|
||||
|
||||
0x0 features a TUI program for file moderation. With it, you can view a list
|
||||
of uploaded files, as well as extended information on them. It allows you to
|
||||
take actions like removing files temporarily or permanently, as well as
|
||||
blocking IP addresses and associated files.
|
||||
|
||||
If a sufficiently recent version of python-mpv with libmpv is present and
|
||||
your terminal supports it, you also get graphical file previews, including
|
||||
video playback. Upstream mpv currently supports sixel graphics, but there is
|
||||
`an open pull request <https://github.com/mpv-player/mpv/pull/11002>`_ that
|
||||
adds support for the `kitty graphics protocol <https://sw.kovidgoyal.net/kitty/graphics-protocol/>`_.
|
||||
For this to work, set the ``MOD_PREVIEW_PROTO`` option in ``instance/config.py``.
|
||||
|
||||
Requirements:
|
||||
|
||||
* `Textual <https://textual.textualize.io/>`_
|
||||
|
||||
Optional:
|
||||
|
||||
* `python-mpv <https://github.com/jaseg/python-mpv>`_
|
||||
(graphical previews)
|
||||
* `PyAV <https://github.com/PyAV-Org/PyAV>`_
|
||||
(information on multimedia files)
|
||||
* `PyMuPDF <https://github.com/pymupdf/PyMuPDF>`_
|
||||
(previews and file information for PDF, XPS, EPUB, MOBI and FB2)
|
||||
* `libarchive-c <https://github.com/Changaco/python-libarchive-c>`_
|
||||
(archive content listing)
|
||||
|
||||
.. note::
|
||||
`Mosh <https://mosh.org/>`_ currently does not support sixels or kitty graphics.
|
||||
|
||||
.. hint::
|
||||
You may need to set the ``COLORTERM`` environment variable to
|
||||
``truecolor``.
|
||||
|
||||
.. tip::
|
||||
Using compression with SSH (``-C`` option) can significantly
|
||||
reduce the bandwidth requirements for graphics.
|
||||
|
||||
|
||||
NSFW Detection
|
||||
--------------
|
||||
|
||||
|
|
|
@ -58,6 +58,17 @@ FHOST_MIN_EXPIRATION = 30 * 24 * 60 * 60 * 1000
|
|||
FHOST_MAX_EXPIRATION = 365 * 24 * 60 * 60 * 1000
|
||||
|
||||
|
||||
# This should be detected automatically when running behind a reverse proxy, but needs
|
||||
# to be set for URL resolution to work in e.g. the moderation UI.
|
||||
# SERVER_NAME = "example.com"
|
||||
|
||||
|
||||
# Specifies which graphics protocol to use for the media previews in the moderation UI.
|
||||
# Requires pympv with libmpv >= 0.36.0 and terminal support.
|
||||
# Available choices are "sixel" and "kitty".
|
||||
# MOD_PREVIEW_PROTO = "sixel"
|
||||
|
||||
|
||||
# Use the X-SENDFILE header to speed up serving files w/ compatible webservers
|
||||
#
|
||||
# Some webservers can be configured use the X-Sendfile header to handle sending
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
#ftable {
|
||||
width: 1fr;
|
||||
}
|
||||
|
||||
#infopane {
|
||||
width: 50%;
|
||||
outline-top: hkey $primary;
|
||||
background: $panel;
|
||||
}
|
||||
|
||||
#finfo {
|
||||
background: $boost;
|
||||
height: 12;
|
||||
width: 1fr;
|
||||
box-sizing: content-box;
|
||||
}
|
||||
|
||||
#mpv {
|
||||
display: none;
|
||||
height: 20%;
|
||||
width: 1fr;
|
||||
content-align: center middle;
|
||||
}
|
||||
|
||||
#ftextlog {
|
||||
height: 1fr;
|
||||
width: 1fr;
|
||||
}
|
||||
|
||||
#filter_container {
|
||||
height: auto;
|
||||
display: none;
|
||||
}
|
||||
|
||||
#filter_label {
|
||||
content-align: right middle;
|
||||
height: 1fr;
|
||||
width: 20%;
|
||||
margin: 0 1 0 2;
|
||||
}
|
||||
|
||||
#filter_input {
|
||||
width: 1fr;
|
||||
}
|
||||
|
||||
Notification {
|
||||
dock: bottom;
|
||||
layer: notification;
|
||||
width: auto;
|
||||
margin: 2 4;
|
||||
padding: 1 2;
|
||||
background: $background;
|
||||
color: $text;
|
||||
height: auto;
|
||||
|
||||
}
|
|
@ -0,0 +1,279 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from itertools import zip_longest
|
||||
from sys import stdout
|
||||
import time
|
||||
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.widgets import DataTable, Header, Footer, TextLog, Static, Input
|
||||
from textual.containers import Horizontal, Vertical
|
||||
from textual.screen import Screen
|
||||
from textual import log
|
||||
from rich.text import Text
|
||||
from jinja2.filters import do_filesizeformat
|
||||
|
||||
from fhost import db, File, su, app as fhost_app, in_upload_bl
|
||||
from modui import *
|
||||
|
||||
fhost_app.app_context().push()
|
||||
|
||||
class NullptrMod(Screen):
|
||||
BINDINGS = [
|
||||
("q", "quit_app", "Quit"),
|
||||
("f1", "filter(1, 'Lookup name:')", "Lookup name"),
|
||||
("f2", "filter(2, 'Filter IP address:')", "Filter IP"),
|
||||
("f3", "filter(3, 'Filter MIME Type:')", "Filter MIME"),
|
||||
("f4", "filter(4, 'Filter extension:')", "Filter Ext."),
|
||||
("f5", "refresh", "Refresh"),
|
||||
("f6", "filter_clear", "Clear filter"),
|
||||
("r", "remove_file(False)", "Remove file"),
|
||||
("ctrl+r", "remove_file(True)", "Ban file"),
|
||||
("p", "ban_ip(False)", "Ban IP"),
|
||||
("ctrl+p", "ban_ip(True)", "Nuke IP"),
|
||||
]
|
||||
|
||||
async def action_quit_app(self):
|
||||
self.mpvw.shutdown()
|
||||
await self.app.action_quit()
|
||||
|
||||
def action_refresh(self):
|
||||
ftable = self.query_one("#ftable")
|
||||
ftable.watch_query(None, None)
|
||||
|
||||
def action_filter_clear(self):
|
||||
self.query_one("#filter_container").display = False
|
||||
ftable = self.query_one("#ftable")
|
||||
ftable.focus()
|
||||
ftable.query = ftable.base_query
|
||||
|
||||
def action_filter(self, fcol: int, label: str):
|
||||
self.query_one("#filter_label").update(label)
|
||||
finput = self.query_one("#filter_input")
|
||||
self.filter_col = fcol
|
||||
self.query_one("#filter_container").display = True
|
||||
finput.focus()
|
||||
self._refresh_layout()
|
||||
|
||||
if self.current_file:
|
||||
match fcol:
|
||||
case 1: finput.value = ""
|
||||
case 2: finput.value = self.current_file.addr
|
||||
case 3: finput.value = self.current_file.mime
|
||||
case 4: finput.value = self.current_file.ext
|
||||
|
||||
def on_input_submitted(self, message: Input.Submitted) -> None:
|
||||
self.query_one("#filter_container").display = False
|
||||
ftable = self.query_one("#ftable")
|
||||
ftable.focus()
|
||||
|
||||
if len(message.value):
|
||||
match self.filter_col:
|
||||
case 1:
|
||||
try: ftable.query = ftable.base_query.filter(File.id == su.debase(message.value))
|
||||
except ValueError: pass
|
||||
case 2: ftable.query = ftable.base_query.filter(File.addr == message.value)
|
||||
case 3: ftable.query = ftable.base_query.filter(File.mime.like(message.value))
|
||||
case 4: ftable.query = ftable.base_query.filter(File.ext.like(message.value))
|
||||
else:
|
||||
ftable.query = ftable.base_query
|
||||
|
||||
def action_remove_file(self, permanent: bool) -> None:
|
||||
if self.current_file:
|
||||
self.current_file.delete(permanent)
|
||||
db.session.commit()
|
||||
self.mount(Notification(f"{'Banned' if permanent else 'Removed'} file {self.current_file.getname()}"))
|
||||
self.action_refresh()
|
||||
|
||||
def action_ban_ip(self, nuke: bool) -> None:
|
||||
if self.current_file:
|
||||
if not fhost_app.config["FHOST_UPLOAD_BLACKLIST"]:
|
||||
self.mount(Notification("Failed: FHOST_UPLOAD_BLACKLIST not set!"))
|
||||
return
|
||||
else:
|
||||
if in_upload_bl(self.current_file.addr):
|
||||
txt = f"{self.current_file.addr} is already banned"
|
||||
else:
|
||||
with fhost_app.open_instance_resource(fhost_app.config["FHOST_UPLOAD_BLACKLIST"], "a") as bl:
|
||||
print(self.current_file.addr.lstrip("::ffff:"), file=bl)
|
||||
txt = f"Banned {self.current_file.addr}"
|
||||
|
||||
if nuke:
|
||||
tsize = 0
|
||||
trm = 0
|
||||
for f in File.query.filter(File.addr == self.current_file.addr):
|
||||
if f.getpath().is_file():
|
||||
tsize += f.size or f.getpath().stat().st_size
|
||||
trm += 1
|
||||
f.delete(True)
|
||||
db.session.commit()
|
||||
txt += f", removed {trm} {'files' if trm != 1 else 'file'} totaling {jinja2.filters.do_filesizeformat(tsize, True)}"
|
||||
self.mount(Notification(txt))
|
||||
self._refresh_layout()
|
||||
ftable = self.query_one("#ftable")
|
||||
ftable.watch_query(None, None)
|
||||
|
||||
def on_update(self) -> None:
|
||||
stdout.write("\033[?25l")
|
||||
stdout.flush()
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header()
|
||||
yield Horizontal(
|
||||
FileTable(id="ftable", zebra_stripes=True),
|
||||
Vertical(
|
||||
DataTable(id="finfo", show_header=False),
|
||||
MpvWidget(id="mpv"),
|
||||
TextLog(id="ftextlog"),
|
||||
id="infopane"))
|
||||
yield Horizontal(Static("Filter:", id="filter_label"), Input(id="filter_input"), id="filter_container")
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
self.current_file = None
|
||||
|
||||
self.ftable = self.query_one("#ftable")
|
||||
self.ftable.focus()
|
||||
|
||||
self.finfo = self.query_one("#finfo")
|
||||
self.finfo.add_columns("key", "value")
|
||||
|
||||
self.mpvw = self.query_one("#mpv")
|
||||
self.ftlog = self.query_one("#ftextlog")
|
||||
|
||||
self.mimehandler = mime.MIMEHandler()
|
||||
self.mimehandler.register(mime.MIMECategory.Archive, self.handle_libarchive)
|
||||
self.mimehandler.register(mime.MIMECategory.Text, self.handle_text)
|
||||
self.mimehandler.register(mime.MIMECategory.AV, self.handle_mpv)
|
||||
self.mimehandler.register(mime.MIMECategory.Document, self.handle_mupdf)
|
||||
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_libarchive)
|
||||
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_mpv)
|
||||
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_raw)
|
||||
|
||||
def handle_libarchive(self, cat):
|
||||
import libarchive
|
||||
with libarchive.file_reader(str(self.current_file.getpath())) as a:
|
||||
self.ftlog.write("\n".join(e.path for e in a))
|
||||
return True
|
||||
|
||||
def handle_text(self, cat):
|
||||
with open(self.current_file.getpath(), "r") as sf:
|
||||
data = sf.read(1000000).replace("\033","")
|
||||
self.ftlog.write(data)
|
||||
return True
|
||||
|
||||
def handle_mupdf(self, cat):
|
||||
import fitz
|
||||
with fitz.open(self.current_file.getpath(),
|
||||
filetype=self.current_file.ext.lstrip(".")) as doc:
|
||||
p = doc.load_page(0)
|
||||
pix = p.get_pixmap(dpi=72)
|
||||
imgdata = pix.tobytes("ppm").hex()
|
||||
|
||||
self.mpvw.styles.height = "40%"
|
||||
self.mpvw.start_mpv("hex://" + imgdata, 0)
|
||||
|
||||
self.ftlog.write(Text.from_markup(f"[bold]Pages:[/bold] {doc.page_count}"))
|
||||
self.ftlog.write(Text.from_markup("[bold]Metadata:[/bold]"))
|
||||
for k, v in doc.metadata.items():
|
||||
self.ftlog.write(Text.from_markup(f" [bold]{k}:[/bold] {v}"))
|
||||
toc = doc.get_toc()
|
||||
if len(toc):
|
||||
self.ftlog.write(Text.from_markup("[bold]TOC:[/bold]"))
|
||||
for lvl, title, page in toc:
|
||||
self.ftlog.write(f"{' ' * lvl} {page}: {title}")
|
||||
return True
|
||||
|
||||
def handle_mpv(self, cat):
|
||||
if cat == mime.MIMECategory.AV or self.current_file.nsfw_score >= 0:
|
||||
self.mpvw.styles.height = "20%"
|
||||
self.mpvw.start_mpv(str(self.current_file.getpath()), 0)
|
||||
|
||||
import av
|
||||
with av.open(str(self.current_file.getpath())) as c:
|
||||
self.ftlog.write(Text("Format:", style="bold"))
|
||||
self.ftlog.write(f" {c.format.long_name}")
|
||||
if len(c.metadata):
|
||||
self.ftlog.write(Text("Metadata:", style="bold"))
|
||||
for k, v in c.metadata.items():
|
||||
self.ftlog.write(f" {k}: {v}")
|
||||
for s in c.streams:
|
||||
self.ftlog.write(Text(f"Stream {s.index}:", style="bold"))
|
||||
self.ftlog.write(f" Type: {s.type}")
|
||||
if s.base_rate:
|
||||
self.ftlog.write(f" Frame rate: {s.base_rate}")
|
||||
if len(s.metadata):
|
||||
self.ftlog.write(Text(" Metadata:", style="bold"))
|
||||
for k, v in s.metadata.items():
|
||||
self.ftlog.write(f" {k}: {v}")
|
||||
return True
|
||||
return False
|
||||
|
||||
def handle_raw(self, cat):
|
||||
def hexdump(binf, length):
|
||||
def fmt(s):
|
||||
if isinstance(s, str):
|
||||
c = chr(int(s, 16))
|
||||
else:
|
||||
c = chr(s)
|
||||
s = c
|
||||
if c.isalpha(): return f"\0[chartreuse1]{s}\0[/chartreuse1]"
|
||||
if c.isdigit(): return f"\0[gold1]{s}\0[/gold1]"
|
||||
if not c.isprintable():
|
||||
g = "grey50" if c == "\0" else "cadet_blue"
|
||||
return f"\0[{g}]{s if len(s) == 2 else '.'}\0[/{g}]"
|
||||
return s
|
||||
return Text.from_markup("\n".join(f"{' '.join(map(fmt, map(''.join, zip(*[iter(c.hex())] * 2))))}"
|
||||
f"{' ' * (16 - len(c))}"
|
||||
f" {''.join(map(fmt, c))}"
|
||||
for c in map(lambda x: bytes([n for n in x if n != None]),
|
||||
zip_longest(*[iter(binf.read(min(length, 16 * 10)))] * 16))))
|
||||
|
||||
with open(self.current_file.getpath(), "rb") as binf:
|
||||
self.ftlog.write(hexdump(binf, self.current_file.size))
|
||||
if self.current_file.size > 16*10*2:
|
||||
binf.seek(self.current_file.size-16*10)
|
||||
self.ftlog.write(" [...] ".center(64, '─'))
|
||||
self.ftlog.write(hexdump(binf, self.current_file.size - binf.tell()))
|
||||
|
||||
return True
|
||||
|
||||
def on_file_table_selected(self, message: FileTable.Selected) -> None:
|
||||
f = message.file
|
||||
self.current_file = f
|
||||
self.finfo.clear()
|
||||
self.finfo.add_rows([
|
||||
("ID:", str(f.id)),
|
||||
("File name:", f.getname()),
|
||||
("URL:", f.geturl() if fhost_app.config["SERVER_NAME"] else "⚠ Set SERVER_NAME in config.py to display"),
|
||||
("File size:", do_filesizeformat(f.size, True)),
|
||||
("MIME type:", f.mime),
|
||||
("SHA256 checksum:", f.sha256),
|
||||
("Uploaded by:", Text(f.addr)),
|
||||
("Management token:", f.mgmt_token),
|
||||
("Secret:", f.secret),
|
||||
("Is NSFW:", ("Yes" if f.is_nsfw else "No") + f" (Score: {f.nsfw_score:0.4f})"),
|
||||
("Is banned:", "Yes" if f.removed else "No"),
|
||||
("Expires:", time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(File.get_expiration(f.expiration, f.size)/1000)))
|
||||
])
|
||||
|
||||
self.mpvw.stop_mpv(True)
|
||||
self.ftlog.remove()
|
||||
self.query_one("#infopane").mount(TextLog(id="ftextlog"))
|
||||
self.ftlog = self.query_one("#ftextlog")
|
||||
|
||||
if f.getpath().is_file():
|
||||
self.mimehandler.handle(f.mime, f.ext)
|
||||
self.ftlog.scroll_home(animate=False)
|
||||
|
||||
class NullptrModApp(App):
|
||||
CSS_PATH = "mod.css"
|
||||
|
||||
def on_mount(self) -> None:
|
||||
self.title = "0x0 File Moderation Interface"
|
||||
self.main_screen = NullptrMod()
|
||||
self.install_screen(self.main_screen, name="main")
|
||||
self.push_screen("main")
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = NullptrModApp()
|
||||
app.run()
|
|
@ -0,0 +1,3 @@
|
|||
from .filetable import FileTable
|
||||
from .notification import Notification
|
||||
from .mpvwidget import MpvWidget
|
|
@ -0,0 +1,72 @@
|
|||
from textual.widgets import DataTable, Static
|
||||
from textual.reactive import Reactive
|
||||
from textual.message import Message, MessageTarget
|
||||
from textual import events, log
|
||||
from jinja2.filters import do_filesizeformat
|
||||
|
||||
from fhost import File
|
||||
from modui import mime
|
||||
|
||||
class FileTable(DataTable):
|
||||
query = Reactive(None)
|
||||
order_col = Reactive(0)
|
||||
order_desc = Reactive(True)
|
||||
limit = 10000
|
||||
colmap = [File.id, File.removed, File.nsfw_score, None, File.ext, File.size, File.mime]
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.add_columns("#", "☣️", "🔞", "📂", "name", "size", "mime")
|
||||
self.base_query = File.query.filter(File.size != None)
|
||||
self.query = self.base_query
|
||||
|
||||
class Selected(Message):
|
||||
def __init__(self, sender: MessageTarget, f: File) -> None:
|
||||
self.file = f
|
||||
super().__init__(sender)
|
||||
|
||||
def watch_order_col(self, old, value) -> None:
|
||||
self.watch_query(None, None)
|
||||
|
||||
def watch_order_desc(self, old, value) -> None:
|
||||
self.watch_query(None, None)
|
||||
|
||||
def watch_query(self, old, value) -> None:
|
||||
def fmt_file(f: File) -> tuple:
|
||||
return (
|
||||
str(f.id),
|
||||
"🔴" if f.removed else " ",
|
||||
"🚩" if f.is_nsfw else " ",
|
||||
"👻" if not f.getpath().is_file() else " ",
|
||||
f.getname(),
|
||||
do_filesizeformat(f.size, True),
|
||||
f"{mime.mimemoji.get(f.mime.split('/')[0], mime.mimemoji.get(f.mime)) or ' '} " + f.mime,
|
||||
)
|
||||
|
||||
if (self.query):
|
||||
self.clear()
|
||||
order = FileTable.colmap[self.order_col]
|
||||
q = self.query
|
||||
if order: q = q.order_by(order.desc() if self.order_desc else order, File.id)
|
||||
self.add_rows(map(fmt_file, q.limit(self.limit)))
|
||||
|
||||
def _scroll_cursor_in_to_view(self, animate: bool = False) -> None:
|
||||
region = self._get_cell_region(self.cursor_row, 0)
|
||||
spacing = self._get_cell_border()
|
||||
self.scroll_to_region(region, animate=animate, spacing=spacing)
|
||||
|
||||
async def watch_cursor_cell(self, old, value) -> None:
|
||||
super().watch_cursor_cell(old, value)
|
||||
if value[0] < len(self.data) and value[0] >= 0:
|
||||
f = File.query.get(int(self.data[value[0]][0]))
|
||||
await self.emit(self.Selected(self, f))
|
||||
|
||||
def on_click(self, event: events.Click) -> None:
|
||||
super().on_click(event)
|
||||
meta = self.get_style_at(event.x, event.y).meta
|
||||
if meta:
|
||||
if meta["row"] == -1:
|
||||
qi = FileTable.colmap[meta["column"]]
|
||||
if meta["column"] == self.order_col:
|
||||
self.order_desc = not self.order_desc
|
||||
self.order_col = meta["column"]
|
|
@ -0,0 +1,122 @@
|
|||
from enum import Enum
|
||||
from textual import log
|
||||
|
||||
mimemoji = {
|
||||
"audio" : "🔈",
|
||||
"video" : "🎞",
|
||||
"text" : "📄",
|
||||
"image" : "🖼",
|
||||
"application/zip" : "🗜️",
|
||||
"application/x-zip-compressed" : "🗜️",
|
||||
"application/x-tar" : "🗄",
|
||||
"application/x-cpio" : "🗄",
|
||||
"application/x-xz" : "🗜️",
|
||||
"application/x-7z-compressed" : "🗜️",
|
||||
"application/gzip" : "🗜️",
|
||||
"application/zstd" : "🗜️",
|
||||
"application/x-rar" : "🗜️",
|
||||
"application/x-rar-compressed" : "🗜️",
|
||||
"application/vnd.ms-cab-compressed" : "🗜️",
|
||||
"application/x-bzip2" : "🗜️",
|
||||
"application/x-lzip" : "🗜️",
|
||||
"application/x-iso9660-image" : "💿",
|
||||
"application/pdf" : "📕",
|
||||
"application/epub+zip" : "📕",
|
||||
"application/mxf" : "🎞",
|
||||
"application/vnd.android.package-archive" : "📦",
|
||||
"application/vnd.debian.binary-package" : "📦",
|
||||
"application/x-rpm" : "📦",
|
||||
"application/x-dosexec" : "⚙",
|
||||
"application/x-execuftable" : "⚙",
|
||||
"application/x-sharedlib" : "⚙",
|
||||
"application/java-archive" : "☕",
|
||||
"application/x-qemu-disk" : "🖴",
|
||||
"application/pgp-encrypted" : "🔏",
|
||||
}
|
||||
|
||||
MIMECategory = Enum("MIMECategory",
|
||||
["Archive", "Text", "AV", "Document", "Fallback"]
|
||||
)
|
||||
|
||||
class MIMEHandler:
|
||||
def __init__(self):
|
||||
self.handlers = {
|
||||
MIMECategory.Archive : [[
|
||||
"application/zip",
|
||||
"application/x-zip-compressed",
|
||||
"application/x-tar",
|
||||
"application/x-cpio",
|
||||
"application/x-xz",
|
||||
"application/x-7z-compressed",
|
||||
"application/gzip",
|
||||
"application/zstd",
|
||||
"application/x-rar",
|
||||
"application/x-rar-compressed",
|
||||
"application/vnd.ms-cab-compressed",
|
||||
"application/x-bzip2",
|
||||
"application/x-lzip",
|
||||
"application/x-iso9660-image",
|
||||
"application/vnd.android.package-archive",
|
||||
"application/vnd.debian.binary-package",
|
||||
"application/x-rpm",
|
||||
"application/java-archive",
|
||||
"application/vnd.openxmlformats"
|
||||
], []],
|
||||
MIMECategory.Text : [["text"], []],
|
||||
MIMECategory.AV : [[
|
||||
"audio", "video", "image",
|
||||
"application/mxf"
|
||||
], []],
|
||||
MIMECategory.Document : [[
|
||||
"application/pdf",
|
||||
"application/epub",
|
||||
"application/x-mobipocket-ebook",
|
||||
], []],
|
||||
MIMECategory.Fallback : [[], []]
|
||||
}
|
||||
|
||||
self.exceptions = {
|
||||
MIMECategory.Archive : {
|
||||
".cbz" : MIMECategory.Document,
|
||||
".xps" : MIMECategory.Document,
|
||||
".epub" : MIMECategory.Document,
|
||||
},
|
||||
MIMECategory.Text : {
|
||||
".fb2" : MIMECategory.Document,
|
||||
}
|
||||
}
|
||||
|
||||
def register(self, category, handler):
|
||||
self.handlers[category][1].append(handler)
|
||||
|
||||
def handle(self, mime, ext):
|
||||
def getcat(s):
|
||||
cat = MIMECategory.Fallback
|
||||
for k, v in self.handlers.items():
|
||||
s = s.split(";")[0]
|
||||
if s in v[0] or s.split("/")[0] in v[0]:
|
||||
cat = k
|
||||
break
|
||||
|
||||
for x in v[0]:
|
||||
if s.startswith(x):
|
||||
cat = k
|
||||
break
|
||||
|
||||
if cat in self.exceptions:
|
||||
cat = self.exceptions[cat].get(ext) or cat
|
||||
|
||||
return cat
|
||||
|
||||
cat = getcat(mime)
|
||||
for handler in self.handlers[cat][1]:
|
||||
try:
|
||||
if handler(cat): return
|
||||
except: pass
|
||||
|
||||
for handler in self.handlers[MIMECategory.Fallback][1]:
|
||||
try:
|
||||
if handler(None): return
|
||||
except: pass
|
||||
|
||||
raise RuntimeError(f"Unhandled MIME type category: {cat}")
|
|
@ -0,0 +1,88 @@
|
|||
import time
|
||||
import fcntl, struct, termios
|
||||
from sys import stdout
|
||||
|
||||
from textual import events, log
|
||||
from textual.widgets import Static
|
||||
|
||||
from fhost import app as fhost_app
|
||||
|
||||
class MpvWidget(Static):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.mpv = None
|
||||
self.vo = fhost_app.config.get("MOD_PREVIEW_PROTO")
|
||||
|
||||
if not self.vo in ["sixel", "kitty"]:
|
||||
self.update("⚠ Previews not enabled. \n\nSet MOD_PREVIEW_PROTO to 'sixel' or 'kitty' in config.py,\nwhichever is supported by your terminal.")
|
||||
else:
|
||||
try:
|
||||
import mpv
|
||||
self.mpv = mpv.MPV()
|
||||
self.mpv.profile = "sw-fast"
|
||||
self.mpv["vo"] = self.vo
|
||||
self.mpv[f"vo-{self.vo}-config-clear"] = False
|
||||
self.mpv[f"vo-{self.vo}-alt-screen"] = False
|
||||
self.mpv[f"vo-sixel-buffered"] = True
|
||||
self.mpv["audio"] = False
|
||||
self.mpv["loop-file"] = "inf"
|
||||
self.mpv["image-display-duration"] = 0.5 if self.vo == "sixel" else "inf"
|
||||
except Exception as e:
|
||||
self.mpv = None
|
||||
self.update(f"⚠ Previews require python-mpv with libmpv 0.36.0 or later \n\nError was:\n{type(e).__name__}: {e}")
|
||||
|
||||
def start_mpv(self, f: str|None = None, pos: float|str|None = None) -> None:
|
||||
self.display = True
|
||||
self.screen._refresh_layout()
|
||||
|
||||
if self.mpv:
|
||||
if self.content_region.x:
|
||||
r, c, w, h = struct.unpack('hhhh', fcntl.ioctl(0, termios.TIOCGWINSZ, '12345678'))
|
||||
width = int((w / c) * self.content_region.width)
|
||||
height = int((h / r) * (self.content_region.height + (1 if self.vo == "sixel" else 0)))
|
||||
self.mpv[f"vo-{self.vo}-left"] = self.content_region.x + 1
|
||||
self.mpv[f"vo-{self.vo}-top"] = self.content_region.y + 1
|
||||
self.mpv[f"vo-{self.vo}-rows"] = self.content_region.height + (1 if self.vo == "sixel" else 0)
|
||||
self.mpv[f"vo-{self.vo}-cols"] = self.content_region.width
|
||||
self.mpv[f"vo-{self.vo}-width"] = width
|
||||
self.mpv[f"vo-{self.vo}-height"] = height
|
||||
|
||||
if pos != None:
|
||||
self.mpv["start"] = pos
|
||||
|
||||
if f:
|
||||
self.mpv.loadfile(f)
|
||||
else:
|
||||
self.mpv.playlist_play_index(0)
|
||||
|
||||
def stop_mpv(self, wait: bool = False) -> None:
|
||||
if self.mpv:
|
||||
if not self.mpv.idle_active:
|
||||
self.mpv.stop(True)
|
||||
if wait:
|
||||
time.sleep(0.1)
|
||||
self.clear_mpv()
|
||||
self.display = False
|
||||
|
||||
def on_resize(self, size) -> None:
|
||||
if self.mpv:
|
||||
if not self.mpv.idle_active:
|
||||
t = self.mpv.time_pos
|
||||
self.stop_mpv()
|
||||
if t:
|
||||
self.mpv["start"] = t
|
||||
self.start_mpv()
|
||||
|
||||
def clear_mpv(self) -> None:
|
||||
if self.vo == "kitty":
|
||||
stdout.write("\033_Ga=d;\033\\")
|
||||
stdout.flush()
|
||||
|
||||
def shutdown(self) -> None:
|
||||
if self.mpv:
|
||||
self.mpv.stop()
|
||||
del self.mpv
|
||||
if self.vo == "kitty":
|
||||
stdout.write("\033_Ga=d;\033\\\033[?25l")
|
||||
stdout.flush()
|
|
@ -0,0 +1,8 @@
|
|||
from textual.widgets import Static
|
||||
|
||||
class Notification(Static):
|
||||
def on_mount(self) -> None:
|
||||
self.set_timer(3, self.remove)
|
||||
|
||||
def on_click(self) -> None:
|
||||
self.remove()
|
Loading…
Reference in New Issue