# Neome_Bridge/bridgeserver.py
#!/usr/bin/env python3

import asyncio
import hashlib
import json
import secrets

from aiohttp import web

# Address and port the server binds to when run as a script.
HOST = "127.0.0.1"
PORT = 8777

# Seconds an HTTP caller waits for the bridge client to answer.
REQUEST_TIMEOUT = 600

# Size cap in bytes (500 MiB) shared by HTTP bodies and websocket messages.
MAX_BODY = 500 * 1024 ** 2

# SHA-256 hex digest of a key -> websocket registered under that key.
clients = dict()

# Request id -> future resolved when the matching "response" message arrives.
pending = dict()


def hash_key(key):
	"""Return the hex-encoded SHA-256 digest of the string *key*.

	The key is encoded with the default codec (UTF-8) before hashing.
	"""
	digest = hashlib.sha256()
	digest.update(key.encode())
	return digest.hexdigest()


def make_request_id():
	"""Return a fresh random request id: 16 random bytes as 32 hex characters."""
	random_bytes = secrets.token_bytes(16)
	return random_bytes.hex()


def get_key(headers):
	"""Extract the bearer token from an ``Authorization`` header.

	Returns the text following a case-insensitive ``"Bearer "`` prefix with
	surrounding whitespace removed, or ``""`` when the header is absent or
	uses a different scheme.
	"""
	scheme = "bearer "
	header_value = headers.get("Authorization", "")

	if not header_value.lower().startswith(scheme):
		return ""

	token = header_value[len(scheme):]
	return token.strip()


def json_response(data, status=200):
	"""Build an aiohttp JSON response for *data* with permissive CORS headers.

	The response allows any origin, the ``Authorization`` and
	``Content-Type`` request headers, and the GET, POST and OPTIONS methods.
	"""
	cors_headers = (
		("Access-Control-Allow-Origin", "*"),
		("Access-Control-Allow-Headers", "Authorization, Content-Type"),
		("Access-Control-Allow-Methods", "GET, POST, OPTIONS"),
	)

	response = web.json_response(data, status=status)

	for header_name, header_value in cors_headers:
		response.headers[header_name] = header_value

	return response


async def send(ws, obj):
	"""Serialize *obj* as compact JSON and send it over *ws* as a text frame."""
	# Compact separators drop the spaces after "," and ":".
	payload = json.dumps(obj, separators=(",", ":"))
	await ws.send_str(payload)


async def health(request):
	"""Liveness endpoint: always answers with the JSON object ``{"ok": true}``."""
	payload = {"ok": True}
	return json_response(payload)


def _error_response(message, status):
	"""Build the JSON error envelope ``{"error": {"message": message}}``."""
	return json_response({"error": {"message": message}}, status)


async def api(request):
	"""Forward an HTTP request to the bridge client registered for its key.

	The bearer key from the ``Authorization`` header selects a websocket in
	``clients``. The request is sent to it as a ``"request"`` message tagged
	with a fresh request id, and the handler waits up to ``REQUEST_TIMEOUT``
	seconds for the matching future in ``pending`` to be resolved.

	Responses:
		200  ``{"ok": true}`` for an OPTIONS request (no key required).
		401  no bearer key supplied.
		503  no open websocket is registered for the key, or writing to it
		     failed because the connection is going away.
		413  request body larger than ``MAX_BODY``.
		400  the body could not be read or is not valid JSON.
		504  the client did not answer within ``REQUEST_TIMEOUT`` seconds.
		otherwise the ``body`` / ``status`` fields of the client's reply
		(defaulting to ``{}`` and 200).
	"""
	if request.method == "OPTIONS":
		# Answered before the key check, so it never needs credentials.
		return json_response({"ok": True})

	key = get_key(request.headers)

	if not key:
		return _error_response("Missing key", 401)

	ws = clients.get(hash_key(key))

	# Test for absence with ``is None`` instead of truthiness.
	# NOTE(review): aiohttp response objects appear to implement the mapping
	# protocol, so their truthiness may reflect stored state rather than
	# presence -- confirm against the installed aiohttp version.
	if ws is None or ws.closed:
		return _error_response("Bridge offline", 503)

	body = {}

	try:
		raw = await request.read()

		if len(raw) > MAX_BODY:
			return _error_response("Body too large", 413)

		if raw:
			body = json.loads(
				raw.decode(
					"utf-8",
					errors="replace",
				)
			)

	except web.HTTPRequestEntityTooLarge:
		# aiohttp enforces the application's client_max_size inside read();
		# report it as 413 rather than letting the handler below mislabel it
		# as malformed JSON.
		return _error_response("Body too large", 413)

	except Exception:
		return _error_response("Invalid JSON", 400)

	request_id = make_request_id()
	fut = asyncio.get_running_loop().create_future()

	# Registered before sending so a fast reply always finds its future.
	pending[request_id] = fut

	try:
		try:
			await send(ws, {
				"type": "request",
				"request_id": request_id,
				"method": request.method,
				"path": request.path,
				"headers": dict(request.headers),
				"body": body,
			})
		except ConnectionError:
			# The client went away between the lookup and the write.
			return _error_response("Bridge offline", 503)

		result = await asyncio.wait_for(
			fut,
			timeout=REQUEST_TIMEOUT,
		)

		return json_response(
			result.get("body", {}),
			result.get("status", 200),
		)

	except asyncio.TimeoutError:
		return _error_response("Worker timeout", 504)

	finally:
		# Always drop the future, whether answered, timed out or cancelled.
		pending.pop(request_id, None)


