524 lines
19 KiB
Python
524 lines
19 KiB
Python
import socket
|
|
import threading
|
|
import pickle
|
|
import time
|
|
import uuid
|
|
import math
|
|
import json
|
|
import pygame # For the dashboard
|
|
|
|
# --- Configuration ---
|
|
# Address and port the listening socket is bound to.
HOST = '127.0.0.1'
PORT = 65432
TICKRATE = 128  # ticks per second
MAX_USERS = 100  # advertised to clients in server_info / state_update payloads

# Racing physics constants.
ACCELERATION = 300.0  # pixels per second^2
BRAKE_DECELERATION = 400.0  # pixels per second^2
FRICTION = 200.0  # pixels per second^2
TURN_SPEED = math.radians(240)  # radians per second
MAX_SPEED = 400.0  # pixels per second
MIN_SPEED = -200.0  # pixels per second (negative speed drives backwards)

# Collision / rendering radius.
PLAYER_RADIUS = 10

# --- Sensor Configuration for Bots ---
BOT_SENSOR_RAY_COUNT = 90
# Total angular spread of the sensor fan, centred on the bot's heading.
BOT_SENSOR_FOV = math.radians(180)  # Field of view of 180 degrees
SENSOR_MAX_DISTANCE = 150  # Maximum sensor range
|
|
|
|
# --- Load Map from JSON ---
|
|
# Read the track layout once at import time. Any failure falls back to an
# empty map so the server can still start.
try:
    with open('map.json', 'r', encoding='utf-8') as f:
        map_data = json.load(f)
    # Valid JSON is not necessarily an object; the .get() calls below need one.
    if not isinstance(map_data, dict):
        raise ValueError(
            f"top-level JSON value must be an object, got {type(map_data).__name__}"
        )
    print("[INFO] Loaded map.json successfully.")
except (OSError, ValueError) as e:
    # OSError: missing/unreadable file. ValueError: malformed JSON
    # (json.JSONDecodeError), undecodable bytes, or the shape check above.
    print(f"[ERROR] Could not load map.json: {e}")
    map_data = {
        "obstacle": [],
        "road": [],
        "checkpoint": [],
        "finish_line": [],
    }

# `or []` also covers a section that is present but null in the file.
obstacles = map_data.get("obstacle") or []
roads = map_data.get("road") or []
checkpoints = map_data.get("checkpoint") or []
finish_lines = map_data.get("finish_line") or []
|
|
|
|
# --- Collision Helpers ---
|
|
def circle_rect_collision(cx, cy, radius, rect):
    """Return True when a circle overlaps an axis-aligned rectangle.

    Args:
        cx, cy: Circle centre.
        radius: Circle radius.
        rect: Mapping with "x", "y", "width" and "height" keys.

    Returns:
        True if the rectangle point nearest to the centre lies strictly
        inside the circle; a circle that merely touches does not count.
    """
    left, top = rect["x"], rect["y"]
    right = left + rect["width"]
    bottom = top + rect["height"]

    # Clamping the centre into the rectangle yields its nearest point.
    nearest_x = max(left, min(cx, right))
    nearest_y = max(top, min(cy, bottom))

    gap_x = cx - nearest_x
    gap_y = cy - nearest_y
    return (gap_x * gap_x + gap_y * gap_y) < (radius * radius)
|
|
|
|
def rect_collision(cx, cy, rect):
    """Return True when the point (cx, cy) lies inside rect, edges included.

    rect is a mapping with "x", "y", "width" and "height" keys.
    """
    left = rect["x"]
    if not left <= cx <= left + rect["width"]:
        return False
    top = rect["y"]
    return top <= cy <= top + rect["height"]
