import socket import threading import pickle import time import uuid import math import json import pygame # For the dashboard # --- Configuration --- HOST = '127.0.0.1' PORT = 65432 TICKRATE = 128 # ticks per second MAX_USERS = 100 # Racing physics constants. ACCELERATION = 300.0 # pixels per second^2 BRAKE_DECELERATION = 400.0 # pixels per second^2 FRICTION = 200.0 # pixels per second^2 TURN_SPEED = math.radians(240) # radians per second MAX_SPEED = 400.0 # pixels per second MIN_SPEED = -200.0 # pixels per second # Collision / rendering radius. PLAYER_RADIUS = 10 # --- Sensor Configuration for Bots --- BOT_SENSOR_RAY_COUNT = 90 BOT_SENSOR_FOV = math.radians(180) # Field of view of 90 degrees SENSOR_MAX_DISTANCE = 150 # Maximum sensor range # --- Load Map from JSON --- try: with open('map.json', 'r') as f: map_data = json.load(f) print("[INFO] Loaded map.json successfully.") except Exception as e: print(f"[ERROR] Could not load map.json: {e}") map_data = { "obstacle": [], "road": [], "checkpoint": [], "finish_line": [] } obstacles = map_data.get("obstacle", []) roads = map_data.get("road", []) checkpoints = map_data.get("checkpoint", []) finish_lines = map_data.get("finish_line", []) # --- Collision Helpers --- def circle_rect_collision(cx, cy, radius, rect): closest_x = max(rect["x"], min(cx, rect["x"] + rect["width"])) closest_y = max(rect["y"], min(cy, rect["y"] + rect["height"])) dx = cx - closest_x dy = cy - closest_y return (dx * dx + dy * dy) < (radius * radius) def rect_collision(cx, cy, rect): return (rect["x"] <= cx <= rect["x"] + rect["width"] and rect["y"] <= cy <= rect["y"] + rect["height"]) # --- Ray Casting for Sensors --- def ray_cast(x, y, angle, max_distance): step = 5 for d in range(0, int(max_distance), step): test_x = x + d * math.cos(angle) test_y = y + d * math.sin(angle) for obs in obstacles: if (obs["x"] <= test_x <= obs["x"] + obs["width"] and obs["y"] <= test_y <= obs["y"] + obs["height"]): return d return max_distance # --- Network Event Class & Helpers --- class Event: def __init__(self, name, payload): self.name = name self.payload = payload def recvall(sock, n): data = b'' while len(data) < n: try: packet = sock.recv(n - len(data)) except Exception as e: return None if not packet: return None data += packet return data def send_event(sock, event): try: data = pickle.dumps(event) length = len(data) sock.sendall(length.to_bytes(4, byteorder='big') + data) except Exception as e: print(f"[ERROR] send_event: {e}") def recv_event(sock): raw_len = recvall(sock, 4) if not raw_len: return None msg_len = int.from_bytes(raw_len, byteorder='big') data = recvall(sock, msg_len) if data is None: return None return pickle.loads(data) # --- Global State --- event_subscriptions = {} # event name -> list of client sockets client_ids = {} # conn -> client_id client_states = {} # conn -> state dict (for human players) bot_states = {} # bot_id -> state dict (with "bot": True) lock = threading.Lock() reset_in_progress = False # Global flag to avoid repeated resets # --- Bot AI (Smart with Configurable Sensors) --- def update_bot(state, dt): # Determine target based on checkpoints or finish line. if checkpoints and state["checkpoint_index"] < len(checkpoints): cp = checkpoints[state["checkpoint_index"]] target_x = cp["x"] + cp["width"] / 2 target_y = cp["y"] + cp["height"] / 2 elif finish_lines: fl = finish_lines[0] # assume one finish line target_x = fl["x"] + fl["width"] / 2 target_y = fl["y"] + fl["height"] / 2 else: target_x = state["x"] + math.cos(state["angle"]) * 100 target_y = state["y"] + math.sin(state["angle"]) * 100 # Incentivize staying on roads. def is_on_road(x, y): for road in roads: if rect_collision(x, y, road): return True return False if not is_on_road(state["x"], state["y"]) and roads: best_distance = float('inf') best_center = None for road in roads: center_x = road["x"] + road["width"] / 2 center_y = road["y"] + road["height"] / 2 d = math.hypot(center_x - state["x"], center_y - state["y"]) if d < best_distance: best_distance = d best_center = (center_x, center_y) if best_center is not None: # Blend target: 70% checkpoint/finish and 30% road center. target_x = 0.7 * target_x + 0.3 * best_center[0] target_y = 0.7 * target_y + 0.3 * best_center[1] state["target"] = (target_x, target_y) # For dashboard drawing # Compute desired angle toward target. desired_angle_target = math.atan2(target_y - state["y"], target_x - state["x"]) # --- Sensor Readings --- sensor_readings = [] for i in range(BOT_SENSOR_RAY_COUNT): if BOT_SENSOR_RAY_COUNT > 1: offset = -BOT_SENSOR_FOV/2 + i * (BOT_SENSOR_FOV/(BOT_SENSOR_RAY_COUNT - 1)) else: offset = 0 sensor_angle = state["angle"] + offset distance = ray_cast(state["x"], state["y"], sensor_angle, SENSOR_MAX_DISTANCE) sensor_readings.append((offset, distance)) state["sensors"] = sensor_readings # --- Compute Obstacle Avoidance Vector --- avoidance_x = 0 avoidance_y = 0 threshold = SENSOR_MAX_DISTANCE * 0.6 # Activate avoidance if obstacle is closer than 60% of max. for offset, distance in sensor_readings: if distance < threshold: repulsion = (threshold - distance) / threshold avoid_angle = state["angle"] + offset avoidance_x -= repulsion * math.cos(avoid_angle) avoidance_y -= repulsion * math.sin(avoid_angle) avoidance_angle = 0 avoidance_weight = 0 if avoidance_x != 0 or avoidance_y != 0: avoidance_angle = math.atan2(avoidance_y, avoidance_x) avoidance_weight = math.hypot(avoidance_x, avoidance_y) # --- Combine Target and Avoidance --- w_target = 1.0 w_avoid = avoidance_weight target_vec = (math.cos(desired_angle_target), math.sin(desired_angle_target)) avoid_vec = (math.cos(avoidance_angle), math.sin(avoidance_angle)) if w_avoid > 0 else (0, 0) combined_x = w_target * target_vec[0] + w_avoid * avoid_vec[0] combined_y = w_target * target_vec[1] + w_avoid * avoid_vec[1] desired_angle = math.atan2(combined_y, combined_x) # --- Determine Steering Keys --- def normalize_angle(a): while a > math.pi: a -= 2*math.pi while a < -math.pi: a += 2*math.pi return a angle_diff = normalize_angle(desired_angle - state["angle"]) desired_keys = {"up"} # Always accelerate. if angle_diff > 0.1: desired_keys.add("right") elif angle_diff < -0.1: desired_keys.add("left") state["keys"] = desired_keys # --- Physics Update for All Entities --- def update_physics(state, dt): if 'up' in state['keys']: state['speed'] += ACCELERATION * dt elif 'down' in state['keys']: state['speed'] -= BRAKE_DECELERATION * dt else: if state['speed'] > 0: state['speed'] -= FRICTION * dt if state['speed'] < 0: state['speed'] = 0 elif state['speed'] < 0: state['speed'] += FRICTION * dt if state['speed'] > 0: state['speed'] = 0 if state['speed'] > MAX_SPEED: state['speed'] = MAX_SPEED if state['speed'] < MIN_SPEED: state['speed'] = MIN_SPEED if 'left' in state['keys']: state['angle'] -= TURN_SPEED * dt if 'right' in state['keys']: state['angle'] += TURN_SPEED * dt new_x = state['x'] + state['speed'] * math.cos(state['angle']) * dt new_y = state['y'] + state['speed'] * math.sin(state['angle']) * dt collision = False for obs in obstacles: if circle_rect_collision(new_x, new_y, PLAYER_RADIUS, obs): collision = True break if not collision: state['x'] = new_x state['y'] = new_y else: state['speed'] = 0 # --- Checkpoints and Finish Line --- if "checkpoint_index" not in state: state["checkpoint_index"] = 0 if "finished" not in state: state["finished"] = False if checkpoints and not state["finished"]: idx = state["checkpoint_index"] if idx < len(checkpoints): cp = checkpoints[idx] if rect_collision(state["x"], state["y"], cp): state["checkpoint_index"] += 1 print(f"[RACE] {state['username']} passed checkpoint {idx+1}") for fl in finish_lines: if rect_collision(state["x"], state["y"], fl) and not state["finished"]: if (checkpoints and state["checkpoint_index"] >= len(checkpoints)) or not checkpoints: state["finished"] = True print(f"[RACE] {state['username']} has finished the race!") # --- Global Reset Function --- def reset_race(): with lock: for state in list(client_states.values()) + list(bot_states.values()): state["x"] = 100 state["y"] = 100 state["angle"] = 0 state["speed"] = 0 state["keys"] = set() state["checkpoint_index"] = 0 state["finished"] = False print("[RACE] Race finished. Resetting all players.") def clear_reset_flag(): global reset_in_progress with lock: reset_in_progress = False # --- Network Client Handler --- def handle_client(conn, addr): client_id = str(uuid.uuid4()) with lock: client_ids[conn] = client_id client_states[conn] = { "client_id": client_id, "username": "Guest", "x": 100, "y": 100, "angle": 0, "speed": 0, "keys": set(), "finished": False, "checkpoint_index": 0 } print(f"[NEW CONNECTION] {addr} connected as {client_id}") try: send_event(conn, Event("self_id", {"client_id": client_id})) server_info = { "tickrate": TICKRATE, "total_users": len(client_states) + len(bot_states), "max_users": MAX_USERS, "server_ip": HOST, "server_port": PORT, "server_name": "Advanced Top Down Racing Server", "map": map_data } send_event(conn, Event("server_info", server_info)) except Exception as e: print(f"[ERROR] sending self_id/server_info: {e}") broadcast_event("client_connect", {"client_id": client_id}) try: while True: event = recv_event(conn) if event is None: break if event.name == "register_event": event_name = event.payload.get('event') with lock: event_subscriptions.setdefault(event_name, []).append(conn) print(f"[REGISTER] {client_id} subscribed to '{event_name}'") elif event.name == "set_username": username = event.payload.get('username', 'Guest') with lock: if conn in client_states: client_states[conn]['username'] = username print(f"[USERNAME] {client_id} set username to '{username}'") elif event.name == "keydown": key = event.payload.get('key') with lock: if conn in client_states: client_states[conn]['keys'].add(key) print(f"[KEYDOWN] {client_id} key '{key}' pressed") elif event.name == "keyup": key = event.payload.get('key') with lock: if conn in client_states and key in client_states[conn]['keys']: client_states[conn]['keys'].remove(key) print(f"[KEYUP] {client_id} key '{key}' released") elif event.name == "ping": timestamp = event.payload.get('timestamp') send_event(conn, Event("pong", {"timestamp": timestamp})) except Exception as e: print(f"[ERROR] Exception with {client_ids.get(conn, 'unknown')}: {e}") finally: with lock: for subs in event_subscriptions.values(): if conn in subs: subs.remove(conn) client_ids.pop(conn, None) client_states.pop(conn, None) print(f"[DISCONNECT] {client_id} disconnected.") broadcast_event("client_disconnect", {"client_id": client_id}) conn.close() def broadcast_event(event_name, payload): event = Event(event_name, payload) with lock: receivers = event_subscriptions.get(event_name, []) for client in receivers[:]: try: send_event(client, event) except Exception as e: print(f"[ERROR] Failed to send to client, removing: {e}") receivers.remove(client) # --- Bot Spawning --- def spawn_bot(): bot_id = str(uuid.uuid4()) bot_state = { "client_id": bot_id, "username": "Bot_" + bot_id[:4], "x": 150, "y": 150, "angle": 0, "speed": 0, "keys": set(), "finished": False, "checkpoint_index": 0, "bot": True } with lock: bot_states[bot_id] = bot_state print(f"[BOT] Spawned bot {bot_state['username']}") # --- Game Loop --- def game_loop(): global reset_in_progress dt = 1 / TICKRATE while True: time.sleep(dt) with lock: all_states = list(client_states.values()) + list(bot_states.values()) for state in all_states: if state.get("bot", False): update_bot(state, dt) update_physics(state, dt) players_payload = {} for state in client_states.values(): players_payload[state['client_id']] = { "username": state['username'], "x": state['x'], "y": state['y'], "angle": state['angle'], "speed": state['speed'], "finished": state.get("finished", False) } for state in bot_states.values(): players_payload[state['client_id']] = { "username": state['username'], "x": state['x'], "y": state['y'], "angle": state['angle'], "speed": state['speed'], "finished": state.get("finished", False), "sensors": state.get("sensors", []), "target": state.get("target", None) } total_users = len(client_states) + len(bot_states) payload = { "players": players_payload, "total_users": total_users, "max_users": MAX_USERS } broadcast_event("state_update", payload) # --- Server Dashboard (Visualization) --- def server_dashboard(): pygame.init() DASH_WIDTH, DASH_HEIGHT = 1200, 900 screen = pygame.display.set_mode((DASH_WIDTH, DASH_HEIGHT)) pygame.display.set_caption("Server Dashboard: Full Race Course View") clock = pygame.time.Clock() font = pygame.font.SysFont(None, 20) while True: for event in pygame.event.get(): if event.type == pygame.QUIT: pygame.quit() return screen.fill((20, 20, 20)) # Draw Roads. for road in roads: rect = pygame.Rect(road["x"], road["y"], road["width"], road["height"]) pygame.draw.rect(screen, (80, 80, 80), rect) # Draw Obstacles. for obs in obstacles: rect = pygame.Rect(obs["x"], obs["y"], obs["width"], obs["height"]) pygame.draw.rect(screen, (150, 50, 50), rect) # Draw Checkpoints. for cp in checkpoints: rect = pygame.Rect(cp["x"], cp["y"], cp["width"], cp["height"]) pygame.draw.rect(screen, (200, 200, 0), rect, 2) # Draw Finish Lines. for fl in finish_lines: rect = pygame.Rect(fl["x"], fl["y"], fl["width"], fl["height"]) pygame.draw.rect(screen, (0, 0, 255), rect, 2) # Draw Players. with lock: all_states = list(client_states.values()) + list(bot_states.values()) for state in all_states: x = int(state["x"]) y = int(state["y"]) color = (0, 255, 0) if not state.get("bot", False) else (255, 165, 0) pygame.draw.circle(screen, color, (x, y), PLAYER_RADIUS) name = state["username"] label = font.render(name, True, (255,255,255)) screen.blit(label, (x - label.get_width() // 2, y - PLAYER_RADIUS - 15)) # For bots, draw sensor rays and target line. if state.get("bot", False): for offset, distance in state.get("sensors", []): ray_angle = state["angle"] + offset end_x = int(x + distance * math.cos(ray_angle)) end_y = int(y + distance * math.sin(ray_angle)) ray_color = (255, 0, 0) if distance < SENSOR_MAX_DISTANCE else (0, 255, 0) pygame.draw.line(screen, ray_color, (x, y), (end_x, end_y), 2) if "target" in state and state["target"]: tx, ty = state["target"] pygame.draw.line(screen, (255,255,255), (x, y), (int(tx), int(ty)), 2) info_text = font.render("Server Dashboard - Full Race Course View", True, (255,255,255)) screen.blit(info_text, (10, 10)) pygame.display.flip() clock.tick(30) # --- Server Startup --- def start_server(): print("[STARTING] Server is starting...") threading.Thread(target=game_loop, daemon=True).start() threading.Thread(target=server_dashboard, daemon=True).start() spawn_bot() # Spawn one bot (add more if desired) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((HOST, PORT)) s.listen() print(f"[LISTENING] Server is listening on {HOST}:{PORT}") while True: conn, addr = s.accept() threading.Thread(target=handle_client, args=(conn, addr), daemon=True).start() if __name__ == "__main__": start_server()