Compare commits

..

No commits in common. "master" and "master" have entirely different histories.

5 changed files with 60 additions and 104 deletions

View File

@ -106,7 +106,6 @@ db = SQLAlchemy(app)
migrate = Migrate(app, db)
class URL(db.Model):
__tablename__ = "URL"
id = db.Column(db.Integer, primary_key = True)
url = db.Column(db.UnicodeText, unique = True)
@ -135,7 +134,6 @@ class File(db.Model):
ext = db.Column(db.UnicodeText)
mime = db.Column(db.UnicodeText)
addr = db.Column(db.UnicodeText)
ua = db.Column(db.UnicodeText)
removed = db.Column(db.Boolean, default=False)
nsfw_score = db.Column(db.Float)
expiration = db.Column(db.BigInteger)
@ -144,12 +142,11 @@ class File(db.Model):
last_vscan = db.Column(db.DateTime)
size = db.Column(db.BigInteger)
def __init__(self, sha256, ext, mime, addr, ua, expiration, mgmt_token):
def __init__(self, sha256, ext, mime, addr, expiration, mgmt_token):
self.sha256 = sha256
self.ext = ext
self.mime = mime
self.addr = addr
self.ua = ua
self.expiration = expiration
self.mgmt_token = mgmt_token
@ -214,7 +211,7 @@ class File(db.Model):
Any value greater that the longest allowed file lifespan will be rounded down to that
value.
"""
def store(file_, requested_expiration: typing.Optional[int], addr, ua, secret: bool):
def store(file_, requested_expiration: typing.Optional[int], addr, secret: bool):
data = file_.read()
digest = sha256(data).hexdigest()
@ -280,10 +277,9 @@ class File(db.Model):
mime = get_mime()
ext = get_ext(mime)
mgmt_token = secrets.token_urlsafe()
f = File(digest, ext, mime, addr, ua, expiration, mgmt_token)
f = File(digest, ext, mime, addr, expiration, mgmt_token)
f.addr = addr
f.ua = ua
if isnew:
f.secret = None
@ -371,11 +367,11 @@ requested_expiration can be:
Any value greater that the longest allowed file lifespan will be rounded down to that
value.
"""
def store_file(f, requested_expiration: typing.Optional[int], addr, ua, secret: bool):
def store_file(f, requested_expiration: typing.Optional[int], addr, secret: bool):
if in_upload_bl(addr):
return "Your host is blocked from uploading files.\n", 451
sf, isnew = File.store(f, requested_expiration, addr, ua, secret)
sf, isnew = File.store(f, requested_expiration, addr, secret)
response = make_response(sf.geturl())
response.headers["X-Expires"] = sf.expiration
@ -385,7 +381,7 @@ def store_file(f, requested_expiration: typing.Optional[int], addr, ua, secret:
return response
def store_url(url, addr, ua, secret: bool):
def store_url(url, addr, secret: bool):
if is_fhost_url(url):
abort(400)
@ -400,13 +396,13 @@ def store_url(url, addr, ua, secret: bool):
if "content-length" in r.headers:
l = int(r.headers["content-length"])
if l <= app.config["MAX_CONTENT_LENGTH"]:
if l < app.config["MAX_CONTENT_LENGTH"]:
def urlfile(**kwargs):
return type('',(),kwargs)()
f = urlfile(read=r.raw.read, content_type=r.headers["content-type"], filename="")
return store_file(f, None, addr, ua, secret)
return store_file(f, None, addr, secret)
else:
abort(413)
else:
@ -501,7 +497,6 @@ def fhost():
request.files["file"],
int(request.form["expires"]),
request.remote_addr,
request.user_agent.string,
secret
)
except ValueError:
@ -513,14 +508,12 @@ def fhost():
request.files["file"],
None,
request.remote_addr,
request.user_agent.string,
secret
)
elif "url" in request.form:
return store_url(
request.form["url"],
request.remote_addr,
request.user_agent.string,
secret
)
elif "shorten" in request.form:

View File