|
|
|
|
# --- Ray Casting for Sensors ---
|
|
def ray_cast(x, y, angle, max_distance, step=5, rects=None):
    """March along a ray and report where it first enters a rectangle.

    The ray is sampled at distances 0, step, 2*step, ... below
    int(max_distance); the end point at max_distance itself is not sampled.

    Args:
        x, y: Ray origin.
        angle: Ray direction in radians.
        max_distance: Sensor range; also the value returned on a miss.
        step: Positive integer sampling interval (default 5).
        rects: Rectangles to test, each a mapping with "x", "y", "width"
            and "height". Defaults to the module-level obstacle list.

    Returns:
        The sampled distance of the first point inside any rectangle
        (edges included), or max_distance when nothing is hit.

    Raises:
        ValueError: If step is not positive.
    """
    if step <= 0:
        raise ValueError("step must be a positive integer")
    if rects is None:
        rects = obstacles

    # The direction is constant along the ray, so compute it once.
    dir_x = math.cos(angle)
    dir_y = math.sin(angle)

    for d in range(0, int(max_distance), step):
        test_x = x + d * dir_x
        test_y = y + d * dir_y
        for obs in rects:
            if (obs["x"] <= test_x <= obs["x"] + obs["width"] and
                    obs["y"] <= test_y <= obs["y"] + obs["height"]):
                return d
    return max_distance
|
|
|
|
# --- Network Event Class & Helpers ---
|
|
class Event:
    """A named message with an arbitrary payload.

    send_event pickles these onto the wire; the client handler reads
    `.name` to pick a branch and `.payload` for the arguments.
    """

    def __init__(self, name, payload):
        self.name, self.payload = name, payload
|
|
|
|
def recvall(sock, n):
    """Read exactly n bytes from a socket.

    Args:
        sock: Object exposing recv(bufsize), normally a connected socket.
        n: Number of bytes required.

    Returns:
        A bytes object of length n, or None if the peer closed the
        connection or a socket error occurred before n bytes arrived.
        For n == 0 this returns b'' without touching the socket.
    """
    buf = bytearray()  # appending to bytes would copy the buffer on every chunk
    while len(buf) < n:
        try:
            packet = sock.recv(n - len(buf))
        except OSError:
            # Timeouts, resets and closed descriptors all leave the stream
            # unusable; report them the same way as an orderly shutdown.
            return None
        if not packet:
            return None
        buf += packet
    return bytes(buf)
|
|
|
|
def send_event(sock, event):
    """Pickle an event and send it as one length-prefixed frame.

    Wire format: a 4-byte big-endian length followed by the pickle bytes.

    Args:
        sock: Connected socket to write to.
        event: Any picklable object, normally an Event.

    Returns:
        True if the frame was handed to the socket, False if serialising or
        sending failed. Failures are logged and never raised, so callers
        that ignore the result keep working as before.
    """
    try:
        data = pickle.dumps(event)
        frame = len(data).to_bytes(4, byteorder='big') + data
        sock.sendall(frame)
    except Exception as e:
        # Deliberately broad: sending is best-effort, and neither a pickling
        # error nor a dead socket may take down the calling thread.
        print(f"[ERROR] send_event: {e}")
        return False
    return True
|
|
|
|
def recv_event(sock, max_size=16 * 1024 * 1024):
    """Receive one length-prefixed, pickled message from a socket.

    Reads a 4-byte big-endian length, then that many bytes, and unpickles
    them.

    Args:
        sock: Connected socket to read from.
        max_size: Largest payload accepted, in bytes. Pass None to disable
            the check.

    Returns:
        The unpickled object, or None if the connection closed or failed
        mid-message, or if the declared length exceeds max_size.

    Raises:
        Whatever pickle.loads raises for a corrupt or empty payload.
    """
    raw_len = recvall(sock, 4)
    if raw_len is None:
        return None
    msg_len = int.from_bytes(raw_len, byteorder='big')

    # The length comes from the peer; without a cap a single header can ask
    # this thread to buffer up to 4 GiB.
    if max_size is not None and msg_len > max_size:
        print(f"[ERROR] recv_event: refusing {msg_len}-byte message (limit {max_size})")
        return None

    data = recvall(sock, msg_len)
    if data is None:
        return None

    # SECURITY: pickle.loads can execute arbitrary code chosen by the sender.
    # This is only acceptable while every peer is trusted; exposing the
    # server beyond that needs a data-only format such as JSON.
    return pickle.loads(data)
