import socket import threading import pickle import time import uuid import math HOST = '127.0.0.1' PORT = 65432 MOVE_SPEED = 200 # pixels per second for players TICKRATE = 128 # ticks per second (sub-tick simulation) MAX_USERS = 100 BULLET_SPEED = 400 # pixels per second BULLET_LIFETIME = 2.0 # seconds BULLET_DAMAGE = 25 # damage per bullet PLAYER_RADIUS = 20 BULLET_RADIUS = 4 SMOKE_GRENADE_FUSE = 0.5 # seconds before smoke activates SMOKE_DURATION = 5.0 # active smoke duration (seconds) FLASH_GRENADE_FUSE = 0.3 # seconds before flash activates FLASH_DURATION = 1.0 # active flash duration (seconds) # Define map obstacles. map_obstacles = [ {"x": 200, "y": 150, "width": 100, "height": 300}, {"x": 500, "y": 100, "width": 50, "height": 400}, {"x": 100, "y": 500, "width": 600, "height": 50}, ] def circle_rect_collision(cx, cy, radius, rect): closest_x = max(rect["x"], min(cx, rect["x"] + rect["width"])) closest_y = max(rect["y"], min(cy, rect["y"] + rect["height"])) dx = cx - closest_x dy = cy - closest_y return (dx*dx + dy*dy) < (radius*radius) def circle_circle_collision(x1, y1, r1, x2, y2, r2): dx = x1 - x2 dy = y1 - y2 return (dx*dx + dy*dy) < ((r1 + r2) ** 2) class Event: def __init__(self, name, payload): self.name = name self.payload = payload def recvall(sock, n): data = b'' while len(data) < n: packet = sock.recv(n - len(data)) if not packet: return None data += packet return data def send_event(sock, event): data = pickle.dumps(event) length = len(data) sock.sendall(length.to_bytes(4, byteorder='big') + data) def recv_event(sock): raw_len = recvall(sock, 4) if not raw_len: return None msg_len = int.from_bytes(raw_len, byteorder='big') data = recvall(sock, msg_len) return pickle.loads(data) # Global dictionaries. event_subscriptions = {} # event name -> list of client sockets client_ids = {} # conn -> client_id client_states = {} # conn -> {"client_id", "username", "x", "y", "keys", "mouse_x", "mouse_y", "health"} lock = threading.Lock() bullets = [] # List of dicts: {"x", "y", "dx", "dy", "life", "shooter"} grenades = [] # List of dicts: {"type": "smoke" or "flash", "x", "y", "fuse", "duration"} def handle_client(conn, addr): client_id = str(uuid.uuid4()) with lock: client_ids[conn] = client_id client_states[conn] = { "client_id": client_id, "username": "Guest", "x": 100, "y": 100, "keys": set(), "mouse_x": 0, "mouse_y": 0, "health": 100 } print(f"[NEW CONNECTION] {addr} connected as {client_id}") try: send_event(conn, Event("self_id", {"client_id": client_id})) server_info = { "tickrate": TICKRATE, "total_users": len(client_states), "max_users": MAX_USERS, "server_ip": HOST, "server_port": PORT, "server_name": "My Multiplayer Shooter Server", "map": map_obstacles } send_event(conn, Event("server_info", server_info)) except Exception as e: print(f"[ERROR] sending self_id/server_info: {e}") broadcast_event("client_connect", {"client_id": client_id}) try: while True: event = recv_event(conn) if event is None: break if event.name == "register_event": event_name = event.payload.get('event') with lock: event_subscriptions.setdefault(event_name, []).append(conn) print(f"[REGISTER] {client_id} subscribed to '{event_name}'") elif event.name == "set_username": username = event.payload.get('username', 'Guest') with lock: if conn in client_states: client_states[conn]['username'] = username print(f"[USERNAME] {client_id} set username to '{username}'") elif event.name == "keydown": key = event.payload.get('key') with lock: if conn in client_states: client_states[conn]['keys'].add(key) print(f"[KEYDOWN] {client_id} key '{key}' pressed") elif event.name == "keyup": key = event.payload.get('key') with lock: if conn in client_states and key in client_states[conn]['keys']: client_states[conn]['keys'].remove(key) print(f"[KEYUP] {client_id} key '{key}' released") elif event.name == "mouse_move": x = event.payload.get('x', 0) y = event.payload.get('y', 0) with lock: if conn in client_states: client_states[conn]['mouse_x'] = x client_states[conn]['mouse_y'] = y elif event.name == "shoot": angle = event.payload.get("angle") with lock: if conn in client_states: state = client_states[conn] bx = state["x"] by = state["y"] dx = math.cos(angle) * BULLET_SPEED dy = math.sin(angle) * BULLET_SPEED bullet = { "x": bx, "y": by, "dx": dx, "dy": dy, "life": BULLET_LIFETIME, "shooter": client_ids[conn] } bullets.append(bullet) print(f"[SHOOT] {client_ids[conn]} fired a bullet") elif event.name == "throw_smoke": with lock: if conn in client_states: state = client_states[conn] grenade = { "type": "smoke", "x": state["x"], "y": state["y"], "fuse": SMOKE_GRENADE_FUSE, "duration": SMOKE_DURATION } grenades.append(grenade) print(f"[GRENADE] {client_ids[conn]} threw a smoke grenade") elif event.name == "throw_flash": with lock: if conn in client_states: state = client_states[conn] grenade = { "type": "flash", "x": state["x"], "y": state["y"], "fuse": FLASH_GRENADE_FUSE, "duration": FLASH_DURATION } grenades.append(grenade) print(f"[GRENADE] {client_ids[conn]} threw a flash grenade") elif event.name == "ping": timestamp = event.payload.get('timestamp') send_event(conn, Event("pong", {"timestamp": timestamp})) except Exception as e: print(f"[ERROR] Exception with {client_ids.get(conn, 'unknown')}: {e}") finally: with lock: for subs in event_subscriptions.values(): if conn in subs: subs.remove(conn) client_ids.pop(conn, None) client_states.pop(conn, None) print(f"[DISCONNECT] {client_id} disconnected.") broadcast_event("client_disconnect", {"client_id": client_id}) conn.close() def broadcast_event(event_name, payload): event = Event(event_name, payload) with lock: receivers = event_subscriptions.get(event_name, []) for client in receivers[:]: try: send_event(client, event) except Exception as e: print(f"[ERROR] Failed to send to client, removing: {e}") receivers.remove(client) def game_loop(): global bullets, grenades dt = 1 / TICKRATE while True: time.sleep(dt) with lock: # Update player positions with axis separation and collision. for state in client_states.values(): # Horizontal movement. dx = 0 if 'left' in state['keys']: dx -= MOVE_SPEED * dt if 'right' in state['keys']: dx += MOVE_SPEED * dt new_x = state['x'] + dx colliding = False for obs in map_obstacles: if circle_rect_collision(new_x, state['y'], PLAYER_RADIUS, obs): colliding = True break if not colliding: state['x'] = new_x # Vertical movement. dy = 0 if 'up' in state['keys']: dy -= MOVE_SPEED * dt if 'down' in state['keys']: dy += MOVE_SPEED * dt new_y = state['y'] + dy colliding = False for obs in map_obstacles: if circle_rect_collision(state['x'], new_y, PLAYER_RADIUS, obs): colliding = True break if not colliding: state['y'] = new_y # Update bullets. new_bullets = [] for bullet in bullets: bullet["x"] += bullet["dx"] * dt bullet["y"] += bullet["dy"] * dt bullet["life"] -= dt if bullet["life"] <= 0: continue # Check collision with obstacles. hit_obstacle = False for obs in map_obstacles: if circle_rect_collision(bullet["x"], bullet["y"], BULLET_RADIUS, obs): hit_obstacle = True break if hit_obstacle: continue # Check collision with players. hit_player = False for state in client_states.values(): if state["client_id"] == bullet["shooter"]: continue if circle_circle_collision(bullet["x"], bullet["y"], BULLET_RADIUS, state["x"], state["y"], PLAYER_RADIUS): state["health"] -= BULLET_DAMAGE if state["health"] <= 0: # Respawn the player. state["health"] = 100 state["x"] = 100 state["y"] = 100 hit_player = True break if hit_player: continue new_bullets.append(bullet) bullets = new_bullets # Update grenades. new_grenades = [] for gren in grenades: if gren["fuse"] > 0: gren["fuse"] -= dt new_grenades.append(gren) else: gren["duration"] -= dt if gren["duration"] > 0: new_grenades.append(gren) grenades = new_grenades # Build payload. players_payload = {} for state in client_states.values(): players_payload[state['client_id']] = { "username": state['username'], "x": state['x'], "y": state['y'], "mouse_x": state.get('mouse_x', 0), "mouse_y": state.get('mouse_y', 0), "health": state.get('health', 100) } total_users = len(client_states) payload = { "players": players_payload, "bullets": bullets, "grenades": grenades, "total_users": total_users, "max_users": MAX_USERS } broadcast_event("state_update", payload) def start_server(): print("[STARTING] Server is starting...") threading.Thread(target=game_loop, daemon=True).start() with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((HOST, PORT)) s.listen() print(f"[LISTENING] Server is listening on {HOST}:{PORT}") while True: conn, addr = s.accept() threading.Thread(target=handle_client, args=(conn, addr), daemon=True).start() if __name__ == "__main__": start_server()