commit f0f4e70e4fa1cef653c65bbe63b51666aa603ebe Author: chelsea Date: Thu Mar 26 19:43:08 2026 -0500 Add sat.py diff --git a/sat.py b/sat.py new file mode 100644 index 0000000..ad220f5 --- /dev/null +++ b/sat.py @@ -0,0 +1,353 @@ +#!/usr/bin/env python3 +""" +sat_proxy.py — SOCKS5 proxy with satellite link condition simulation + +Injects realistic latency, jitter, and packet loss based on configurable +satellite link profiles (GEO, MEO, LEO, VLEO) and weather conditions. + +Usage: + python3 sat_proxy.py [--host 127.0.0.1] [--port 1080] [--profile leo] [--weather clear] + +Then point your browser/app at SOCKS5 127.0.0.1:1080 + +Profiles: geo | meo | leo | vleo +Weather: clear | light | heavy | storm | snow +""" + +import asyncio +import argparse +import logging +import random +import struct +import time +import math +from dataclasses import dataclass + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", +) +log = logging.getLogger("sat_proxy") + +# --------------------------------------------------------------------------- +# Link profiles — mirrors the simulator widget physics +# --------------------------------------------------------------------------- + +@dataclass +class LinkProfile: + name: str + altitude_km: float # orbital altitude + base_rtt_ms: float # processing + queuing overhead on top of propagation + jitter_base_ms: float # p99 jitter baseline + loss_base_pct: float # baseline packet loss % + +PROFILES = { + "geo": LinkProfile("GEO", 35_786, 30, 12, 0.10), + "meo": LinkProfile("MEO", 8_000, 20, 8, 0.30), + "oneweb": LinkProfile("OneWeb", 1_200, 14, 6, 0.35), # Eutelsat OneWeb — 1200 km polar LEO, ~70ms RTT + "leo": LinkProfile("LEO", 550, 8, 3, 0.50), # Starlink Gen1/2 + "vleo": LinkProfile("VLEO", 300, 6, 5, 0.80), +} + +@dataclass +class WeatherCondition: + name: str + rain_fade_db: float # dB of rain fade + loss_add_pct: float # additional loss % + +WEATHER = { + "clear": WeatherCondition("Clear", 0.0, 0.0), + "light": WeatherCondition("Light rain", 1.5, 0.3), + "heavy": WeatherCondition("Heavy rain", 5.0, 1.2), + "storm": WeatherCondition("Thunderstorm",12.0, 4.5), + "snow": WeatherCondition("Snow/ice", 2.5, 0.8), +} + +BAND_RAIN_SENS = { + "ku": 1.0, + "ka": 2.8, + "v": 5.0, + "l": 0.1, +} + +# --------------------------------------------------------------------------- +# Link condition calculator +# --------------------------------------------------------------------------- + +class LinkConditions: + def __init__(self, profile: LinkProfile, weather: WeatherCondition, + elevation_deg: float = 45.0, band: str = "ku"): + self.profile = profile + self.weather = weather + self.elevation_deg = elevation_deg + self.band = band + self._update() + + def _update(self): + elev_rad = math.radians(self.elevation_deg) + slant_km = self.profile.altitude_km / math.sin(elev_rad) + prop_ms = slant_km / 300.0 # speed of light + elev_factor = 1.5 - math.sin(elev_rad) + rain_sens = BAND_RAIN_SENS.get(self.band, 1.0) + + self.one_way_ms = prop_ms + self.profile.base_rtt_ms / 2 + self.rtt_ms = prop_ms * 2 + self.profile.base_rtt_ms + (90 - self.elevation_deg) * 0.3 + self.loss_pct = min(40.0, max(0.0, + self.profile.loss_base_pct + + self.weather.loss_add_pct * rain_sens * elev_factor + )) + self.jitter_ms = ( + self.profile.jitter_base_ms + + self.weather.rain_fade_db * rain_sens * 0.8 + + (90 - self.elevation_deg) * 0.15 + ) + + def sample_delay_ms(self) -> float: + """Return a one-way delay sample with jitter applied.""" + jitter = random.gauss(0, self.jitter_ms / 2) + return max(0, self.one_way_ms + jitter) + + def should_drop(self) -> bool: + """Return True if this chunk should be dropped.""" + return random.random() * 100 < self.loss_pct + + def summary(self) -> str: + return ( + f"Profile={self.profile.name} Weather={self.weather.name} " + f"Elevation={self.elevation_deg}° Band={self.band.upper()}\n" + f" One-way delay : {self.one_way_ms:.1f} ms | " + f"RTT : {self.rtt_ms:.1f} ms | " + f"Jitter(p99) : {self.jitter_ms:.1f} ms | " + f"Loss : {self.loss_pct:.2f}%" + ) + +# --------------------------------------------------------------------------- +# SOCKS5 constants +# --------------------------------------------------------------------------- + +SOCKS5_VERSION = 0x05 +NO_AUTH = 0x00 +NO_ACCEPTABLE = 0xFF +CMD_CONNECT = 0x01 +ATYP_IPV4 = 0x01 +ATYP_DOMAIN = 0x03 +ATYP_IPV6 = 0x04 +REP_SUCCESS = 0x00 +REP_FAILURE = 0x01 +REP_REFUSED = 0x05 +REP_HOSTUNREACH= 0x04 + +# --------------------------------------------------------------------------- +# Impaired pipe — wraps a stream pair and injects link conditions +# --------------------------------------------------------------------------- + +class ImpairedPipe: + """Forward data between two stream pairs with satellite impairments.""" + + def __init__(self, conditions: LinkConditions, tag: str): + self.conditions = conditions + self.tag = tag + self._bytes_in = 0 + self._bytes_out = 0 + self._dropped = 0 + self._chunks = 0 + + async def _forward(self, reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, direction: str): + try: + while True: + chunk = await reader.read(4096) + if not chunk: + break + self._chunks += 1 + + if self.conditions.should_drop(): + self._dropped += 1 + log.debug(f"[{self.tag}] DROP {direction} {len(chunk)}B") + continue + + delay = self.conditions.sample_delay_ms() / 1000.0 + await asyncio.sleep(delay) + + writer.write(chunk) + await writer.drain() + + if direction == ">>": + self._bytes_out += len(chunk) + else: + self._bytes_in += len(chunk) + + except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError): + pass + finally: + try: + writer.close() + except Exception: + pass + + async def run(self, + client_r: asyncio.StreamReader, client_w: asyncio.StreamWriter, + remote_r: asyncio.StreamReader, remote_w: asyncio.StreamWriter): + t0 = time.monotonic() + await asyncio.gather( + self._forward(client_r, remote_w, ">>"), + self._forward(remote_r, client_w, "<<"), + ) + elapsed = time.monotonic() - t0 + loss_rate = (self._dropped / self._chunks * 100) if self._chunks else 0 + log.info( + f"[{self.tag}] closed " + f"↑{self._bytes_out//1024}KB ↓{self._bytes_in//1024}KB " + f"chunks={self._chunks} dropped={self._dropped} ({loss_rate:.1f}%) " + f"elapsed={elapsed:.1f}s" + ) + +# --------------------------------------------------------------------------- +# SOCKS5 handshake helpers +# --------------------------------------------------------------------------- + +async def socks5_handshake(reader: asyncio.StreamReader, + writer: asyncio.StreamWriter) -> tuple[str, int] | None: + """ + Perform SOCKS5 negotiation (no-auth only). + Returns (host, port) on success, None on failure. + """ + # --- method negotiation --- + header = await reader.readexactly(2) + if header[0] != SOCKS5_VERSION: + writer.close() + return None + n_methods = header[1] + methods = await reader.readexactly(n_methods) + if NO_AUTH not in methods: + writer.write(bytes([SOCKS5_VERSION, NO_ACCEPTABLE])) + await writer.drain() + writer.close() + return None + writer.write(bytes([SOCKS5_VERSION, NO_AUTH])) + await writer.drain() + + # --- request --- + req = await reader.readexactly(4) + if req[0] != SOCKS5_VERSION or req[2] != 0x00: + writer.close() + return None + if req[1] != CMD_CONNECT: + # only CONNECT supported + writer.write(bytes([SOCKS5_VERSION, 0x07, 0x00, ATYP_IPV4, 0,0,0,0, 0,0])) + await writer.drain() + writer.close() + return None + + atyp = req[3] + if atyp == ATYP_IPV4: + addr_bytes = await reader.readexactly(4) + host = ".".join(str(b) for b in addr_bytes) + elif atyp == ATYP_DOMAIN: + length = (await reader.readexactly(1))[0] + host = (await reader.readexactly(length)).decode() + elif atyp == ATYP_IPV6: + addr_bytes = await reader.readexactly(16) + import ipaddress + host = str(ipaddress.IPv6Address(addr_bytes)) + else: + writer.close() + return None + + port_bytes = await reader.readexactly(2) + port = struct.unpack("!H", port_bytes)[0] + return host, port + + +def socks5_reply(success: bool, bind_addr: str = "0.0.0.0", bind_port: int = 0) -> bytes: + rep = REP_SUCCESS if success else REP_FAILURE + addr_bytes = bytes(int(x) for x in bind_addr.split(".")) + port_bytes = struct.pack("!H", bind_port) + return bytes([SOCKS5_VERSION, rep, 0x00, ATYP_IPV4]) + addr_bytes + port_bytes + +# --------------------------------------------------------------------------- +# Connection handler +# --------------------------------------------------------------------------- + +_conn_counter = 0 + +async def handle_client(client_r: asyncio.StreamReader, + client_w: asyncio.StreamWriter, + conditions: LinkConditions): + global _conn_counter + _conn_counter += 1 + tag = f"conn{_conn_counter:04d}" + peer = client_w.get_extra_info("peername", ("?", 0)) + + try: + result = await asyncio.wait_for(socks5_handshake(client_r, client_w), timeout=10) + except asyncio.TimeoutError: + log.warning(f"[{tag}] handshake timeout from {peer}") + client_w.close() + return + + if result is None: + return + + host, port = result + log.info(f"[{tag}] CONNECT {host}:{port} (delay~{conditions.one_way_ms:.0f}ms loss={conditions.loss_pct:.1f}%)") + + try: + remote_r, remote_w = await asyncio.wait_for( + asyncio.open_connection(host, port), timeout=15 + ) + except (OSError, asyncio.TimeoutError) as e: + log.warning(f"[{tag}] upstream failed: {e}") + client_w.write(socks5_reply(False)) + await client_w.drain() + client_w.close() + return + + client_w.write(socks5_reply(True)) + await client_w.drain() + + pipe = ImpairedPipe(conditions, tag) + await pipe.run(client_r, client_w, remote_r, remote_w) + +# --------------------------------------------------------------------------- +# CLI entrypoint +# --------------------------------------------------------------------------- + +async def main(): + parser = argparse.ArgumentParser(description="SOCKS5 satellite link simulator proxy") + parser.add_argument("--host", default="127.0.0.1", help="Bind address (default: 127.0.0.1)") + parser.add_argument("--port", default=1080, type=int, help="Bind port (default: 1080)") + parser.add_argument("--profile", default="leo", choices=PROFILES.keys(), help="Satellite profile: geo | meo | oneweb | leo | vleo") + parser.add_argument("--weather", default="clear", choices=WEATHER.keys(), help="Weather condition") + parser.add_argument("--elevation", default=45.0, type=float, help="Elevation angle degrees (5-90)") + parser.add_argument("--band", default="ku", choices=BAND_RAIN_SENS.keys(), help="Frequency band") + parser.add_argument("--verbose", action="store_true", help="Debug logging") + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + profile = PROFILES[args.profile] + weather = WEATHER[args.weather] + conditions = LinkConditions(profile, weather, args.elevation, args.band) + + log.info("=" * 60) + log.info("sat_proxy.py — satellite link simulator") + log.info(conditions.summary()) + log.info(f"Listening on {args.host}:{args.port}") + log.info("=" * 60) + + server = await asyncio.start_server( + lambda r, w: handle_client(r, w, conditions), + args.host, args.port + ) + + async with server: + await server.serve_forever() + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + log.info("Shutting down.")