|
|
|
|
# --- Global State ---
|
|
# Shared mutable state. Handler threads, the game loop and the dashboard
# take `lock` around most reads and writes of these containers.
event_subscriptions = dict()  # event name -> list of client sockets
client_ids = dict()  # conn -> client_id
client_states = dict()  # conn -> state dict (for human players)
bot_states = dict()  # bot_id -> state dict (with "bot": True)
lock = threading.Lock()
reset_in_progress = False  # Global flag to avoid repeated resets
|
|
|
|
# --- Bot AI (Smart with Configurable Sensors) ---
|
|
def _normalize_angle(a):
    """Wrap an angle in radians into the range [-pi, pi]."""
    # Headings are never wrapped by the physics step, so they grow without
    # bound as a car keeps turning; atan2(sin, cos) stays constant-time
    # however many full turns have accumulated.
    return math.atan2(math.sin(a), math.cos(a))


def _bot_target(state):
    """Return the (x, y) point the bot aims for, before road blending."""
    idx = state.get("checkpoint_index", 0)
    if checkpoints and idx < len(checkpoints):
        goal = checkpoints[idx]
    elif finish_lines:
        goal = finish_lines[0]  # assume one finish line
    else:
        # Nothing to aim for: head 100 px straight ahead.
        return (state["x"] + math.cos(state["angle"]) * 100,
                state["y"] + math.sin(state["angle"]) * 100)
    return (goal["x"] + goal["width"] / 2, goal["y"] + goal["height"] / 2)


def _pull_toward_road(state, target_x, target_y):
    """Blend the target with the nearest road centre when the bot is off-road."""
    if not roads:
        return target_x, target_y
    px, py = state["x"], state["y"]
    if any(rect_collision(px, py, road) for road in roads):
        return target_x, target_y

    centers = [(road["x"] + road["width"] / 2, road["y"] + road["height"] / 2)
               for road in roads]
    near_x, near_y = min(centers, key=lambda c: math.hypot(c[0] - px, c[1] - py))
    # Blend target: 70% checkpoint/finish and 30% road center.
    return (0.7 * target_x + 0.3 * near_x, 0.7 * target_y + 0.3 * near_y)


def _read_sensors(state):
    """Cast the sensor fan and return a list of (offset, distance) pairs."""
    if BOT_SENSOR_RAY_COUNT > 1:
        spacing = BOT_SENSOR_FOV / (BOT_SENSOR_RAY_COUNT - 1)
        offsets = [-BOT_SENSOR_FOV / 2 + i * spacing
                   for i in range(BOT_SENSOR_RAY_COUNT)]
    else:
        # A single ray points straight ahead; a count of zero yields no rays.
        offsets = [0] * BOT_SENSOR_RAY_COUNT
    return [
        (offset, ray_cast(state["x"], state["y"], state["angle"] + offset,
                          SENSOR_MAX_DISTANCE))
        for offset in offsets
    ]


def _avoidance_vector(heading, sensor_readings):
    """Sum the repulsion from every ray that sees an obstacle nearby."""
    # Activate avoidance if obstacle is closer than 60% of max.
    threshold = SENSOR_MAX_DISTANCE * 0.6
    push_x = 0
    push_y = 0
    for offset, distance in sensor_readings:
        if distance < threshold:
            # 0 at the threshold, rising to 1 when touching the obstacle.
            repulsion = (threshold - distance) / threshold
            ray_angle = heading + offset
            push_x -= repulsion * math.cos(ray_angle)
            push_y -= repulsion * math.sin(ray_angle)
    return push_x, push_y


