From 45726367eee475a7717d7dcad78895de348f0b97 Mon Sep 17 00:00:00 2001 From: Elizabeth Hunt Date: Thu, 5 Sep 2024 00:15:14 -0700 Subject: [PATCH] checkpoint; working position updates --- kennel/engine/systems/network.py | 81 +++++++++++++++++++------------- kennel/kennel.py | 28 +++++------ kennel/main.py | 33 +++++++------ static/src/main.ts | 6 ++- static/src/mouse_controller.ts | 15 ++---- static/src/network.ts | 6 ++- 6 files changed, 91 insertions(+), 78 deletions(-) diff --git a/kennel/engine/systems/network.py b/kennel/engine/systems/network.py index 1c1d71c..d1542e9 100644 --- a/kennel/engine/systems/network.py +++ b/kennel/engine/systems/network.py @@ -1,7 +1,7 @@ import asyncio from abc import abstractmethod from enum import Enum -from typing import List, Optional +from typing import List, Optional, Dict from kennel.app import logger from kennel.engine.entities.entity import Entity, EntityManager @@ -55,7 +55,9 @@ class Event: class InitialStateEvent(Event): - def __init__(self, world_width: int, world_height: int, entities: List[Entity]): + def __init__( + self, world_width: int, world_height: int, entities: Dict[str, Entity] + ): self.world_width = world_width self.world_height = world_height self.entities = entities @@ -63,7 +65,7 @@ class InitialStateEvent(Event): EventType.INITIAL_STATE, { "world": {"width": world_width, "height": world_height}, - "entities": [entity.to_dict() for entity in entities], + "entities": entities, }, ) @@ -98,51 +100,66 @@ class EntityDeathEvent(Event): class Publishable: @abstractmethod - async def publish(self, event: Event): + async def publish(self, event: List[Event]): pass -class EventProcessor: +class UpstreamEventProcessor: @abstractmethod def accept( - self, entity_manager: EntityManager, event: Event | tuple[Event, str] - ) -> None: + self, entity_manager: EntityManager, client_event: tuple[Event, str] + ) -> Optional[Event]: pass class NetworkSystem(System): - def __init__(self, event_processor: EventProcessor): - super().__init__(SystemType.NETWORK) - self.event_processor = event_processor + event_processor: UpstreamEventProcessor + sever_events: List[Event] + client_upstream_events: List[tuple[Event, str]] + clients: Dict[str, tuple[Publishable, List[Event]]] + + def __init__(self, upstream_event_processor: UpstreamEventProcessor): + super().__init__(SystemType.NETWORK) + self.upstream_event_processor = upstream_event_processor + + self.server_events = [] # events to propogate to the entire network + self.client_upstream_events = [] # events that come from the clients - self.events = [] - self.client_events = [] self.clients = {} async def update(self, entity_manager: EntityManager, delta_time: float) -> None: - if len(self.events) + len(self.client_events) == 0: + for event in self.client_upstream_events: + produced_event = self.upstream_event_processor.accept(entity_manager, event) + if produced_event is not None: + self.server_events.append(produced_event) + promises = [ + client.publish(self.server_events + events) + for client, events in self.clients.values() + if len(self.server_events + events) > 0 + ] + await asyncio.gather(*promises) + + self.server_events = [] + self.client_upstream_events = [] + for client_id in self.clients.keys(): + (client, _) = self.clients[client_id] + self.clients[client_id] = (client, []) + + def server_global_event(self, event: Event) -> None: + self.server_events.append(event) + + def client_upstream_event(self, client_id: str, event: Event) -> None: + self.client_upstream_events.append((event, client_id)) + + def client_downstream_event(self, client_id: str, event: Event) -> None: + if client_id not in self.clients: + logger.info(f"client {client_id} not found") return - for event in self.events + self.client_events: - self.event_processor.accept(entity_manager, event) - client_sendable = self.events + [event for event, _ in self.client_events] - await asyncio.gather( - *[ - client.publish(event) - for client in self.clients.values() - for event in client_sendable - ] - ) - self.events = [] - self.client_events = [] - - def client_event(self, client_id: str, event: Event) -> None: - self.client_events.append((event, client_id)) - - def add_event(self, event: Event) -> None: - self.events.append(event) + (client, events) = self.clients[client_id] + self.clients[client_id] = (client, events + [event]) def add_client(self, client_id: str, client: Publishable) -> None: - self.clients[client_id] = client + self.clients[client_id] = (client, []) def remove_client(self, client_id: str) -> None: del self.clients[client_id] diff --git a/kennel/kennel.py b/kennel/kennel.py index 1e23670..6852e4c 100644 --- a/kennel/kennel.py +++ b/kennel/kennel.py @@ -1,5 +1,5 @@ import uuid -from typing import List +from typing import List, Optional from kennel.config import config from kennel.engine.components.component import ComponentType @@ -9,7 +9,7 @@ from kennel.engine.game import Game from kennel.engine.systems.network import ( EntityPositionUpdateEvent, Event, - EventProcessor, + UpstreamEventProcessor, EventType, NetworkSystem, ) @@ -22,27 +22,22 @@ entity_manager = EntityManager() system_manager = SystemManager() -class KennelEventProcessor(EventProcessor): +class KennelEventProcessor(UpstreamEventProcessor): def accept( - self, entity_manager: EntityManager, event: Event | tuple[Event, str] - ) -> None: - if isinstance(event, tuple): - client_event, client_id = event - self._process_client_event(entity_manager, client_event, client_id) - return - - def _process_client_event( - self, entity_manager: EntityManager, event: Event, client_id: str - ) -> None: - if event.event_type == EventType.ENTITY_POSITION_UPDATE: - self._process_entity_position_update(entity_manager, event, client_id) + self, entity_manager: EntityManager, event: tuple[Event, str] + ) -> Optional[Event]: + client_event, client_id = event + if client_event.event_type == EventType.ENTITY_POSITION_UPDATE: + return self._process_entity_position_update( + entity_manager, client_event, client_id + ) def _process_entity_position_update( self, entity_manager: EntityManager, event: EntityPositionUpdateEvent, client_id: str, - ) -> None: + ) -> Optional[Event]: entity = entity_manager.get_entity(event.data["id"]) if entity is None: logger.error(f"Entity(id={event.data['id']}) does not exist") @@ -60,6 +55,7 @@ class KennelEventProcessor(EventProcessor): entity.add_component(position) entity_manager.add_entity(entity) + return event system_manager.add_system(WorldSystem(config.WORLD_WIDTH, config.WORLD_HEIGHT)) diff --git a/kennel/main.py b/kennel/main.py index c1106fe..0808eb5 100644 --- a/kennel/main.py +++ b/kennel/main.py @@ -1,6 +1,6 @@ import asyncio import uuid -from typing import Annotated, Optional +from typing import Annotated, Optional, List from fastapi import ( Cookie, @@ -82,8 +82,8 @@ class WebSocketClient(Publishable): def __init__(self, websocket: WebSocket): self.websocket = websocket - async def publish(self, event: Event): - await self.websocket.send_json([event.event_type, event.data]) + async def publish(self, events: List[Event]): + await self.websocket.send_json([event.to_dict() for event in events]) @app.websocket("/ws") @@ -95,27 +95,30 @@ async def websocket_endpoint( client = WebSocketClient(websocket) logger.info(f"Websocket connection established for session {session}") - initial_state = InitialStateEvent( - config.WORLD_WIDTH, config.WORLD_HEIGHT, kennel.entity_manager.to_dict() - ) - await client.publish(initial_state) - session_entities = create_session_controllable_entities(session) logger.info(f"Creating {len(session_entities)} entities for session {session}") try: network_system = system_manager.get_system(SystemType.NETWORK) if network_system is None: raise "Network system not found" + network_system.add_client(session, WebSocketClient(websocket)) + + network_system.client_downstream_event( + session, + InitialStateEvent( + config.WORLD_WIDTH, config.WORLD_HEIGHT, kennel.entity_manager.to_dict() + ), + ) for entity in session_entities: logger.info(f"Adding entity {entity.id} for session {session}") entity_manager.add_entity(entity) - network_system.add_event(EntityBornEvent(entity)) - set_controllable_event = SetControllableEvent(entity.id, session) - await client.publish(set_controllable_event) + network_system.server_global_event(EntityBornEvent(entity)) + network_system.client_downstream_event( + session, SetControllableEvent(entity.id, session) + ) - network_system.add_client(session, WebSocketClient(websocket)) while True: message = await websocket.receive_json() if not isinstance(message, list): @@ -125,20 +128,20 @@ async def websocket_endpoint( logger.info(f"Invalid events in: {message}"[:100]) continue for event in events: - network_system.client_event(session, event) + network_system.client_upstream_event(session, event) except WebSocketDisconnect: logger.info(f"Websocket connection closed by client: {session}") except Exception as e: logger.error("Exception occurred", exc_info=e) finally: - logger.info("Websocket connection closed") + network_system.remove_client(session) for entity in session_entities: logger.info(f"Removing entity {entity.id} for session {session}") entity_manager.remove_entity(entity.id) network_system.add_event(Event(EventType.ENTITY_DEATH, {"id": entity.id})) - network_system.remove_client(session) await websocket.close() + logger.info("Websocket connection closed") @app.get("/healthcheck") diff --git a/static/src/main.ts b/static/src/main.ts index 16de636..c3588de 100644 --- a/static/src/main.ts +++ b/static/src/main.ts @@ -65,9 +65,11 @@ class KennelClient { break; } } - - console.log(events, dt); + if (events.length > 0) { + console.log(events, dt); + } this.event_queue.clear(); + this.event_publisher.publish(); } private process_set_controllable_event(event: SetControllableEvent) { diff --git a/static/src/mouse_controller.ts b/static/src/mouse_controller.ts index c7ae304..bd8d51a 100644 --- a/static/src/mouse_controller.ts +++ b/static/src/mouse_controller.ts @@ -7,7 +7,6 @@ export class MouseController { constructor( private readonly publisher: (new_movement: Vec2) => void | Promise, private readonly debounce_ms = 200, - private readonly l2_norm_threshold = 40, ) {} public start() { @@ -29,20 +28,14 @@ export class MouseController { } public move(x: number, y: number) { - const new_movement = new Vec2(x, y); - if ( - typeof this.last_movement !== "undefined" && - new_movement.distance_to(this.last_movement) >= this.l2_norm_threshold - ) { - this.publish_movement(); - } - this.last_movement = new_movement; + this.last_movement = new Vec2(x, y); + this.publish_movement(); } private publish_movement() { if ( - typeof this.last_movement === "undefined" || - Date.now() - this.last_event_time < this.debounce_ms + Date.now() - this.last_event_time < this.debounce_ms || + typeof this.last_movement === "undefined" ) { return; } diff --git a/static/src/network.ts b/static/src/network.ts index f707148..fac96df 100644 --- a/static/src/network.ts +++ b/static/src/network.ts @@ -66,8 +66,7 @@ export class WebSocketEventQueue implements EventQueue { private listen_to(websocket: WebSocket) { websocket.onmessage = ({ data }) => { - const [event_type, event_data] = JSON.parse(data); - this.queue.push({ event_type, data: event_data } as Event); + this.queue = this.queue.concat(JSON.parse(data)); }; } } @@ -84,6 +83,9 @@ export class WebsocketEventPublisher implements EventPublisher { } public publish() { + if (this.queue.length === 0) { + return; + } this.websocket.send(JSON.stringify(this.queue)); this.queue = []; }