# Neome_Bridge/bridgeserver.py
#!/usr/bin/env python3
import asyncio
import hashlib
import json
import secrets
from aiohttp import web
# Address and port the server binds to.
HOST = "127.0.0.1"
PORT = 8777

# Seconds api() waits for a worker reply before answering 504.
REQUEST_TIMEOUT = 600

# Upper bound, in bytes, for HTTP bodies and WebSocket messages (500 MiB).
MAX_BODY = 500 * 1024 ** 2

# Registered worker sockets, keyed by the SHA-256 hex digest of their key.
clients = {}

# In-flight requests: request id -> future resolved with the worker's reply.
pending = {}
def hash_key(key):
    """Return the SHA-256 hex digest of ``key``.

    ``key`` is a string; it is encoded with ``str.encode()``'s default
    codec (UTF-8) before hashing.
    """
    digest = hashlib.sha256()
    digest.update(key.encode())
    return digest.hexdigest()
def make_request_id():
    """Return a fresh random request id: 16 random bytes as 32 hex characters."""
    random_bytes = secrets.token_bytes(16)
    return random_bytes.hex()
def get_key(headers):
    """Extract the bearer token from an ``Authorization`` header.

    ``headers`` is any mapping with a ``.get`` method. The scheme is
    matched case-insensitively. Returns the token with surrounding
    whitespace removed, or ``""`` when the header is absent or does not
    start with ``"bearer "``.
    """
    prefix = "bearer "
    header_value = headers.get("Authorization", "")
    if not header_value.lower().startswith(prefix):
        return ""
    return header_value[len(prefix):].strip()
def json_response(data, status=200):
    """Build an aiohttp JSON response carrying permissive CORS headers.

    ``data`` is serialized by ``web.json_response``; ``status`` is the
    HTTP status code. The same three CORS headers are attached to every
    response produced here.
    """
    response = web.json_response(data, status=status)
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Headers", "Authorization, Content-Type"),
        ("Access-Control-Allow-Methods", "GET, POST, OPTIONS"),
    )
    for name, value in cors_headers:
        response.headers[name] = value
    return response
async def send(ws, obj):
    """Serialize ``obj`` as compact JSON and send it as one text frame on ``ws``."""
    # Compact separators keep the frame free of padding whitespace.
    payload = json.dumps(obj, separators=(",", ":"))
    await ws.send_str(payload)
async def health(request):
    """Liveness probe: always reply ``{"ok": true}``; ``request`` is not inspected."""
    payload = {"ok": True}
    return json_response(payload)
def _error_response(message, status):
    """Build the ``{"error": {"message": ...}}`` JSON reply used by api()."""
    return json_response({"error": {"message": message}}, status)


async def api(request):
    """Forward an HTTP request to the matching bridge worker and relay its reply.

    The bearer token from the ``Authorization`` header is hashed and used
    to look up a registered worker socket in ``clients``. The request
    (method, path, headers, parsed JSON body) is sent to that socket as a
    ``"request"`` message, and the handler waits up to ``REQUEST_TIMEOUT``
    seconds for the future stored in ``pending`` to be resolved.

    Replies:
        200 ``{"ok": true}``  for OPTIONS (CORS preflight, no auth needed)
        401 "Missing key"      no bearer token supplied
        503 "Bridge offline"   no live socket registered for the key, or
                               the socket failed while sending
        413 "Body too large"   body exceeds the size limit
        400 "Invalid JSON"     body is not parseable JSON
        504 "Worker timeout"   no reply within ``REQUEST_TIMEOUT``
        502 "Invalid worker response"  worker status is not numeric
        otherwise the worker's ``body`` and ``status`` (defaults ``{}``, 200)
    """
    if request.method == "OPTIONS":
        return json_response({"ok": True})

    key = get_key(request.headers)
    if not key:
        return _error_response("Missing key", 401)

    ws = clients.get(hash_key(key))
    # Compare with None explicitly: aiohttp response objects are
    # mapping-like, so relying on their truthiness is fragile. Also treat
    # an already-closed socket as offline rather than failing on send.
    if ws is None or ws.closed:
        return _error_response("Bridge offline", 503)

    try:
        raw = await request.read()
    except web.HTTPRequestEntityTooLarge:
        # Raised by aiohttp when the body exceeds the application's
        # client_max_size; report it as 413 instead of "Invalid JSON".
        return _error_response("Body too large", 413)
    if len(raw) > MAX_BODY:
        return _error_response("Body too large", 413)

    body = {}
    if raw:
        try:
            body = json.loads(raw.decode("utf-8", errors="replace"))
        except (ValueError, RecursionError):
            # ValueError covers json.JSONDecodeError; RecursionError
            # covers pathologically nested documents.
            return _error_response("Invalid JSON", 400)

    request_id = make_request_id()
    fut = asyncio.get_running_loop().create_future()
    pending[request_id] = fut
    try:
        try:
            await send(ws, {
                "type": "request",
                "request_id": request_id,
                "method": request.method,
                "path": request.path,
                "headers": dict(request.headers),
                "body": body,
            })
        except (ConnectionError, RuntimeError):
            # The worker socket died between lookup and send.
            return _error_response("Bridge offline", 503)
        result = await asyncio.wait_for(fut, timeout=REQUEST_TIMEOUT)
    except asyncio.TimeoutError:
        return _error_response("Worker timeout", 504)
    finally:
        # Always drop the bookkeeping entry, whatever the outcome.
        pending.pop(request_id, None)

    try:
        status = int(result.get("status", 200))
    except (TypeError, ValueError):
        return _error_response("Invalid worker response", 502)
    return json_response(result.get("body", {}), status)