def update_bot(state, dt):
    """Choose this tick's key presses for a bot.

    Steers toward the next checkpoint (or the finish line once all
    checkpoints are passed), nudged toward the nearest road when off-road
    and pushed away from obstacles seen by the sensor fan.

    Args:
        state: The bot's state dict. Reads "x", "y", "angle" and
            "checkpoint_index"; writes "target", "sensors" and "keys".
        dt: Tick duration in seconds. Unused here; kept so the signature
            parallels update_physics.
    """
    target_x, target_y = _pull_toward_road(state, *_bot_target(state))
    state["target"] = (target_x, target_y)  # For dashboard drawing

    # Unit vector toward the target.
    target_angle = math.atan2(target_y - state["y"], target_x - state["x"])

    sensor_readings = _read_sensors(state)
    state["sensors"] = sensor_readings

    # The avoidance vector already carries its own weight as its length, so
    # it is added directly to the unit target vector.
    avoid_x, avoid_y = _avoidance_vector(state["angle"], sensor_readings)
    combined_x = math.cos(target_angle) + avoid_x
    combined_y = math.sin(target_angle) + avoid_y
    desired_angle = math.atan2(combined_y, combined_x)

    angle_diff = _normalize_angle(desired_angle - state["angle"])
    desired_keys = {"up"}  # Always accelerate.
    # 0.1 rad dead zone keeps the bot from oscillating around its heading.
    if angle_diff > 0.1:
        desired_keys.add("right")
    elif angle_diff < -0.1:
        desired_keys.add("left")
    state["keys"] = desired_keys
|
|
|
|
# --- Physics Update for All Entities ---
|
|
def update_physics(state, dt):
    """Advance one entity by one tick: speed, heading, position, race progress.

    Args:
        state: Entity state dict. Reads and writes "speed", "angle", "x",
            "y", "checkpoint_index" and "finished"; reads "keys" and
            "username".
        dt: Tick duration in seconds.
    """
    keys = state['keys']
    speed = state['speed']

    # Throttle wins over brake; with neither held, friction pulls the speed
    # toward zero without letting it cross over.
    if 'up' in keys:
        speed += ACCELERATION * dt
    elif 'down' in keys:
        speed -= BRAKE_DECELERATION * dt
    elif speed > 0:
        speed -= FRICTION * dt
        if speed < 0:
            speed = 0
    elif speed < 0:
        speed += FRICTION * dt
        if speed > 0:
            speed = 0

    # Clamp into [MIN_SPEED, MAX_SPEED].
    speed = min(max(speed, MIN_SPEED), MAX_SPEED)
    state['speed'] = speed

    # Steering is applied whether or not the car is moving.
    if 'left' in keys:
        state['angle'] -= TURN_SPEED * dt
    if 'right' in keys:
        state['angle'] += TURN_SPEED * dt

    heading = state['angle']
    next_x = state['x'] + speed * math.cos(heading) * dt
    next_y = state['y'] + speed * math.sin(heading) * dt

    # A move that would overlap any obstacle is cancelled and stops the car.
    blocked = any(
        circle_rect_collision(next_x, next_y, PLAYER_RADIUS, obs)
        for obs in obstacles
    )
    if blocked:
        state['speed'] = 0
    else:
        state['x'], state['y'] = next_x, next_y

    # --- Checkpoints and Finish Line ---
    state.setdefault("checkpoint_index", 0)
    state.setdefault("finished", False)

    # Checkpoints must be taken in order; only the next one is tested.
    if checkpoints and not state["finished"]:
        idx = state["checkpoint_index"]
        if idx < len(checkpoints) and rect_collision(state["x"], state["y"], checkpoints[idx]):
            state["checkpoint_index"] = idx + 1
            print(f"[RACE] {state['username']} passed checkpoint {idx+1}")

    # Crossing a finish line only counts once every checkpoint is passed
    # (or when the map defines no checkpoints).
    for fl in finish_lines:
        if state["finished"] or not rect_collision(state["x"], state["y"], fl):
            continue
        if not checkpoints or state["checkpoint_index"] >= len(checkpoints):
            state["finished"] = True
            print(f"[RACE] {state['username']} has finished the race!")
