"""Pygame multiplayer client with client-side prediction.

The client connects to a line-delimited JSON server, sends key events,
predicts its own movement locally, and reconciles against the
server-authoritative state. Artificial latency is injected on both the send
and the receive path so the difference between the predicted position and the
server ("ghost") position is visible.
"""

import socket
import threading
import json
import pygame
import sys
import time

HOST = '127.0.0.1'
PORT = 65432
MOVE_SPEED = 200  # pixels per second
FAKE_LATENCY = 0.1  # seconds of artificial latency for both sending and receiving

# Global state
players = {}  # Server-authoritative state: client_id -> {"username", "x", "y"}
my_client_id = None  # Our unique client id from the server
# Local predicted state for the local player.
predicted_state = {"x": 100, "y": 100, "keys": set(), "username": "Guest"}
ping = 0  # measured ping in seconds

# The main thread and the ping thread both write to the same socket; this lock
# keeps one JSON line from being interleaved with another.
_send_lock = threading.Lock()

# Arrow keys and the direction names sent to the server.
_KEY_NAMES = {
    pygame.K_LEFT: "left",
    pygame.K_RIGHT: "right",
    pygame.K_UP: "up",
    pygame.K_DOWN: "down",
}

# Fraction of the prediction error corrected per state update while moving.
_CORRECTION_FACTOR = 0.1

_PLAYER_RADIUS = 20
_LABEL_OFFSET = 30  # pixels between a circle's centre and its label's centre


def _reconcile(server_x, server_y):
    """Pull the predicted position towards the server position.

    Snaps immediately when no movement key is held; otherwise nudges the
    prediction by ``_CORRECTION_FACTOR`` of the remaining error.
    """
    if not predicted_state["keys"]:
        # Snap immediately if no movement is occurring.
        predicted_state["x"] = server_x
        predicted_state["y"] = server_y
    else:
        # If keys are pressed, nudge the predicted state gradually.
        predicted_state["x"] += (server_x - predicted_state["x"]) * _CORRECTION_FACTOR
        predicted_state["y"] += (server_y - predicted_state["y"]) * _CORRECTION_FACTOR


def _handle_server_message(msg):
    """Apply one decoded server message (a dict) to the global client state."""
    global my_client_id, players, ping

    event_name = msg.get('event')
    # A message without a payload (or with a null payload) must not crash the
    # listener, so fall back to an empty dict.
    payload = msg.get('payload') or {}

    if event_name == "self_id":
        if my_client_id is None:
            my_client_id = payload.get("client_id")
            print(f"[INFO] My client ID: {my_client_id}")
    elif event_name == "client_connect":
        print(f"[INFO] Client connected: {payload.get('client_id')}")
    elif event_name == "client_disconnect":
        client_id = payload.get('client_id')
        print(f"[INFO] Client disconnected: {client_id}")
        players.pop(client_id, None)
    elif event_name == "state_update":
        # Update our players dictionary from the server. The dict is rebound
        # rather than mutated, so a reader holding the old one is unaffected.
        new_players = payload.get("players", {})
        if isinstance(new_players, dict):
            players = new_players
        # If we have an update for our own player, reconcile.
        own = players.get(my_client_id) if my_client_id is not None else None
        if own is not None:
            _reconcile(own.get("x", 100), own.get("y", 100))
    elif event_name == "pong":
        sent_time = payload.get('timestamp')
        if sent_time:
            ping = time.time() - sent_time
            print(f"[PING] {ping*1000:.0f} ms")


def listen_to_server(sock):
    """
    Listen for incoming messages from the server and update local state.
    Also performs reconciliation by snapping to the server state when no keys are pressed.
    Simulates network latency on incoming messages.

    Runs until the server closes the connection or an unexpected error occurs.
    A line that is not valid JSON (or not a JSON object) is reported and
    skipped instead of terminating the listener.
    """
    file_obj = sock.makefile('r')
    try:
        while True:
            line = file_obj.readline()
            if not line:
                break
            # Simulate fake latency for incoming messages.
            time.sleep(FAKE_LATENCY)
            try:
                msg = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"[ERROR] {e}")
                continue
            if not isinstance(msg, dict):
                print(f"[ERROR] Unexpected message: {msg!r}")
                continue
            _handle_server_message(msg)
    except Exception as e:
        # Thread boundary: report and stop listening.
        print(f"[ERROR] {e}")
    finally:
        file_obj.close()


def send_message(sock, msg_dict):
    """
    Send a JSON message to the server with fake latency.

    NOTE: the latency is simulated with a blocking sleep, so a call from the
    render loop stalls that frame for ``FAKE_LATENCY`` seconds.
    """
    time.sleep(FAKE_LATENCY)  # simulate network latency on outgoing messages
    message = json.dumps(msg_dict) + "\n"
    # Sleep outside the lock so concurrent senders do not serialise their
    # simulated latency; only the actual write is exclusive.
    with _send_lock:
        sock.sendall(message.encode())


def ping_loop(sock):
    """
    Periodically send ping messages to measure latency.

    Stops when the socket can no longer be written to.
    """
    while True:
        ts = time.time()
        try:
            send_message(sock, {'type': 'ping', 'timestamp': ts})
        except OSError as e:
            print(f"[ERROR] {e}")
            return
        time.sleep(1)


def _movement_delta(keys, dt):
    """Return the ``(dx, dy)`` displacement for the held direction keys over ``dt`` seconds."""
    step = MOVE_SPEED * dt
    dx, dy = 0, 0
    if "left" in keys:
        dx -= step
    if "right" in keys:
        dx += step
    if "up" in keys:
        dy -= step
    if "down" in keys:
        dy += step
    return dx, dy


def _draw_labeled_circle(screen, font, color, x, y, label, width=0):
    """Draw a circle at ``(x, y)`` with ``label`` centred above it.

    ``width`` is passed to ``pygame.draw.circle``: 0 fills the circle, a
    positive value draws an outline of that thickness.
    """
    pygame.draw.circle(screen, color, (x, y), _PLAYER_RADIUS, width)
    text_surface = font.render(label, True, (255, 255, 255))
    text_rect = text_surface.get_rect(center=(x, y - _LABEL_OFFSET))
    screen.blit(text_surface, text_rect)


def _handle_key_event(sock, event, local_keys):
    """Track a KEYDOWN/KEYUP for an arrow key and forward it to the server."""
    key_name = _KEY_NAMES.get(event.key)
    if key_name is None:
        return
    if event.type == pygame.KEYDOWN:
        if key_name not in local_keys:
            local_keys.add(key_name)
            predicted_state["keys"].add(key_name)
            send_message(sock, {'type': 'keydown', 'key': key_name})
    elif key_name in local_keys:
        local_keys.remove(key_name)
        predicted_state["keys"].discard(key_name)
        send_message(sock, {'type': 'keyup', 'key': key_name})


def _run_game_loop(sock):
    """Initialise pygame and run the input/predict/render loop until the window closes."""
    pygame.init()
    try:
        screen = pygame.display.set_mode((800, 600))
        pygame.display.set_caption("Pygame Client - Multiplayer (Ghost vs Client)")
        clock = pygame.time.Clock()
        # Building a font is expensive; create it once, not on every frame.
        font = pygame.font.SysFont(None, 24)

        # Define colors.
        ghost_color = (0, 255, 0)   # Green for the server (ghost) state.
        client_color = (0, 0, 255)  # Blue for the client (predicted) state.
        other_color = (255, 0, 0)   # Red for other players.

        # Local set to track pressed keys.
        local_keys = set()

        while True:
            dt = clock.tick(60) / 1000.0  # Frame time (aiming for 60 FPS)

            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    sys.exit()
                elif event.type in (pygame.KEYDOWN, pygame.KEYUP):
                    _handle_key_event(sock, event, local_keys)

            # Update the predicted state based on locally pressed keys.
            dx, dy = _movement_delta(predicted_state["keys"], dt)
            predicted_state["x"] += dx
            predicted_state["y"] += dy

            # Work on a per-frame copy: the listener thread may rebind or
            # modify ``players`` at any moment.
            snapshot = dict(players)
            own_id = my_client_id

            # Get ghost (server) position without interpolation.
            own = snapshot.get(own_id) if own_id is not None else None
            if own is not None:
                ghost_x = own.get("x", 100)
                ghost_y = own.get("y", 100)
            else:
                ghost_x, ghost_y = predicted_state["x"], predicted_state["y"]

            # Render the scene.
            screen.fill((0, 0, 0))

            # Draw remote players (using server state).
            for cid, data in snapshot.items():
                if cid == own_id:
                    continue
                _draw_labeled_circle(
                    screen, font, other_color,
                    int(data.get("x", 100)), int(data.get("y", 100)),
                    data.get("username", "Guest"),
                )

            # Draw the ghost (server-authoritative) position as an outlined circle.
            _draw_labeled_circle(
                screen, font, ghost_color,
                int(ghost_x), int(ghost_y), "Ghost", width=2,
            )

            # Draw the client-predicted position as the actual player (filled circle).
            _draw_labeled_circle(
                screen, font, client_color,
                int(predicted_state.get("x", 100)),
                int(predicted_state.get("y", 100)),
                predicted_state.get("username", "Guest"),
            )

            # Display ping on the screen.
            ping_surface = font.render(f"Ping: {int(ping*1000)} ms", True, (255, 255, 0))
            screen.blit(ping_surface, (10, 10))

            pygame.display.flip()
    finally:
        pygame.quit()


def main():
    """Prompt for a username, connect to the server and run the client."""
    username = input("Enter your username: ")
    predicted_state["username"] = username

    # Connect to the server.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((HOST, PORT))

        # Start threads to listen to the server and to send pings.
        threading.Thread(target=listen_to_server, args=(sock,), daemon=True).start()
        threading.Thread(target=ping_loop, args=(sock,), daemon=True).start()

        # Register for needed events.
        default_events = ["client_connect", "client_disconnect", "state_update"]
        for event in default_events:
            send_message(sock, {'type': 'register_event', 'event': event})
            print(f"[INFO] Registered for event: {event}")

        # Inform the server of our chosen username.
        send_message(sock, {'type': 'set_username', 'username': username})

        _run_game_loop(sock)
    finally:
        # Shut down first so the listener's blocking read sees EOF; the
        # makefile wrapper would otherwise keep the descriptor open.
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # never connected, or the peer already closed the connection
        sock.close()


if __name__ == "__main__":
    main()