feat: add servicelink RPC mesh endpoint
Build and Push Docker Container / build-and-push (push) Successful in 3m21s
Build and Push Docker Container / build-and-push (push) Successful in 3m21s
- Add the servicelink submodule and register POST /rpc for node-to-node file operations. - Require bearer tokens with the mesh scope and apply rate/body-size limits to RPC calls. - Map database connectivity failures to the existing 504 database error flow, with JSON responses for API routes. - Cover the new RPC handlers and database error handling with focused pytest tests. - Bump the NanoShare package version to 1.21.0.
This commit is contained in:
@@ -0,0 +1,139 @@
|
||||
'''ServiceLink mesh endpoint for the picoshare/NanoShare node.
|
||||
|
||||
Lets other nodes (browser-cli, website) push files in and read file metadata
|
||||
over the shared servicelink envelope at POST /rpc. Sits alongside the existing
|
||||
web UI and /api routes.
|
||||
|
||||
Security: every call needs a bearer token that decodes to the `mesh` scope
|
||||
(or `all`); the endpoint is rate limited and body-size capped. Intended to be
|
||||
reachable only from the internal node network (lock it down at the reverse
|
||||
proxy), with the token as defence-in-depth.
|
||||
'''
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
|
||||
from quart import Blueprint, Response, current_app, request
|
||||
|
||||
from my_modules.app.setup import LIMITER
|
||||
from my_modules.expiry import ensure_utc, parse_expires
|
||||
from my_modules.file_meta import format_size, iso_stamp_filename
|
||||
from servicelink import Forbidden, InvalidParams, NotFound, Router, Unauthorized, bearer_verifier, handle_envelope
|
||||
|
||||
# Cap the /rpc body. base64 inflates ~33%, so this bounds upload size too.
|
||||
MAX_RPC_BODY = 16 * 1024 * 1024
|
||||
MESH_SCOPE = 'mesh'
|
||||
|
||||
router = Router('picoshare')
|
||||
|
||||
def _user_id(ctx):
|
||||
if ctx.principal is None:
|
||||
raise Unauthorized('authentication required')
|
||||
return ctx.principal.subject
|
||||
|
||||
@router.method('files.upload')
|
||||
async def files_upload(params, ctx):
|
||||
user_id = _user_id(ctx)
|
||||
text = params.get('text')
|
||||
content_b64 = params.get('content_b64')
|
||||
note = params.get('note', '')
|
||||
expires_at = ensure_utc(parse_expires(params.get('expires', '')))
|
||||
|
||||
if content_b64:
|
||||
data = base64.b64decode(content_b64)
|
||||
content_type = params.get('content_type') or 'application/octet-stream'
|
||||
default_ext = 'bin'
|
||||
elif text is not None:
|
||||
data = text.encode('utf-8')
|
||||
content_type = 'text/plain'
|
||||
default_ext = 'txt'
|
||||
else:
|
||||
raise InvalidParams('provide text or content_b64')
|
||||
|
||||
file_name = params.get('file_name') or iso_stamp_filename('mesh', default_ext)
|
||||
storage_id = await current_app.convex.send_to_storage(data=data, content_type=content_type)
|
||||
await current_app.convex.add_file(
|
||||
file_name=file_name,
|
||||
file_size=format_size(len(data)),
|
||||
note=note,
|
||||
content_type=content_type,
|
||||
expires_at=expires_at,
|
||||
storage_id=storage_id,
|
||||
user_id=user_id,
|
||||
)
|
||||
return {'file_name': file_name, 'size': len(data), 'content_type': content_type}
|
||||
|
||||
@router.method('files.list')
|
||||
async def files_list(params, ctx):
|
||||
return {'files': await current_app.convex.get_files(user_id=_user_id(ctx))}
|
||||
|
||||
@router.method('files.get')
|
||||
async def files_get(params, ctx):
|
||||
file_id = params.get('file_id')
|
||||
if not file_id:
|
||||
raise InvalidParams('file_id is required')
|
||||
meta = await current_app.convex.get_file(file_id)
|
||||
if not meta:
|
||||
raise NotFound('no such file', data={'file_id': file_id})
|
||||
return meta
|
||||
|
||||
@router.method('files.info')
|
||||
async def files_info(params, ctx):
|
||||
file_id = params.get('file_id')
|
||||
if not file_id:
|
||||
raise InvalidParams('file_id is required')
|
||||
return await current_app.convex.get_file_informations(file_id, _user_id(ctx))
|
||||
|
||||
@router.method('files.update')
|
||||
async def files_update(params, ctx):
|
||||
file_id = params.get('file_id')
|
||||
file_name = params.get('file_name')
|
||||
if not file_id or not file_name:
|
||||
raise InvalidParams('file_id and file_name are required')
|
||||
await current_app.convex.update_file(
|
||||
file_id=file_id,
|
||||
file_name=file_name,
|
||||
note=params.get('note', ''),
|
||||
expires_at=ensure_utc(parse_expires(params.get('expires', ''))),
|
||||
user_id=_user_id(ctx),
|
||||
)
|
||||
return {'updated': True}
|
||||
|
||||
@router.method('files.delete')
|
||||
async def files_delete(params, ctx):
|
||||
file_id = params.get('file_id')
|
||||
if not file_id:
|
||||
raise InvalidParams('file_id is required')
|
||||
await current_app.convex.delete_file(file_id, _user_id(ctx))
|
||||
return {'deleted': True}
|
||||
|
||||
async def _decode_access_token(token):
|
||||
payload = await current_app.convex.decode_access_token_payload(access_token=token)
|
||||
if not payload or payload.get('error') or not payload.get('sub'):
|
||||
raise ValueError((payload or {}).get('error', 'invalid token'))
|
||||
return payload
|
||||
|
||||
_base_verify = bearer_verifier(_decode_access_token)
|
||||
|
||||
async def _verify(authorization, req):
|
||||
principal = await _base_verify(authorization, req)
|
||||
if principal is None or not principal.has_scope(MESH_SCOPE):
|
||||
raise Forbidden(f'token lacks the {MESH_SCOPE!r} scope')
|
||||
return principal
|
||||
|
||||
link_bp = Blueprint('servicelink_picoshare', __name__)
|
||||
|
||||
@link_bp.post('/rpc')
|
||||
@LIMITER.limit('30 per minute')
|
||||
async def rpc_endpoint():
|
||||
if (request.content_length or 0) > MAX_RPC_BODY:
|
||||
return Response('{"error":"payload too large"}', status=413, content_type='application/json')
|
||||
raw = await request.get_data()
|
||||
status, body, content_type = await handle_envelope(
|
||||
router,
|
||||
raw,
|
||||
authorization=request.headers.get('Authorization'),
|
||||
verify=_verify,
|
||||
content_type=request.headers.get('Content-Type', 'application/json'),
|
||||
)
|
||||
return Response(body, status=status, content_type=content_type)
|
||||
@@ -13,6 +13,7 @@ from my_modules.functions import (
|
||||
)
|
||||
from quart_common.web.env import is_development_environment
|
||||
from quart_common.web.wide_event import add_wide_event_context
|
||||
import httpx
|
||||
|
||||
IGNORED_404_PATHS = [
|
||||
"/.well-known/",
|
||||
@@ -186,6 +187,11 @@ async def internal_server_error(e):
|
||||
file={'name': '500.webp', 'alt': "Astronaut jumping and clicking on random Buttons as a red alert gone off - They is a Text on the Image saying: Why don't shit Work!?!"},
|
||||
), 500
|
||||
|
||||
@app.errorhandler(httpx.HTTPError)
|
||||
@app.errorhandler(TimeoutError)
|
||||
async def database_unavailable_error(e):
|
||||
return await database_server_error(e)
|
||||
|
||||
@app.errorhandler(504)
|
||||
async def database_server_error(e):
|
||||
try:
|
||||
@@ -193,8 +199,12 @@ async def database_server_error(e):
|
||||
except LookupError as e:
|
||||
return await to_many_requests(e)
|
||||
|
||||
context = get_request_context()
|
||||
add_wide_event_context(error={"type": type(e).__name__, "message": str(e)})
|
||||
logger.error(e)
|
||||
if context.path.startswith("/api"):
|
||||
return jsonify({"error": "Database Error", "message": "The database is currently unavailable. Please try again in a moment."}), 504
|
||||
|
||||
return await render_template('views/basics/error.htm',
|
||||
title='Database Error',
|
||||
header={'title': '504 - Database Error', 'message': "It looks like something is broke on our end... but don't worry, we're fixing it! Either way, thanks for helping us find new ways to crash our system. Stay curious, hacker-friend!"},
|
||||
|
||||
Reference in New Issue
Block a user