|
|
|
|
# --- Global Reset Function ---
|
|
def reset_race():
    """Put every human player and bot back at (100, 100) with a clean slate.

    Position, heading, speed, held keys, checkpoint progress and the
    finished flag are all cleared while holding the global lock.
    """
    with lock:
        for group in (client_states, bot_states):
            for racer in group.values():
                racer.update(
                    x=100,
                    y=100,
                    angle=0,
                    speed=0,
                    keys=set(),  # a fresh set per racer, never shared
                    checkpoint_index=0,
                    finished=False,
                )
        print("[RACE] Race finished. Resetting all players.")
|
|
|
|
def clear_reset_flag():
    """Set the module-level reset_in_progress flag back to False under the lock."""
    global reset_in_progress
    lock.acquire()
    try:
        reset_in_progress = False
    finally:
        lock.release()
|
|
|
|
# --- Network Client Handler ---
|
|
def handle_client(conn, addr):
    """Serve one connected client until it disconnects.

    Registers a fresh player state, sends the client its id and the server
    info (including the map), then processes incoming events:
    "register_event", "set_username", "keydown", "keyup" and "ping".
    Events with any other name are ignored. On exit the client is removed
    from all shared state and the socket is closed.

    Args:
        conn: Connected client socket.
        addr: Peer address, used only for logging.
    """
    client_id = str(uuid.uuid4())
    with lock:
        client_ids[conn] = client_id
        client_states[conn] = {
            "client_id": client_id,
            "username": "Guest",
            "x": 100,
            "y": 100,
            "angle": 0,
            "speed": 0,
            "keys": set(),
            "finished": False,
            "checkpoint_index": 0,
        }
        # Counted while still holding the lock so the figure cannot be
        # skewed by another thread connecting or disconnecting.
        total_users = len(client_states) + len(bot_states)
    print(f"[NEW CONNECTION] {addr} connected as {client_id}")

    try:
        send_event(conn, Event("self_id", {"client_id": client_id}))
        server_info = {
            "tickrate": TICKRATE,
            "total_users": total_users,
            "max_users": MAX_USERS,
            "server_ip": HOST,
            "server_port": PORT,
            "server_name": "Advanced Top Down Racing Server",
            "map": map_data,
        }
        send_event(conn, Event("server_info", server_info))
    except Exception as e:
        print(f"[ERROR] sending self_id/server_info: {e}")

    broadcast_event("client_connect", {"client_id": client_id})

    try:
        while True:
            event = recv_event(conn)
            if event is None:
                break
            if event.name == "register_event":
                event_name = event.payload.get('event')
                with lock:
                    subscribers = event_subscriptions.setdefault(event_name, [])
                    # Registering twice must not add the socket twice: the
                    # client would get every broadcast in duplicate.
                    if conn not in subscribers:
                        subscribers.append(conn)
                print(f"[REGISTER] {client_id} subscribed to '{event_name}'")
            elif event.name == "set_username":
                username = event.payload.get('username', 'Guest')
                with lock:
                    if conn in client_states:
                        client_states[conn]['username'] = username
                print(f"[USERNAME] {client_id} set username to '{username}'")
            elif event.name == "keydown":
                key = event.payload.get('key')
                with lock:
                    if conn in client_states:
                        client_states[conn]['keys'].add(key)
                print(f"[KEYDOWN] {client_id} key '{key}' pressed")
            elif event.name == "keyup":
                key = event.payload.get('key')
                with lock:
                    if conn in client_states:
                        client_states[conn]['keys'].discard(key)
                print(f"[KEYUP] {client_id} key '{key}' released")
            elif event.name == "ping":
                timestamp = event.payload.get('timestamp')
                send_event(conn, Event("pong", {"timestamp": timestamp}))
    except Exception as e:
        # Malformed events (wrong type, missing attributes, unhashable keys)
        # end up here and terminate this client's session.
        print(f"[ERROR] Exception with {client_ids.get(conn, 'unknown')}: {e}")
    finally:
        with lock:
            for subs in event_subscriptions.values():
                # Drop every occurrence in place, so a closed socket can
                # never stay subscribed.
                subs[:] = [s for s in subs if s is not conn]
            client_ids.pop(conn, None)
            client_states.pop(conn, None)
        print(f"[DISCONNECT] {client_id} disconnected.")
        # Must run after the lock is released: broadcast_event takes the
        # same non-reentrant lock.
        broadcast_event("client_disconnect", {"client_id": client_id})
        conn.close()