async def websocket_handler(request):
	"""Serve one bridge-client websocket connection.

	Text frames are parsed as JSON objects and dispatched on their ``type``:

	* ``"register"`` -- store this connection in ``clients`` under the
	  SHA-256 hash of ``key``, closing any *other* connection previously
	  registered under the same key, and reply with a ``"registered"``
	  message. An empty key closes the connection.
	* ``"response"`` -- resolve the future in ``pending`` whose id matches
	  ``request_id`` with the whole message.

	Non-text frames, unparseable JSON, JSON values that are not objects and
	unknown message types are ignored. On exit the connection's own entry is
	removed from ``clients``.

	Returns the ``WebSocketResponse`` for this connection.
	"""
	ws = web.WebSocketResponse(
		max_msg_size=MAX_BODY,
		heartbeat=25,
	)

	await ws.prepare(request)

	# Hash this connection is currently registered under ("" = unregistered).
	key_hash = ""

	try:
		async for msg in ws:

			if msg.type != web.WSMsgType.TEXT:
				continue

			try:
				data = json.loads(msg.data)
			except (ValueError, RecursionError):
				continue

			# Valid JSON that is not an object (e.g. ``[]`` or ``1``) has no
			# ``.get``; skip it instead of letting AttributeError tear down
			# the whole connection.
			if not isinstance(data, dict):
				continue

			msg_type = data.get("type")

			if msg_type == "register":

				key = str(
					data.get("key") or ""
				).strip()

				if not key:
					await ws.close()
					return ws

				new_hash = hash_key(key)

				# Re-registering under a different key: drop the previous
				# entry so it does not linger in ``clients`` after this
				# connection ends.
				if (
					key_hash and
					key_hash != new_hash and
					clients.get(key_hash) is ws
				):
					del clients[key_hash]

				key_hash = new_hash

				old = clients.get(key_hash)

				# Only evict a *different* connection; a repeated register on
				# this same socket must not close itself.
				if old is not None and old is not ws:
					try:
						await old.close()
					except Exception as close_error:
						# Best effort: the stale peer may already be gone.
						print(
							"[close failed]",
							close_error,
							flush=True,
						)

				clients[key_hash] = ws

				await send(ws, {
					"type": "registered",
					"ok": True,
				})

				print(
					"[connected]",
					key_hash[:8],
					flush=True,
				)

				continue

			if msg_type == "response":

				request_id = str(
					data.get("request_id") or ""
				)

				fut = pending.pop(
					request_id,
					None,
				)

				if fut is not None and not fut.done():
					fut.set_result(data)

	except Exception as e:
		print(e, flush=True)

	finally:

		# Remove only our own registration; a newer connection may have
		# replaced this one under the same key.
		if (
			key_hash and
			clients.get(key_hash) is ws
		):
			del clients[key_hash]

		print(
			"[disconnected]",
			flush=True,
		)

	return ws


# Application object; request bodies are capped at MAX_BODY bytes.
app = web.Application(client_max_size=MAX_BODY)

# Liveness probe.
app.router.add_get("/health", health)

# Websocket endpoint for bridge clients (any HTTP method).
app.router.add_route("*", "/ws", websocket_handler)

# Catch-all, registered after the specific routes above: every other path
# and method is handled by ``api``.
app.router.add_route("*", "/{tail:.*}", api)


if __name__ == "__main__":
	# Announce the bind address, then hand control to aiohttp's runner.
	print(f"Listening on {HOST}:{PORT}", flush=True)
	web.run_app(app, host=HOST, port=PORT)