neome.com
[email protected]
← Back    Download
VirtualEnv_Client/client.py
import asyncio
import base64
import binascii
import json
import os
import shutil
import websockets

SERVER_URL = "wss://env.neome.com"

# allow larger websocket messages (base64 is bigger than binary)
WS_MAX_SIZE = 50 * 1024 * 1024

# lock root once (real path)
ROOT = os.path.realpath(".")
WRITE_CLIENT_ID = os.environ.get("NEOME_WRITE_CLIENT_ID") == "1"
ID_FILE = os.path.join(ROOT, "client.id")

def safe_path(path):
	if not isinstance(path, str):
		raise Exception("Invalid path")

	path = path.strip().lstrip("/")

	full = os.path.realpath(os.path.join(ROOT, path))

	if not full.startswith(ROOT + os.sep) and full != ROOT:
		raise Exception("Path escape blocked")

	return full


def handle_command(cmd):
	try:
		if not isinstance(cmd, list) or not cmd:
			return "Invalid command"

		cmd_type = cmd[0]

		# =========================
		# TREE
		# =========================
		if cmd_type == "tree":
			path = cmd[1] if len(cmd) > 1 else "."
			full = safe_path(path)

			if not os.path.exists(full):
				return f"Path not found: {path}"

			if not os.path.isdir(full):
				return f"Not a directory: {path}"

			items = sorted(os.listdir(full))
			return "\n".join(items) if items else "[empty]"

		# =========================
		# VIEW (text only)
		# =========================
		elif cmd_type == "view":
			if len(cmd) < 2:
				return "Missing path"

			path = cmd[1]
			full = safe_path(path)

			if not os.path.exists(full):
				return f"File not found: {path}"

			if not os.path.isfile(full):
				return f"Not a file: {path}"

			with open(full, "rb") as f:
				sample = f.read(4096)

			if b"\x00" in sample:
				return f"Binary file: {path}"

			try:
				with open(full, "r", encoding="utf-8", errors="strict") as f:
					return f.read()
			except UnicodeDecodeError:
				return f"Binary file: {path}"

		# =========================
		# MKDIR
		# =========================
		elif cmd_type == "mkdir":
			if len(cmd) < 2:
				return "Missing path"

			path = cmd[1]
			full = safe_path(path)

			if os.path.exists(full):
				return f"Already exists: {path}"

			os.makedirs(full, exist_ok=True)
			return f"Created folder: {path}"

		# =========================
		# WRITE (text)
		# =========================
		elif cmd_type == "write":
			if len(cmd) < 2:
				return "Missing path"

			path = cmd[1]
			content = cmd[2] if len(cmd) > 2 else ""

			full = safe_path(path)

			parent = os.path.dirname(full)
			if parent:
				os.makedirs(parent, exist_ok=True)

			with open(full, "w", encoding="utf-8") as f:
				f.write(content)

			return f"Wrote {path}"

		# =========================
		# COPY
		# =========================
		elif cmd_type == "copy":
			if len(cmd) < 3:
				return "Missing source or destination"

			src_path = cmd[1]
			dst_path = cmd[2]

			src_full = safe_path(src_path)
			dst_full = safe_path(dst_path)

			if not os.path.exists(src_full):
				return f"Source not found: {src_path}"

			if src_full == dst_full:
				return "Source and destination are the same"

			dst_parent = os.path.dirname(dst_full)
			if dst_parent:
				os.makedirs(dst_parent, exist_ok=True)

			if os.path.isfile(src_full):
				if os.path.isdir(dst_full):
					dst_full = os.path.join(dst_full, os.path.basename(src_full))

				os.makedirs(os.path.dirname(dst_full), exist_ok=True)
				shutil.copy2(src_full, dst_full)
				return f"Copied file: {src_path} -> {dst_path}"

			if os.path.isdir(src_full):
				if os.path.exists(dst_full):
					return f"Destination already exists: {dst_path}"

				shutil.copytree(src_full, dst_full)
				return f"Copied folder: {src_path} -> {dst_path}"

			return "Unknown type"

		# =========================
		# WRITE_B64 (binary-safe)
		# =========================
		elif cmd_type == "write_b64":
			if len(cmd) < 3:
				return "Missing path or base64 data"

			path = cmd[1]
			b64_data = cmd[2]

			if not isinstance(b64_data, str):
				return "Invalid base64 data"

			full = safe_path(path)

			parent = os.path.dirname(full)
			if parent:
				os.makedirs(parent, exist_ok=True)

			try:
				raw = base64.b64decode(b64_data, validate=True)
			except (binascii.Error, ValueError):
				return "Invalid base64 data"

			with open(full, "wb") as f:
				f.write(raw)

			return f"Wrote binary {path} ({len(raw)} bytes)"

		# =========================
		# READ_B64 (download any file)
		# =========================
		elif cmd_type == "read_b64":
			if len(cmd) < 2:
				return "Missing path"

			path = cmd[1]
			full = safe_path(path)

			if not os.path.exists(full):
				return f"File not found: {path}"

			if not os.path.isfile(full):
				return f"Not a file: {path}"

			with open(full, "rb") as f:
				raw = f.read()

			return "B64:" + base64.b64encode(raw).decode("ascii")

		# =========================
		# DELETE
		# =========================
		elif cmd_type == "delete":
			if len(cmd) < 2:
				return "Missing path"

			path = cmd[1]
			full = safe_path(path)

			if not os.path.exists(full):
				return f"Path not found: {path}"

			if os.path.isfile(full):
				os.remove(full)
				return f"Deleted file: {path}"

			if os.path.isdir(full):
				shutil.rmtree(full)
				return f"Deleted folder: {path}"

			return "Unknown type"

		return f"Unknown command: {cmd_type}"

	except Exception as e:
		return f"Error: {str(e)}"


async def send_json(ws, data):
	await ws.send(json.dumps(data))


async def main():
	while True:
		try:
			async with websockets.connect(
				SERVER_URL,
				max_size=WS_MAX_SIZE
			) as ws:
				await send_json(ws, {"role": "client"})
				print("[+] connected to server", flush=True)

				async for raw in ws:
					try:
						data = json.loads(raw)
					except:
						continue

					msg_type = data.get("type")

					if msg_type == "hello" and data.get("id"):
						print("CLIENT_ID:", data["id"], flush=True)

						if WRITE_CLIENT_ID:
							try:
								with open(ID_FILE, "w", encoding="utf-8") as f:
									f.write(data["id"])
							except Exception as e:
								print("[!] failed to write client.id:", e, flush=True)

						continue

					if msg_type == "cmd":
						result = handle_command(data.get("cmd"))

						await send_json(ws, {
							"type": "result",
							"output": result
						})

		except Exception as e:
			print("[!] disconnected:", e, flush=True)
			await asyncio.sleep(2)


if __name__ == "__main__":
	asyncio.run(main())