#!/usr/bin/env python3 """ sat_proxy.py — SOCKS5 proxy with satellite link condition simulation Injects realistic latency, jitter, and packet loss based on configurable satellite link profiles (GEO, MEO, LEO, VLEO) and weather conditions. Usage: python3 sat_proxy.py [--host 127.0.0.1] [--port 1080] [--profile leo] [--weather clear] Then point your browser/app at SOCKS5 127.0.0.1:1080 Profiles: geo | meo | leo | vleo Weather: clear | light | heavy | storm | snow """ import asyncio import argparse import logging import random import struct import time import math from dataclasses import dataclass logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) log = logging.getLogger("sat_proxy") # --------------------------------------------------------------------------- # Link profiles — mirrors the simulator widget physics # --------------------------------------------------------------------------- @dataclass class LinkProfile: name: str altitude_km: float # orbital altitude base_rtt_ms: float # processing + queuing overhead on top of propagation jitter_base_ms: float # p99 jitter baseline loss_base_pct: float # baseline packet loss % PROFILES = { "geo": LinkProfile("GEO", 35_786, 30, 12, 0.10), "meo": LinkProfile("MEO", 8_000, 20, 8, 0.30), "oneweb": LinkProfile("OneWeb", 1_200, 14, 6, 0.35), # Eutelsat OneWeb — 1200 km polar LEO, ~70ms RTT "leo": LinkProfile("LEO", 550, 8, 3, 0.50), # Starlink Gen1/2 "vleo": LinkProfile("VLEO", 300, 6, 5, 0.80), } @dataclass class WeatherCondition: name: str rain_fade_db: float # dB of rain fade loss_add_pct: float # additional loss % WEATHER = { "clear": WeatherCondition("Clear", 0.0, 0.0), "light": WeatherCondition("Light rain", 1.5, 0.3), "heavy": WeatherCondition("Heavy rain", 5.0, 1.2), "storm": WeatherCondition("Thunderstorm",12.0, 4.5), "snow": WeatherCondition("Snow/ice", 2.5, 0.8), } BAND_RAIN_SENS = { "ku": 1.0, "ka": 2.8, "v": 5.0, "l": 0.1, } # --------------------------------------------------------------------------- # Link condition calculator # --------------------------------------------------------------------------- class LinkConditions: def __init__(self, profile: LinkProfile, weather: WeatherCondition, elevation_deg: float = 45.0, band: str = "ku"): self.profile = profile self.weather = weather self.elevation_deg = elevation_deg self.band = band self._update() def _update(self): elev_rad = math.radians(self.elevation_deg) slant_km = self.profile.altitude_km / math.sin(elev_rad) prop_ms = slant_km / 300.0 # speed of light elev_factor = 1.5 - math.sin(elev_rad) rain_sens = BAND_RAIN_SENS.get(self.band, 1.0) self.one_way_ms = prop_ms + self.profile.base_rtt_ms / 2 self.rtt_ms = prop_ms * 2 + self.profile.base_rtt_ms + (90 - self.elevation_deg) * 0.3 self.loss_pct = min(40.0, max(0.0, self.profile.loss_base_pct + self.weather.loss_add_pct * rain_sens * elev_factor )) self.jitter_ms = ( self.profile.jitter_base_ms + self.weather.rain_fade_db * rain_sens * 0.8 + (90 - self.elevation_deg) * 0.15 ) def sample_delay_ms(self) -> float: """Return a one-way delay sample with jitter applied.""" jitter = random.gauss(0, self.jitter_ms / 2) return max(0, self.one_way_ms + jitter) def should_drop(self) -> bool: """Return True if this chunk should be dropped.""" return random.random() * 100 < self.loss_pct def summary(self) -> str: return ( f"Profile={self.profile.name} Weather={self.weather.name} " f"Elevation={self.elevation_deg}° Band={self.band.upper()}\n" f" One-way delay : {self.one_way_ms:.1f} ms | " f"RTT : {self.rtt_ms:.1f} ms | " f"Jitter(p99) : {self.jitter_ms:.1f} ms | " f"Loss : {self.loss_pct:.2f}%" ) # --------------------------------------------------------------------------- # SOCKS5 constants # --------------------------------------------------------------------------- SOCKS5_VERSION = 0x05 NO_AUTH = 0x00 NO_ACCEPTABLE = 0xFF CMD_CONNECT = 0x01 ATYP_IPV4 = 0x01 ATYP_DOMAIN = 0x03 ATYP_IPV6 = 0x04 REP_SUCCESS = 0x00 REP_FAILURE = 0x01 REP_REFUSED = 0x05 REP_HOSTUNREACH= 0x04 # --------------------------------------------------------------------------- # Impaired pipe — wraps a stream pair and injects link conditions # --------------------------------------------------------------------------- class ImpairedPipe: """Forward data between two stream pairs with satellite impairments.""" def __init__(self, conditions: LinkConditions, tag: str): self.conditions = conditions self.tag = tag self._bytes_in = 0 self._bytes_out = 0 self._dropped = 0 self._chunks = 0 async def _forward(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, direction: str): try: while True: chunk = await reader.read(4096) if not chunk: break self._chunks += 1 if self.conditions.should_drop(): self._dropped += 1 log.debug(f"[{self.tag}] DROP {direction} {len(chunk)}B") continue delay = self.conditions.sample_delay_ms() / 1000.0 await asyncio.sleep(delay) writer.write(chunk) await writer.drain() if direction == ">>": self._bytes_out += len(chunk) else: self._bytes_in += len(chunk) except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): pass finally: try: writer.close() except Exception: pass async def run(self, client_r: asyncio.StreamReader, client_w: asyncio.StreamWriter, remote_r: asyncio.StreamReader, remote_w: asyncio.StreamWriter): t0 = time.monotonic() await asyncio.gather( self._forward(client_r, remote_w, ">>"), self._forward(remote_r, client_w, "<<"), ) elapsed = time.monotonic() - t0 loss_rate = (self._dropped / self._chunks * 100) if self._chunks else 0 log.info( f"[{self.tag}] closed " f"↑{self._bytes_out//1024}KB ↓{self._bytes_in//1024}KB " f"chunks={self._chunks} dropped={self._dropped} ({loss_rate:.1f}%) " f"elapsed={elapsed:.1f}s" ) # --------------------------------------------------------------------------- # SOCKS5 handshake helpers # --------------------------------------------------------------------------- async def socks5_handshake(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> tuple[str, int] | None: """ Perform SOCKS5 negotiation (no-auth only). Returns (host, port) on success, None on failure. """ # --- method negotiation --- header = await reader.readexactly(2) if header[0] != SOCKS5_VERSION: writer.close() return None n_methods = header[1] methods = await reader.readexactly(n_methods) if NO_AUTH not in methods: writer.write(bytes([SOCKS5_VERSION, NO_ACCEPTABLE])) await writer.drain() writer.close() return None writer.write(bytes([SOCKS5_VERSION, NO_AUTH])) await writer.drain() # --- request --- req = await reader.readexactly(4) if req[0] != SOCKS5_VERSION or req[2] != 0x00: writer.close() return None if req[1] != CMD_CONNECT: # only CONNECT supported writer.write(bytes([SOCKS5_VERSION, 0x07, 0x00, ATYP_IPV4, 0,0,0,0, 0,0])) await writer.drain() writer.close() return None atyp = req[3] if atyp == ATYP_IPV4: addr_bytes = await reader.readexactly(4) host = ".".join(str(b) for b in addr_bytes) elif atyp == ATYP_DOMAIN: length = (await reader.readexactly(1))[0] host = (await reader.readexactly(length)).decode() elif atyp == ATYP_IPV6: addr_bytes = await reader.readexactly(16) import ipaddress host = str(ipaddress.IPv6Address(addr_bytes)) else: writer.close() return None port_bytes = await reader.readexactly(2) port = struct.unpack("!H", port_bytes)[0] return host, port def socks5_reply(success: bool, bind_addr: str = "0.0.0.0", bind_port: int = 0) -> bytes: rep = REP_SUCCESS if success else REP_FAILURE addr_bytes = bytes(int(x) for x in bind_addr.split(".")) port_bytes = struct.pack("!H", bind_port) return bytes([SOCKS5_VERSION, rep, 0x00, ATYP_IPV4]) + addr_bytes + port_bytes # --------------------------------------------------------------------------- # Connection handler # --------------------------------------------------------------------------- _conn_counter = 0 async def handle_client(client_r: asyncio.StreamReader, client_w: asyncio.StreamWriter, conditions: LinkConditions): global _conn_counter _conn_counter += 1 tag = f"conn{_conn_counter:04d}" peer = client_w.get_extra_info("peername", ("?", 0)) try: result = await asyncio.wait_for(socks5_handshake(client_r, client_w), timeout=10) except asyncio.TimeoutError: log.warning(f"[{tag}] handshake timeout from {peer}") client_w.close() return if result is None: return host, port = result log.info(f"[{tag}] CONNECT {host}:{port} (delay~{conditions.one_way_ms:.0f}ms loss={conditions.loss_pct:.1f}%)") try: remote_r, remote_w = await asyncio.wait_for( asyncio.open_connection(host, port), timeout=15 ) except (OSError, asyncio.TimeoutError) as e: log.warning(f"[{tag}] upstream failed: {e}") client_w.write(socks5_reply(False)) await client_w.drain() client_w.close() return client_w.write(socks5_reply(True)) await client_w.drain() pipe = ImpairedPipe(conditions, tag) await pipe.run(client_r, client_w, remote_r, remote_w) # --------------------------------------------------------------------------- # CLI entrypoint # --------------------------------------------------------------------------- async def main(): parser = argparse.ArgumentParser(description="SOCKS5 satellite link simulator proxy") parser.add_argument("--host", default="127.0.0.1", help="Bind address (default: 127.0.0.1)") parser.add_argument("--port", default=1080, type=int, help="Bind port (default: 1080)") parser.add_argument("--profile", default="leo", choices=PROFILES.keys(), help="Satellite profile: geo | meo | oneweb | leo | vleo") parser.add_argument("--weather", default="clear", choices=WEATHER.keys(), help="Weather condition") parser.add_argument("--elevation", default=45.0, type=float, help="Elevation angle degrees (5-90)") parser.add_argument("--band", default="ku", choices=BAND_RAIN_SENS.keys(), help="Frequency band") parser.add_argument("--verbose", action="store_true", help="Debug logging") args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) profile = PROFILES[args.profile] weather = WEATHER[args.weather] conditions = LinkConditions(profile, weather, args.elevation, args.band) log.info("=" * 60) log.info("sat_proxy.py — satellite link simulator") log.info(conditions.summary()) log.info(f"Listening on {args.host}:{args.port}") log.info("=" * 60) server = await asyncio.start_server( lambda r, w: handle_client(r, w, conditions), args.host, args.port ) async with server: await server.serve_forever() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: log.info("Shutting down.")