async def websocket_handler(request):
    """Serve one worker WebSocket connection.

    Text frames are parsed as JSON objects and dispatched on ``"type"``:

    * ``"register"`` — hash the supplied ``key`` and store this socket in
      ``clients`` under that hash, closing any *other* socket that was
      registered for it. An empty key closes the connection.
    * ``"response"`` — resolve the future stored in ``pending`` under the
      message's ``request_id`` with the whole message.

    Non-text frames, unparseable frames and JSON values that are not
    objects are ignored. On exit the socket's own ``clients`` entry is
    removed. Returns the ``WebSocketResponse``.
    """
    ws = web.WebSocketResponse(max_msg_size=MAX_BODY, heartbeat=25)
    await ws.prepare(request)
    key_hash = ""
    try:
        async for msg in ws:
            if msg.type != web.WSMsgType.TEXT:
                continue
            try:
                data = json.loads(msg.data)
            except (ValueError, RecursionError):
                continue
            # Valid JSON that is not an object (list, number, ...) has no
            # .get(); skip it instead of letting it end the connection.
            if not isinstance(data, dict):
                continue

            msg_type = data.get("type")
            if msg_type == "register":
                key = str(data.get("key") or "").strip()
                if not key:
                    await ws.close()
                    return ws
                new_hash = hash_key(key)
                # If this socket re-registers under a different key, drop
                # its previous entry so no stale mapping is left behind.
                if (
                    key_hash
                    and key_hash != new_hash
                    and clients.get(key_hash) is ws
                ):
                    del clients[key_hash]
                key_hash = new_hash
                old = clients.get(key_hash)
                # Install the new socket before closing the old one so the
                # key is never left without a registered socket.
                clients[key_hash] = ws
                # Never close ourselves when the same socket re-registers.
                if old is not None and old is not ws:
                    try:
                        await old.close()
                    except Exception as exc:
                        # Best effort: the replaced socket may already be dead.
                        print("[close failed]", exc, flush=True)
                await send(ws, {
                    "type": "registered",
                    "ok": True,
                })
                print("[connected]", key_hash[:8], flush=True)
                continue

            if msg_type == "response":
                request_id = str(data.get("request_id") or "")
                fut = pending.pop(request_id, None)
                if fut is not None and not fut.done():
                    fut.set_result(data)
    except Exception as e:
        print(e, flush=True)
    finally:
        # Only remove the entry if it still points at this socket; a newer
        # connection may have taken over the same key.
        if key_hash and clients.get(key_hash) is ws:
            del clients[key_hash]
        print("[disconnected]", flush=True)
    return ws
# Application object; request bodies are capped at MAX_BODY bytes.
app = web.Application(client_max_size=MAX_BODY)

# Route order matters: the catch-all pattern must be registered last so
# that /health and /ws are matched first.
app.add_routes([
    web.get("/health", health),
    web.route("*", "/ws", websocket_handler),
    web.route("*", "/{tail:.*}", api),
])
if __name__ == "__main__":
    # Announce the bind address, then hand control to aiohttp's runner.
    print(f"Listening on {HOST}:{PORT}", flush=True)
    web.run_app(app, host=HOST, port=PORT)