|
|
|
|
def broadcast_event(event_name, payload):
    """Send an event to every socket subscribed to event_name.

    The event is pickled once and sent as a 4-byte big-endian length
    followed by the pickle bytes, the same framing send_event produces.
    A receiver whose socket fails is dropped from the subscription list.

    The global lock is held for the whole broadcast, so callers must not
    already hold it.

    Args:
        event_name: Event name; also the subscription key.
        payload: Picklable payload carried by the event.
    """
    with lock:
        receivers = event_subscriptions.get(event_name, [])
        if not receivers:
            return

        # Serialise once rather than once per receiver.
        try:
            data = pickle.dumps(Event(event_name, payload))
            frame = len(data).to_bytes(4, byteorder='big') + data
        except Exception as e:
            # Deliberately broad: an unpicklable payload must not kill the
            # calling thread, which may be the game loop.
            print(f"[ERROR] broadcast_event: could not serialise '{event_name}': {e}")
            return

        # Write to the sockets directly: send_event logs and swallows
        # failures, which would leave dead receivers subscribed forever.
        for client in receivers[:]:
            try:
                client.sendall(frame)
            except OSError as e:
                print(f"[ERROR] Failed to send to client, removing: {e}")
                receivers.remove(client)
|
|
|
|
# --- Bot Spawning ---
|
|
def spawn_bot():
    """Create one bot at (150, 150) and register it in bot_states.

    The bot gets a random UUID as its id and a username built from the
    first four characters of that id.
    """
    bot_id = str(uuid.uuid4())
    bot_state = dict(
        client_id=bot_id,
        username="Bot_" + bot_id[:4],
        x=150,
        y=150,
        angle=0,
        speed=0,
        keys=set(),
        finished=False,
        checkpoint_index=0,
        bot=True,  # marks the state for the bot AI and orange dashboard colour
    )
    with lock:
        bot_states[bot_id] = bot_state
    print(f"[BOT] Spawned bot {bot_state['username']}")
|
|
|
|
# --- Game Loop ---
|
|
def _player_snapshot(state):
    """Return the per-player fields that every state_update entry carries."""
    return {
        "username": state['username'],
        "x": state['x'],
        "y": state['y'],
        "angle": state['angle'],
        "speed": state['speed'],
        "finished": state.get("finished", False),
    }


def game_loop():
    """Run the simulation forever at TICKRATE ticks per second.

    Each tick updates bot AI and physics for every entity under the global
    lock, then broadcasts a "state_update" event with all player states.
    Nothing in this loop calls reset_race; finished players keep being
    simulated.
    """
    dt = 1 / TICKRATE
    next_tick = time.monotonic() + dt
    while True:
        # Sleep until the scheduled tick rather than for a fixed dt, so the
        # time spent simulating and sending does not slow the game clock.
        delay = next_tick - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        else:
            # Fell behind: resynchronise instead of bursting to catch up.
            next_tick = time.monotonic()
        next_tick += dt

        with lock:
            all_states = list(client_states.values()) + list(bot_states.values())
            for state in all_states:
                if state.get("bot", False):
                    update_bot(state, dt)
                update_physics(state, dt)

            players_payload = {}
            for state in client_states.values():
                players_payload[state['client_id']] = _player_snapshot(state)
            for state in bot_states.values():
                entry = _player_snapshot(state)
                # Bots also expose their AI debug data.
                entry["sensors"] = state.get("sensors", [])
                entry["target"] = state.get("target", None)
                players_payload[state['client_id']] = entry

            payload = {
                "players": players_payload,
                "total_users": len(client_states) + len(bot_states),
                "max_users": MAX_USERS,
            }

        # Outside the lock: broadcast_event takes the same non-reentrant lock.
        broadcast_event("state_update", payload)
