MPGC/test.py
OusmBlueNinja e4117b60ef Yeaaaa
2025-04-03 17:22:17 -05:00

174 lines
5.6 KiB
Python

#!/usr/bin/env python3
import socket
import struct
import threading
import tkinter as tk
from tkinter import scrolledtext, messagebox
import queue
import sys
# Maps the numeric packet type carried on the wire to a display name.
# Codes are assigned consecutively starting at 1, in the order listed here.
# NOTE(review): "PACKET_MAX" looks like an end-of-range sentinel rather than
# a real packet type -- confirm against the server's definitions.
PACKET_NAMES = dict(
    enumerate(
        (
            "HEARTBEAT",
            "JOIN",
            "DISCONNECT",
            "MOVE",
            "CHAT",
            "PLAYER_UPDATE",
            "SCORE_UPDATE",
            "GAME_START",
            "GAME_END",
            "PING",
            "PONG",
            "AUTH_REQUEST",
            "AUTH_RESPONSE",
            "PACKET_MAX",
        ),
        start=1,
    )
)
class PacketClient:
    """TCP client for a simple length-prefixed packet protocol.

    Wire format (as written by ``send_packet`` and parsed by
    ``receive_loop``): a 4-byte big-endian unsigned payload length, followed
    by the payload. The first payload byte is the packet type; the remaining
    bytes are UTF-8 text.

    Packets read by the background receive thread are placed on
    ``recv_queue`` as ``(packet_type, text)`` tuples. Receive problems are
    placed on the same queue as ``("Error", message)``.
    """

    def __init__(self, server_ip, server_port):
        """Store the server address; no network activity happens here."""
        self.server_ip = server_ip
        self.server_port = server_port
        self.sock = None  # Set by connect() once the connection succeeds.
        self.running = False  # True while the receive loop should keep going.
        self.recv_queue = queue.Queue()  # For thread-safe packet passing

    def connect(self):
        """Connect to the server and start the daemon receive thread.

        Raises:
            RuntimeError: if the connection cannot be established; the
                original exception is chained as ``__cause__``.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.server_ip, self.server_port))
        except Exception as e:
            # Release the descriptor of a socket that never connected.
            sock.close()
            raise RuntimeError(
                f"Failed to connect to {self.server_ip}:{self.server_port}: {e}"
            ) from e
        self.sock = sock
        self.running = True
        threading.Thread(target=self.receive_loop, daemon=True).start()

    def receive_loop(self):
        """Read framed packets and enqueue them until the connection ends.

        The loop stops when ``running`` is cleared, when the socket is closed
        or fails (``_recv_all`` returns ``None``), or when an unexpected
        exception occurs (which is reported on ``recv_queue``). ``running``
        is always ``False`` when this method returns.
        """
        while self.running:
            try:
                header = self._recv_all(4)
                if header is None:
                    break
                (payload_len,) = struct.unpack("!I", header)
                payload = self._recv_all(payload_len)
                if payload is None:
                    break
                if not payload:
                    # A zero-length frame carries no type byte. Report it and
                    # keep reading: the stream is still correctly framed, so
                    # one malformed packet should not kill the receiver.
                    self.recv_queue.put(
                        ("Error", "Received empty packet (missing type byte)")
                    )
                    continue
                packet_type = payload[0]
                data = payload[1:].decode("utf-8", errors="replace")
                # Enqueue the packet for the GUI.
                self.recv_queue.put((packet_type, data))
            except Exception as e:
                self.recv_queue.put(("Error", str(e)))
                break
        self.running = False

    def _recv_all(self, n):
        """Read exactly ``n`` bytes from the socket.

        Returns:
            The ``n`` bytes read (``b""`` when ``n`` is 0), or ``None`` if the
            peer closed the connection or a socket error occurred before
            ``n`` bytes arrived.
        """
        # bytearray grows in place; repeated ``bytes +=`` would copy the
        # whole buffer on every chunk.
        buf = bytearray()
        while len(buf) < n:
            try:
                chunk = self.sock.recv(n - len(buf))
            except OSError:
                return None
            if not chunk:
                return None
            buf += chunk
        return bytes(buf)

    def send_packet(self, packet_type, data):
        """Frame and send one packet.

        Args:
            packet_type: integer type code; packed as a single unsigned byte,
                so it must be in the range 0-255.
            data: text payload, sent UTF-8 encoded.

        Raises:
            struct.error: if ``packet_type`` does not fit in one byte.
            RuntimeError: if the client is not connected or the send fails.
        """
        data_bytes = data.encode("utf-8")
        payload = struct.pack("!B", packet_type) + data_bytes
        packet = struct.pack("!I", len(payload)) + payload
        if self.sock is None:
            raise RuntimeError("Failed to send packet: not connected")
        try:
            self.sock.sendall(packet)
        except Exception as e:
            raise RuntimeError(f"Failed to send packet: {e}") from e

    def close(self):
        """Stop the receive loop and close the socket. Safe to call twice."""
        self.running = False
        if self.sock is not None:
            try:
                # shutdown() wakes a receive thread blocked in recv();
                # close() alone does not reliably interrupt it.
                self.sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # Never connected, or already shut down/closed.
            self.sock.close()
# The GUI application.
class PacketClientGUI:
    """Tkinter front end for a PacketClient.

    Shows a read-only log of packets taken from ``client.recv_queue`` and
    offers two entry fields (packet type, data) plus a button that sends the
    packet through ``client.send_packet``.
    """

    # Delay, in milliseconds, between polls of the client's receive queue.
    _POLL_MS = 100

    def __init__(self, master, client: PacketClient):
        """Build the widgets inside ``master`` and schedule queue polling."""
        self.master = master
        self.client = client
        master.title("Packet Client GUI")

        # Row 0: scrolling, read-only log of received/sent packets.
        log = scrolledtext.ScrolledText(
            master, width=80, height=20, state="disabled"
        )
        log.grid(row=0, column=0, columnspan=4, padx=10, pady=10)
        self.received_text = log

        label_pad = (10, 2)

        # Row 1, left half: packet type, pre-filled with 4 (MOVE).
        type_label = tk.Label(master, text="Packet Type:")
        type_label.grid(row=1, column=0, sticky="e", padx=label_pad)
        type_field = tk.Entry(master, width=5)
        type_field.grid(row=1, column=1, sticky="w")
        type_field.insert(0, "4")
        self.type_entry = type_field

        # Row 1, right half: free-form packet data.
        data_label = tk.Label(master, text="Data:")
        data_label.grid(row=1, column=2, sticky="e", padx=label_pad)
        data_field = tk.Entry(master, width=40)
        data_field.grid(row=1, column=3, sticky="w", padx=(0, 10))
        self.data_entry = data_field

        # Row 2: send button spanning the full width.
        button = tk.Button(master, text="Send Packet", command=self.send_packet)
        button.grid(row=2, column=0, columnspan=4, pady=(5, 10))
        self.send_button = button

        # Kick off the periodic drain of the receive queue.
        self.master.after(self._POLL_MS, self.process_recv_queue)

    def process_recv_queue(self):
        """Log every queued packet, then reschedule this method."""
        pending = self.client.recv_queue
        while not pending.empty():
            kind, body = pending.get()
            # Integer kinds are packet codes; anything else (e.g. "Error")
            # is already a display label.
            label = (
                PACKET_NAMES.get(kind, f"Unknown ({kind})")
                if isinstance(kind, int)
                else kind
            )
            self.append_text(f"{label} > {body}\n")
        self.master.after(self._POLL_MS, self.process_recv_queue)

    def append_text(self, text):
        """Append ``text`` to the log and scroll to the end."""
        log = self.received_text
        # The widget is kept disabled so the user cannot type into it; it has
        # to be enabled briefly for the insert to take effect.
        log.config(state="normal")
        log.insert(tk.END, text)
        log.see(tk.END)
        log.config(state="disabled")

    def send_packet(self):
        """Send the packet described by the entry fields and log it.

        Invalid packet types and send failures are reported in an error
        dialog instead of being raised.
        """
        raw_type = self.type_entry.get()
        try:
            type_code = int(raw_type)
        except ValueError:
            messagebox.showerror("Error", "Packet type must be an integer")
            return

        body = self.data_entry.get()
        try:
            self.client.send_packet(type_code, body)
            # Show the packet sent, using the name if available.
            label = PACKET_NAMES.get(type_code, f"Unknown ({type_code})")
            self.append_text(f"Sent Packet | Type: {label} | Data: {body}\n")
        except Exception as err:
            messagebox.showerror("Error", str(err))
def main():
    """Command-line entry point: connect to the server and run the GUI.

    Reads the server address from ``sys.argv[1]`` and the port from
    ``sys.argv[2]``. Exits with status 1 when arguments are missing, the
    port is not an integer, or the connection fails. The client is closed
    when the GUI exits, including when the GUI fails to start.
    """
    usage = "Usage: python packet_client_gui.py <server_ip> <server_port>"
    if len(sys.argv) < 3:
        print(usage)
        sys.exit(1)

    server_ip = sys.argv[1]
    try:
        server_port = int(sys.argv[2])
    except ValueError:
        # Report a bad port like any other usage error, not as a traceback.
        print(f"Invalid server port: {sys.argv[2]!r} (must be an integer)")
        print(usage)
        sys.exit(1)

    client = PacketClient(server_ip, server_port)
    try:
        client.connect()
    except Exception as e:
        print(e)
        sys.exit(1)

    # Window creation is inside the try so the socket is also released when
    # Tk cannot start (for example, when no display is available).
    try:
        root = tk.Tk()
        app = PacketClientGUI(root, client)
        root.mainloop()
    finally:
        client.close()
# Run the client only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()