@ -1,30 +0,0 @@
"""Store user agent string with files
Revision ID: dd0766afb7d2
Revises: 30bfe33aa328
Create Date: 2023-03-29 07:18:49.113200
"""
# revision identifiers, used by Alembic.
revision = 'dd0766afb7d2'
down_revision = '30bfe33aa328'
from alembic import op
import sqlalchemy as sa
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('file', schema=None) as batch_op:
batch_op.add_column(sa.Column('ua', sa.UnicodeText(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('file', schema=None) as batch_op:
batch_op.drop_column('ua')
# ### end Alembic commands ###

16
mod.css
View File

@ -1,6 +1,5 @@
#ftable {
width: 1fr;
height: 100%;
}
#infopane {
@ -11,7 +10,7 @@
#finfo {
background: $boost;
height: 14;
height: 12;
width: 1fr;
box-sizing: content-box;
}
@ -28,9 +27,20 @@
width: 1fr;
}
#filter_container {
height: auto;
display: none;
}
#filter_label {
content-align: right middle;
height: 1fr;
width: 20%;
margin: 0 1 0 2;
}
#filter_input {
width: 1fr;
display: none;
}
Notification {

51
mod.py
View File

@ -5,7 +5,7 @@ from sys import stdout
import time
from textual.app import App, ComposeResult
from textual.widgets import DataTable, Header, Footer, RichLog, Static, Input
from textual.widgets import DataTable, Header, Footer, TextLog, Static, Input
from textual.containers import Horizontal, Vertical
from textual.screen import Screen
from textual import log
@ -20,13 +20,12 @@ fhost_app.app_context().push()
class NullptrMod(Screen):
BINDINGS = [
("q", "quit_app", "Quit"),
("f1", "filter(1, 'Name')", "Lookup name"),
("f2", "filter(2, 'IP address')", "Filter IP"),
("f3", "filter(3, 'MIME Type')", "Filter MIME"),
("f4", "filter(4, 'Extension')", "Filter Ext."),
("f1", "filter(1, 'Lookup name:')", "Lookup name"),
("f2", "filter(2, 'Filter IP address:')", "Filter IP"),
("f3", "filter(3, 'Filter MIME Type:')", "Filter MIME"),
("f4", "filter(4, 'Filter extension:')", "Filter Ext."),
("f5", "refresh", "Refresh"),
("f6", "filter_clear", "Clear filter"),
("f7", "filter(5, 'User agent')", "Filter UA"),
("r", "remove_file(False)", "Remove file"),
("ctrl+r", "remove_file(True)", "Ban file"),
("p", "ban_ip(False)", "Ban IP"),
@ -42,28 +41,28 @@ class NullptrMod(Screen):
ftable.watch_query(None, None)
def action_filter_clear(self):
self.finput.display = False
self.query_one("#filter_container").display = False
ftable = self.query_one("#ftable")
ftable.focus()
ftable.query = ftable.base_query
def action_filter(self, fcol: int, label: str):
self.finput.placeholder = label
self.finput.display = True
self.finput.focus()
self.query_one("#filter_label").update(label)
finput = self.query_one("#filter_input")
self.filter_col = fcol
self.query_one("#filter_container").display = True
finput.focus()
self._refresh_layout()
if self.current_file:
match fcol:
case 1: self.finput.value = ""
case 2: self.finput.value = self.current_file.addr
case 3: self.finput.value = self.current_file.mime
case 4: self.finput.value = self.current_file.ext
case 5: self.finput.value = self.current_file.ua or ""
case 1: finput.value = ""
case 2: finput.value = self.current_file.addr
case 3: finput.value = self.current_file.mime
case 4: finput.value = self.current_file.ext
def on_input_submitted(self, message: Input.Submitted) -> None:
self.finput.display = False
self.query_one("#filter_container").display = False
ftable = self.query_one("#ftable")
ftable.focus()
@ -72,10 +71,9 @@ class NullptrMod(Screen):
case 1:
try: ftable.query = ftable.base_query.filter(File.id == su.debase(message.value))
except ValueError: pass
case 2: ftable.query = ftable.base_query.filter(File.addr.like(message.value))
case 2: ftable.query = ftable.base_query.filter(File.addr == message.value)
case 3: ftable.query = ftable.base_query.filter(File.mime.like(message.value))
case 4: ftable.query = ftable.base_query.filter(File.ext.like(message.value))
case 5: ftable.query = ftable.base_query.filter(File.ua.like(message.value))
else:
ftable.query = ftable.base_query
@ -121,13 +119,13 @@ class NullptrMod(Screen):
def compose(self) -> ComposeResult:
yield Header()
yield Horizontal(
FileTable(id="ftable", zebra_stripes=True, cursor_type="row"),
FileTable(id="ftable", zebra_stripes=True),
Vertical(
DataTable(id="finfo", show_header=False, cursor_type="none"),
DataTable(id="finfo", show_header=False),
MpvWidget(id="mpv"),
RichLog(id="ftextlog", auto_scroll=False),
TextLog(id="ftextlog"),
id="infopane"))
yield Input(id="filter_input")
yield Horizontal(Static("Filter:", id="filter_label"), Input(id="filter_input"), id="filter_container")
yield Footer()
def on_mount(self) -> None:
@ -142,8 +140,6 @@ class NullptrMod(Screen):
self.mpvw = self.query_one("#mpv")
self.ftlog = self.query_one("#ftextlog")
self.finput = self.query_one("#filter_input")
self.mimehandler = mime.MIMEHandler()
self.mimehandler.register(mime.MIMECategory.Archive, self.handle_libarchive)
self.mimehandler.register(mime.MIMECategory.Text, self.handle_text)
@ -253,7 +249,6 @@ class NullptrMod(Screen):
("MIME type:", f.mime),
("SHA256 checksum:", f.sha256),
("Uploaded by:", Text(f.addr)),
("User agent:", Text(f.ua or "")),
("Management token:", f.mgmt_token),
("Secret:", f.secret),
("Is NSFW:", ("Yes" if f.is_nsfw else "No") + (f" (Score: {f.nsfw_score:0.4f})" if f.nsfw_score else " (Not scanned)")),
@ -262,11 +257,13 @@ class NullptrMod(Screen):
])
self.mpvw.stop_mpv(True)
self.ftlog.clear()
self.ftlog.remove()
self.query_one("#infopane").mount(TextLog(id="ftextlog"))
self.ftlog = self.query_one("#ftextlog")
if f.getpath().is_file():
self.mimehandler.handle(f.mime, f.ext)
self.ftlog.scroll_to(x=0, y=0, animate=False)
self.ftlog.scroll_home(animate=False)
class NullptrModApp(App):
CSS_PATH = "mod.css"

