e1e99957b6
Fixes #78
279 lines
12 KiB
Python
Executable file
279 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
from itertools import zip_longest
|
|
from sys import stdout
|
|
import time
|
|
|
|
from textual.app import App, ComposeResult
|
|
from textual.widgets import DataTable, Header, Footer, TextLog, Static, Input
|
|
from textual.containers import Horizontal, Vertical
|
|
from textual.screen import Screen
|
|
from textual import log
|
|
from rich.text import Text
|
|
from jinja2.filters import do_filesizeformat
|
|
|
|
from fhost import db, File, su, app as fhost_app, in_upload_bl
|
|
from modui import *
|
|
|
|
fhost_app.app_context().push()
|
|
|
|
class NullptrMod(Screen):
|
|
BINDINGS = [
|
|
("q", "quit_app", "Quit"),
|
|
("f1", "filter(1, 'Lookup name:')", "Lookup name"),
|
|
("f2", "filter(2, 'Filter IP address:')", "Filter IP"),
|
|
("f3", "filter(3, 'Filter MIME Type:')", "Filter MIME"),
|
|
("f4", "filter(4, 'Filter extension:')", "Filter Ext."),
|
|
("f5", "refresh", "Refresh"),
|
|
("f6", "filter_clear", "Clear filter"),
|
|
("r", "remove_file(False)", "Remove file"),
|
|
("ctrl+r", "remove_file(True)", "Ban file"),
|
|
("p", "ban_ip(False)", "Ban IP"),
|
|
("ctrl+p", "ban_ip(True)", "Nuke IP"),
|
|
]
|
|
|
|
async def action_quit_app(self):
|
|
self.mpvw.shutdown()
|
|
await self.app.action_quit()
|
|
|
|
def action_refresh(self):
|
|
ftable = self.query_one("#ftable")
|
|
ftable.watch_query(None, None)
|
|
|
|
def action_filter_clear(self):
|
|
self.query_one("#filter_container").display = False
|
|
ftable = self.query_one("#ftable")
|
|
ftable.focus()
|
|
ftable.query = ftable.base_query
|
|
|
|
def action_filter(self, fcol: int, label: str):
|
|
self.query_one("#filter_label").update(label)
|
|
finput = self.query_one("#filter_input")
|
|
self.filter_col = fcol
|
|
self.query_one("#filter_container").display = True
|
|
finput.focus()
|
|
self._refresh_layout()
|
|
|
|
if self.current_file:
|
|
match fcol:
|
|
case 1: finput.value = ""
|
|
case 2: finput.value = self.current_file.addr
|
|
case 3: finput.value = self.current_file.mime
|
|
case 4: finput.value = self.current_file.ext
|
|
|
|
def on_input_submitted(self, message: Input.Submitted) -> None:
|
|
self.query_one("#filter_container").display = False
|
|
ftable = self.query_one("#ftable")
|
|
ftable.focus()
|
|
|
|
if len(message.value):
|
|
match self.filter_col:
|
|
case 1:
|
|
try: ftable.query = ftable.base_query.filter(File.id == su.debase(message.value))
|
|
except ValueError: pass
|
|
case 2: ftable.query = ftable.base_query.filter(File.addr == message.value)
|
|
case 3: ftable.query = ftable.base_query.filter(File.mime.like(message.value))
|
|
case 4: ftable.query = ftable.base_query.filter(File.ext.like(message.value))
|
|
else:
|
|
ftable.query = ftable.base_query
|
|
|
|
def action_remove_file(self, permanent: bool) -> None:
|
|
if self.current_file:
|
|
self.current_file.delete(permanent)
|
|
db.session.commit()
|
|
self.mount(Notification(f"{'Banned' if permanent else 'Removed'} file {self.current_file.getname()}"))
|
|
self.action_refresh()
|
|
|
|
def action_ban_ip(self, nuke: bool) -> None:
|
|
if self.current_file:
|
|
if not fhost_app.config["FHOST_UPLOAD_BLACKLIST"]:
|
|
self.mount(Notification("Failed: FHOST_UPLOAD_BLACKLIST not set!"))
|
|
return
|
|
else:
|
|
if in_upload_bl(self.current_file.addr):
|
|
txt = f"{self.current_file.addr} is already banned"
|
|
else:
|
|
with fhost_app.open_instance_resource(fhost_app.config["FHOST_UPLOAD_BLACKLIST"], "a") as bl:
|
|
print(self.current_file.addr.lstrip("::ffff:"), file=bl)
|
|
txt = f"Banned {self.current_file.addr}"
|
|
|
|
if nuke:
|
|
tsize = 0
|
|
trm = 0
|
|
for f in File.query.filter(File.addr == self.current_file.addr):
|
|
if f.getpath().is_file():
|
|
tsize += f.size or f.getpath().stat().st_size
|
|
trm += 1
|
|
f.delete(True)
|
|
db.session.commit()
|
|
txt += f", removed {trm} {'files' if trm != 1 else 'file'} totaling {do_filesizeformat(tsize, True)}"
|
|
self.mount(Notification(txt))
|
|
self._refresh_layout()
|
|
ftable = self.query_one("#ftable")
|
|
ftable.watch_query(None, None)
|
|
|
|
def on_update(self) -> None:
|
|
stdout.write("\033[?25l")
|
|
stdout.flush()
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Header()
|
|
yield Horizontal(
|
|
FileTable(id="ftable", zebra_stripes=True),
|
|
Vertical(
|
|
DataTable(id="finfo", show_header=False),
|
|
MpvWidget(id="mpv"),
|
|
TextLog(id="ftextlog"),
|
|
id="infopane"))
|
|
yield Horizontal(Static("Filter:", id="filter_label"), Input(id="filter_input"), id="filter_container")
|
|
yield Footer()
|
|
|
|
def on_mount(self) -> None:
|
|
self.current_file = None
|
|
|
|
self.ftable = self.query_one("#ftable")
|
|
self.ftable.focus()
|
|
|
|
self.finfo = self.query_one("#finfo")
|
|
self.finfo.add_columns("key", "value")
|
|
|
|
self.mpvw = self.query_one("#mpv")
|
|
self.ftlog = self.query_one("#ftextlog")
|
|
|
|
self.mimehandler = mime.MIMEHandler()
|
|
self.mimehandler.register(mime.MIMECategory.Archive, self.handle_libarchive)
|
|
self.mimehandler.register(mime.MIMECategory.Text, self.handle_text)
|
|
self.mimehandler.register(mime.MIMECategory.AV, self.handle_mpv)
|
|
self.mimehandler.register(mime.MIMECategory.Document, self.handle_mupdf)
|
|
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_libarchive)
|
|
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_mpv)
|
|
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_raw)
|
|
|
|
def handle_libarchive(self, cat):
|
|
import libarchive
|
|
with libarchive.file_reader(str(self.current_file.getpath())) as a:
|
|
self.ftlog.write("\n".join(e.path for e in a))
|
|
return True
|
|
|
|
def handle_text(self, cat):
|
|
with open(self.current_file.getpath(), "r") as sf:
|
|
data = sf.read(1000000).replace("\033","")
|
|
self.ftlog.write(data)
|
|
return True
|
|
|
|
def handle_mupdf(self, cat):
|
|
import fitz
|
|
with fitz.open(self.current_file.getpath(),
|
|
filetype=self.current_file.ext.lstrip(".")) as doc:
|
|
p = doc.load_page(0)
|
|
pix = p.get_pixmap(dpi=72)
|
|
imgdata = pix.tobytes("ppm").hex()
|
|
|
|
self.mpvw.styles.height = "40%"
|
|
self.mpvw.start_mpv("hex://" + imgdata, 0)
|
|
|
|
self.ftlog.write(Text.from_markup(f"[bold]Pages:[/bold] {doc.page_count}"))
|
|
self.ftlog.write(Text.from_markup("[bold]Metadata:[/bold]"))
|
|
for k, v in doc.metadata.items():
|
|
self.ftlog.write(Text.from_markup(f" [bold]{k}:[/bold] {v}"))
|
|
toc = doc.get_toc()
|
|
if len(toc):
|
|
self.ftlog.write(Text.from_markup("[bold]TOC:[/bold]"))
|
|
for lvl, title, page in toc:
|
|
self.ftlog.write(f"{' ' * lvl} {page}: {title}")
|
|
return True
|
|
|
|
def handle_mpv(self, cat):
|
|
if cat == mime.MIMECategory.AV or self.current_file.nsfw_score >= 0:
|
|
self.mpvw.styles.height = "20%"
|
|
self.mpvw.start_mpv(str(self.current_file.getpath()), 0)
|
|
|
|
import av
|
|
with av.open(str(self.current_file.getpath())) as c:
|
|
self.ftlog.write(Text("Format:", style="bold"))
|
|
self.ftlog.write(f" {c.format.long_name}")
|
|
if len(c.metadata):
|
|
self.ftlog.write(Text("Metadata:", style="bold"))
|
|
for k, v in c.metadata.items():
|
|
self.ftlog.write(f" {k}: {v}")
|
|
for s in c.streams:
|
|
self.ftlog.write(Text(f"Stream {s.index}:", style="bold"))
|
|
self.ftlog.write(f" Type: {s.type}")
|
|
if s.base_rate:
|
|
self.ftlog.write(f" Frame rate: {s.base_rate}")
|
|
if len(s.metadata):
|
|
self.ftlog.write(Text(" Metadata:", style="bold"))
|
|
for k, v in s.metadata.items():
|
|
self.ftlog.write(f" {k}: {v}")
|
|
return True
|
|
return False
|
|
|
|
def handle_raw(self, cat):
|
|
def hexdump(binf, length):
|
|
def fmt(s):
|
|
if isinstance(s, str):
|
|
c = chr(int(s, 16))
|
|
else:
|
|
c = chr(s)
|
|
s = c
|
|
if c.isalpha(): return f"\0[chartreuse1]{s}\0[/chartreuse1]"
|
|
if c.isdigit(): return f"\0[gold1]{s}\0[/gold1]"
|
|
if not c.isprintable():
|
|
g = "grey50" if c == "\0" else "cadet_blue"
|
|
return f"\0[{g}]{s if len(s) == 2 else '.'}\0[/{g}]"
|
|
return s
|
|
return Text.from_markup("\n".join(f"{' '.join(map(fmt, map(''.join, zip(*[iter(c.hex())] * 2))))}"
|
|
f"{' ' * (16 - len(c))}"
|
|
f" {''.join(map(fmt, c))}"
|
|
for c in map(lambda x: bytes([n for n in x if n != None]),
|
|
zip_longest(*[iter(binf.read(min(length, 16 * 10)))] * 16))))
|
|
|
|
with open(self.current_file.getpath(), "rb") as binf:
|
|
self.ftlog.write(hexdump(binf, self.current_file.size))
|
|
if self.current_file.size > 16*10*2:
|
|
binf.seek(self.current_file.size-16*10)
|
|
self.ftlog.write(" [...] ".center(64, '─'))
|
|
self.ftlog.write(hexdump(binf, self.current_file.size - binf.tell()))
|
|
|
|
return True
|
|
|
|
def on_file_table_selected(self, message: FileTable.Selected) -> None:
|
|
f = message.file
|
|
self.current_file = f
|
|
self.finfo.clear()
|
|
self.finfo.add_rows([
|
|
("ID:", str(f.id)),
|
|
("File name:", f.getname()),
|
|
("URL:", f.geturl() if fhost_app.config["SERVER_NAME"] else "⚠ Set SERVER_NAME in config.py to display"),
|
|
("File size:", do_filesizeformat(f.size, True)),
|
|
("MIME type:", f.mime),
|
|
("SHA256 checksum:", f.sha256),
|
|
("Uploaded by:", Text(f.addr)),
|
|
("Management token:", f.mgmt_token),
|
|
("Secret:", f.secret),
|
|
("Is NSFW:", ("Yes" if f.is_nsfw else "No") + (f" (Score: {f.nsfw_score:0.4f})" if f.nsfw_score else " (Not scanned)")),
|
|
("Is banned:", "Yes" if f.removed else "No"),
|
|
("Expires:", time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(File.get_expiration(f.expiration, f.size)/1000)))
|
|
])
|
|
|
|
self.mpvw.stop_mpv(True)
|
|
self.ftlog.remove()
|
|
self.query_one("#infopane").mount(TextLog(id="ftextlog"))
|
|
self.ftlog = self.query_one("#ftextlog")
|
|
|
|
if f.getpath().is_file():
|
|
self.mimehandler.handle(f.mime, f.ext)
|
|
self.ftlog.scroll_home(animate=False)
|
|
|
|
class NullptrModApp(App):
|
|
CSS_PATH = "mod.css"
|
|
|
|
def on_mount(self) -> None:
|
|
self.title = "0x0 File Moderation Interface"
|
|
self.main_screen = NullptrMod()
|
|
self.install_screen(self.main_screen, name="main")
|
|
self.push_screen("main")
|
|
|
|
if __name__ == "__main__":
|
|
app = NullptrModApp()
|
|
app.run()
|