|
|
|
|
# --- Server Dashboard (Visualization) ---
|
|
def server_dashboard():
    """Show a live pygame window of the track and every racer.

    Redraws at up to 30 frames per second until the window is closed, then
    shuts pygame down and returns. Bots are drawn with their sensor rays
    and a line to their current target.
    """
    pygame.init()
    window_size = (1200, 900)
    screen = pygame.display.set_mode(window_size)
    pygame.display.set_caption("Server Dashboard: Full Race Course View")
    clock = pygame.time.Clock()
    font = pygame.font.SysFont(None, 20)

    white = (255, 255, 255)
    # Static map layers in draw order: (rectangles, colour, outline width).
    # A width of 0 means filled.
    map_layers = (
        (roads, (80, 80, 80), 0),
        (obstacles, (150, 50, 50), 0),
        (checkpoints, (200, 200, 0), 2),
        (finish_lines, (0, 0, 255), 2),
    )

    while True:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                return

        screen.fill((20, 20, 20))

        for shapes, layer_color, outline in map_layers:
            for shape in shapes:
                rect = pygame.Rect(shape["x"], shape["y"], shape["width"], shape["height"])
                pygame.draw.rect(screen, layer_color, rect, outline)

        # Draw Players.
        with lock:
            for state in list(client_states.values()) + list(bot_states.values()):
                is_bot = state.get("bot", False)
                x, y = int(state["x"]), int(state["y"])

                # Humans are green, bots orange.
                color = (255, 165, 0) if is_bot else (0, 255, 0)
                pygame.draw.circle(screen, color, (x, y), PLAYER_RADIUS)

                label = font.render(state["username"], True, white)
                screen.blit(label, (x - label.get_width() // 2, y - PLAYER_RADIUS - 15))

                if not is_bot:
                    continue

                # For bots, draw sensor rays and target line.
                for offset, distance in state.get("sensors", []):
                    ray_angle = state["angle"] + offset
                    end_x = int(x + distance * math.cos(ray_angle))
                    end_y = int(y + distance * math.sin(ray_angle))
                    # Red when the ray hit something, green at full range.
                    hit = distance < SENSOR_MAX_DISTANCE
                    ray_color = (255, 0, 0) if hit else (0, 255, 0)
                    pygame.draw.line(screen, ray_color, (x, y), (end_x, end_y), 2)

                target = state.get("target")
                if target:
                    tx, ty = target
                    pygame.draw.line(screen, white, (x, y), (int(tx), int(ty)), 2)

        info_text = font.render("Server Dashboard - Full Race Course View", True, white)
        screen.blit(info_text, (10, 10))
        pygame.display.flip()
        clock.tick(30)
|
|
|
|
# --- Server Startup ---
|
|
def start_server():
    """Start the game loop, the dashboard and one bot, then accept clients.

    The game loop and the dashboard run in daemon threads. Each accepted
    connection gets its own daemon thread running handle_client. This
    function blocks in the accept loop and does not return normally.

    NOTE(review): the pygame window is driven from a background thread
    here; some platforms may require it on the main thread — confirm.
    """
    print("[STARTING] Server is starting...")
    threading.Thread(target=game_loop, daemon=True).start()
    threading.Thread(target=server_dashboard, daemon=True).start()
    spawn_bot()  # Spawn one bot (add more if desired)

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Without this, restarting shortly after a shutdown can fail with
        # "address already in use" while old connections sit in TIME_WAIT.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((HOST, PORT))
        s.listen()
        print(f"[LISTENING] Server is listening on {HOST}:{PORT}")
        while True:
            conn, addr = s.accept()
            threading.Thread(target=handle_client, args=(conn, addr), daemon=True).start()


if __name__ == "__main__":
    start_server()
|