View File

@ -1,6 +1,6 @@
from textual.widgets import DataTable, Static
from textual.reactive import Reactive
from textual.message import Message
from textual.message import Message, MessageTarget
from textual import events, log
from jinja2.filters import do_filesizeformat
@ -21,9 +21,9 @@ class FileTable(DataTable):
self.query = self.base_query
class Selected(Message):
def __init__(self, f: File) -> None:
def __init__(self, sender: MessageTarget, f: File) -> None:
self.file = f
super().__init__()
super().__init__(sender)
def watch_order_col(self, old, value) -> None:
self.watch_query(None, None)
@ -44,39 +44,25 @@ class FileTable(DataTable):
)
if (self.query):
self.clear()
order = FileTable.colmap[self.order_col]
q = self.query
if order: q = q.order_by(order.desc() if self.order_desc else order, File.id)
qres = list(map(fmt_file, q.limit(self.limit)))
self.add_rows(map(fmt_file, q.limit(self.limit)))
ri = 0
row = self.cursor_coordinate.row
if row < self.row_count and row >= 0:
ri = int(self.get_row_at(row)[0])
def _scroll_cursor_in_to_view(self, animate: bool = False) -> None:
region = self._get_cell_region(self.cursor_row, 0)
spacing = self._get_cell_border()
self.scroll_to_region(region, animate=animate, spacing=spacing)
self.clear()
self.add_rows(qres)
for i, v in enumerate(qres):
if int(v[0]) == ri:
self.move_cursor(row=i)
break
self.on_selected()
def on_selected(self) -> Selected:
row = self.cursor_coordinate.row
if row < self.row_count and row >= 0:
f = File.query.get(int(self.get_row_at(row)[0]))
self.post_message(self.Selected(f))
def watch_cursor_coordinate(self, old, value) -> None:
super().watch_cursor_coordinate(old, value)
if old != value:
self.on_selected()
async def watch_cursor_cell(self, old, value) -> None:
super().watch_cursor_cell(old, value)
if value[0] < len(self.data) and value[0] >= 0:
f = File.query.get(int(self.data[value[0]][0]))
await self.emit(self.Selected(self, f))
def on_click(self, event: events.Click) -> None:
super().on_click(event)
meta = self.get_style_at(event.x, event.y).meta
if meta:
if meta["row"] == -1: