#!/usr/bin/env python3
"""Tkinter GUI client for a simple length-prefixed TCP packet protocol.

Wire format of one frame, as produced by ``PacketClient.send_packet`` and
parsed by ``PacketClient.receive_loop``:

* 4-byte unsigned big-endian payload length (``struct`` format ``!I``)
* payload: 1 byte packet type followed by UTF-8 encoded text
"""
import queue
import socket
import struct
import sys
import threading
import time  # Used for the current timestamp in milliseconds.
import tkinter as tk
from tkinter import messagebox, scrolledtext
from typing import Optional

# Human-readable names for the numeric packet types shown in the GUI log.
PACKET_NAMES = {
    1: "HEARTBEAT",
    2: "JOIN",
    3: "DISCONNECT",
    4: "MOVE",
    5: "CHAT",
    6: "PLAYER_UPDATE",
    7: "SCORE_UPDATE",
    8: "GAME_START",
    9: "GAME_END",
    10: "PING",
    11: "PONG",
    12: "AUTH_REQUEST",
    13: "AUTH_RESPONSE",
    14: "PACKET_MAX",
}

# Pre-compiled frame header: 4-byte unsigned big-endian payload length.
_HEADER = struct.Struct("!I")


class PacketClient:
    """TCP client that sends and receives length-prefixed packets.

    Received packets are placed on ``recv_queue`` as ``(packet_type, data)``
    tuples by a background thread. Failures in the receive loop are reported
    on the same queue as ``("Error", message)``.
    """

    def __init__(self, server_ip, server_port):
        self.server_ip = server_ip
        self.server_port = server_port
        self.sock = None
        self.running = False
        self.recv_queue = queue.Queue()  # For thread-safe packet passing

    def connect(self) -> None:
        """Connect to the server and start the background receive thread.

        Raises:
            RuntimeError: if the connection cannot be established.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.server_ip, self.server_port))
        except Exception as e:
            # Do not leak the socket when the connection attempt fails.
            sock.close()
            raise RuntimeError(
                f"Failed to connect to {self.server_ip}:{self.server_port}: {e}"
            ) from e
        self.sock = sock
        self.running = True
        threading.Thread(target=self.receive_loop, daemon=True).start()

    def receive_loop(self) -> None:
        """Read frames from the socket until it closes or an error occurs.

        Each decoded packet is enqueued as ``(packet_type, text)``. A frame
        with an empty payload has no type byte; it is reported as an error
        entry and skipped, because the length prefix keeps the stream in sync.
        ``running`` is always reset to ``False`` when the loop exits.
        """
        try:
            while self.running:
                header = self._recv_all(_HEADER.size)
                if header is None:
                    break
                (payload_len,) = _HEADER.unpack(header)
                payload = self._recv_all(payload_len)
                if payload is None:
                    break
                if not payload:
                    self.recv_queue.put(
                        ("Error", "Received empty packet (missing type byte)")
                    )
                    continue
                packet_type = payload[0]
                data = payload[1:].decode("utf-8", errors="replace")
                # Enqueue the packet for the GUI.
                self.recv_queue.put((packet_type, data))
        except Exception as e:
            # Boundary of the worker thread: surface the failure to the GUI.
            self.recv_queue.put(("Error", str(e)))
        finally:
            self.running = False

    def _recv_all(self, n: int) -> Optional[bytes]:
        """Receive exactly ``n`` bytes from the socket.

        Returns:
            The bytes read, or ``None`` if the peer closed the connection or
            a socket error occurred before ``n`` bytes arrived.
        """
        buf = bytearray()
        while len(buf) < n:
            try:
                chunk = self.sock.recv(n - len(buf))
            except OSError:
                return None
            if not chunk:
                return None
            buf += chunk
        return bytes(buf)

    def send_packet(self, packet_type: int, data: str) -> None:
        """Frame and send one packet.

        Args:
            packet_type: packet type, packed as a single unsigned byte.
            data: text payload, sent UTF-8 encoded.

        Raises:
            RuntimeError: if the client is not connected or sending fails.
            struct.error: if ``packet_type`` does not fit in one byte.
        """
        payload = struct.pack("!B", packet_type) + data.encode("utf-8")
        packet = _HEADER.pack(len(payload)) + payload
        if self.sock is None:
            raise RuntimeError("Failed to send packet: not connected")
        try:
            self.sock.sendall(packet)
        except OSError as e:
            raise RuntimeError(f"Failed to send packet: {e}") from e

    def close(self) -> None:
        """Stop the receive loop and close the socket, if one is open."""
        self.running = False
        if self.sock is None:
            return
        try:
            # shutdown() wakes a receive thread blocked in recv(); close()
            # alone is not guaranteed to interrupt it.
            self.sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # Already disconnected; nothing left to shut down.
        self.sock.close()


# The GUI application.
class PacketClientGUI:
    """Tkinter front end: a log of received packets plus a send form."""

    def __init__(self, master, client: PacketClient):
        self.master = master
        self.client = client
        master.title("Packet Client GUI")

        # Create a scrolling text widget for received packets.
        self.received_text = scrolledtext.ScrolledText(
            master, width=80, height=20, state="disabled"
        )
        self.received_text.grid(row=0, column=0, columnspan=4, padx=10, pady=10)

        # Packet type label and entry.
        tk.Label(master, text="Packet Type:").grid(
            row=1, column=0, sticky="e", padx=(10, 2)
        )
        self.type_entry = tk.Entry(master, width=5)
        self.type_entry.grid(row=1, column=1, sticky="w")
        self.type_entry.insert(0, "4")  # Default to MOVE packet

        # Data label and entry.
        tk.Label(master, text="Data:").grid(
            row=1, column=2, sticky="e", padx=(10, 2)
        )
        self.data_entry = tk.Entry(master, width=40)
        self.data_entry.grid(row=1, column=3, sticky="w", padx=(0, 10))

        # Send button.
        self.send_button = tk.Button(
            master, text="Send Packet", command=self.send_packet
        )
        self.send_button.grid(row=2, column=0, columnspan=4, pady=(5, 10))

        # Start periodic GUI update.
        self.master.after(100, self.process_recv_queue)

    def process_recv_queue(self) -> None:
        """Drain the client's receive queue and re-arm the 100 ms poll timer."""
        try:
            while True:
                try:
                    pkt_type, data = self.client.recv_queue.get_nowait()
                except queue.Empty:
                    break
                self._handle_packet(pkt_type, data)
        finally:
            # Re-arm even if a handler raised, so polling never stops silently.
            self.master.after(100, self.process_recv_queue)

    def _handle_packet(self, pkt_type, data) -> None:
        """Apply one queue entry to the GUI (window title or log)."""
        # For heartbeat packets (type 1), calculate ping and update window title.
        if pkt_type == 1:
            try:
                server_timestamp = int(data)
            except ValueError as e:
                # If the timestamp cannot be parsed, show the error in the log.
                self.append_text(f"Error processing HEARTBEAT: {e}\n")
                return
            ms_now = int(time.time() * 1000)
            ping = ms_now - server_timestamp
            self.master.title(f"Heartbeat: {ping} ms")
            return

        # For ping packets (type 10), update the window title using the packet data.
        if pkt_type == 10:
            self.master.title(data)
            return

        if isinstance(pkt_type, int):
            name = PACKET_NAMES.get(pkt_type, f"Unknown ({pkt_type})")
        else:
            name = pkt_type  # For error messages
        self.append_text(f"{name} > {data}\n")

    def append_text(self, text: str) -> None:
        """Append ``text`` to the read-only log and scroll to the end."""
        self.received_text.config(state="normal")
        self.received_text.insert(tk.END, text)
        self.received_text.see(tk.END)
        self.received_text.config(state="disabled")

    def send_packet(self) -> None:
        """Send the packet described by the form fields and log it."""
        try:
            pkt_type = int(self.type_entry.get())
        except ValueError:
            messagebox.showerror("Error", "Packet type must be an integer")
            return
        data = self.data_entry.get()
        try:
            self.client.send_packet(pkt_type, data)
        except Exception as e:
            # GUI boundary: show any send failure instead of crashing the callback.
            messagebox.showerror("Error", str(e))
            return
        # Show the packet sent, using the name if available.
        pkt_name = PACKET_NAMES.get(pkt_type, f"Unknown ({pkt_type})")
        self.append_text(f"Sent Packet | Type: {pkt_name} | Data: {data}\n")


def main():
    """Parse ``<server_ip> <server_port>`` from argv, connect, and run the GUI."""
    if len(sys.argv) < 3:
        print("Usage: python packet_client_gui.py <server_ip> <server_port>")
        sys.exit(1)

    server_ip = sys.argv[1]
    try:
        server_port = int(sys.argv[2])
    except ValueError:
        print(f"Invalid port {sys.argv[2]!r}: must be an integer")
        sys.exit(1)

    client = PacketClient(server_ip, server_port)
    try:
        client.connect()
    except Exception as e:
        print(e)
        sys.exit(1)

    try:
        # Tk() is inside the try so the connection is closed if it fails.
        root = tk.Tk()
        PacketClientGUI(root, client)
        root.mainloop()
    finally:
        client.close()


if __name__ == "__main__":
    main()