WIP: ECS / Network System #1

Draft
simponic wants to merge 13 commits from websockets into main
6 changed files with 91 additions and 78 deletions
Showing only changes of commit 45726367ee - Show all commits

View File

@ -1,7 +1,7 @@
import asyncio
from abc import abstractmethod
from enum import Enum
from typing import List, Optional
from typing import List, Optional, Dict
from kennel.app import logger
from kennel.engine.entities.entity import Entity, EntityManager
@ -55,7 +55,9 @@ class Event:
class InitialStateEvent(Event):
def __init__(self, world_width: int, world_height: int, entities: List[Entity]):
def __init__(
self, world_width: int, world_height: int, entities: Dict[str, Entity]
):
self.world_width = world_width
self.world_height = world_height
self.entities = entities
@ -63,7 +65,7 @@ class InitialStateEvent(Event):
EventType.INITIAL_STATE,
{
"world": {"width": world_width, "height": world_height},
"entities": [entity.to_dict() for entity in entities],
"entities": entities,
},
)
@ -98,51 +100,66 @@ class EntityDeathEvent(Event):
class Publishable:
@abstractmethod
async def publish(self, event: Event):
async def publish(self, event: List[Event]):
pass
class EventProcessor:
class UpstreamEventProcessor:
@abstractmethod
def accept(
self, entity_manager: EntityManager, event: Event | tuple[Event, str]
) -> None:
self, entity_manager: EntityManager, client_event: tuple[Event, str]
) -> Optional[Event]:
pass
class NetworkSystem(System):
def __init__(self, event_processor: EventProcessor):
super().__init__(SystemType.NETWORK)
self.event_processor = event_processor
event_processor: UpstreamEventProcessor
sever_events: List[Event]
client_upstream_events: List[tuple[Event, str]]
clients: Dict[str, tuple[Publishable, List[Event]]]
def __init__(self, upstream_event_processor: UpstreamEventProcessor):
super().__init__(SystemType.NETWORK)
self.upstream_event_processor = upstream_event_processor
self.server_events = [] # events to propogate to the entire network
self.client_upstream_events = [] # events that come from the clients
self.events = []
self.client_events = []
self.clients = {}
async def update(self, entity_manager: EntityManager, delta_time: float) -> None:
if len(self.events) + len(self.client_events) == 0:
return
for event in self.events + self.client_events:
self.event_processor.accept(entity_manager, event)
client_sendable = self.events + [event for event, _ in self.client_events]
await asyncio.gather(
*[
client.publish(event)
for client in self.clients.values()
for event in client_sendable
for event in self.client_upstream_events:
produced_event = self.upstream_event_processor.accept(entity_manager, event)
if produced_event is not None:
self.server_events.append(produced_event)
promises = [
client.publish(self.server_events + events)
for client, events in self.clients.values()
if len(self.server_events + events) > 0
]
)
self.events = []
self.client_events = []
await asyncio.gather(*promises)
def client_event(self, client_id: str, event: Event) -> None:
self.client_events.append((event, client_id))
self.server_events = []
self.client_upstream_events = []
for client_id in self.clients.keys():
(client, _) = self.clients[client_id]
self.clients[client_id] = (client, [])
def add_event(self, event: Event) -> None:
self.events.append(event)
def server_global_event(self, event: Event) -> None:
self.server_events.append(event)
def client_upstream_event(self, client_id: str, event: Event) -> None:
self.client_upstream_events.append((event, client_id))
def client_downstream_event(self, client_id: str, event: Event) -> None:
if client_id not in self.clients:
logger.info(f"client {client_id} not found")
return
(client, events) = self.clients[client_id]
self.clients[client_id] = (client, events + [event])
def add_client(self, client_id: str, client: Publishable) -> None:
self.clients[client_id] = client
self.clients[client_id] = (client, [])
def remove_client(self, client_id: str) -> None:
del self.clients[client_id]

View File

@ -1,5 +1,5 @@
import uuid
from typing import List
from typing import List, Optional
from kennel.config import config
from kennel.engine.components.component import ComponentType
@ -9,7 +9,7 @@ from kennel.engine.game import Game
from kennel.engine.systems.network import (
EntityPositionUpdateEvent,
Event,
EventProcessor,
UpstreamEventProcessor,
EventType,
NetworkSystem,
)
@ -22,27 +22,22 @@ entity_manager = EntityManager()
system_manager = SystemManager()
class KennelEventProcessor(EventProcessor):
class KennelEventProcessor(UpstreamEventProcessor):
def accept(
self, entity_manager: EntityManager, event: Event | tuple[Event, str]
) -> None:
if isinstance(event, tuple):
self, entity_manager: EntityManager, event: tuple[Event, str]
) -> Optional[Event]:
client_event, client_id = event
self._process_client_event(entity_manager, client_event, client_id)
return
def _process_client_event(
self, entity_manager: EntityManager, event: Event, client_id: str
) -> None:
if event.event_type == EventType.ENTITY_POSITION_UPDATE:
self._process_entity_position_update(entity_manager, event, client_id)
if client_event.event_type == EventType.ENTITY_POSITION_UPDATE:
return self._process_entity_position_update(
entity_manager, client_event, client_id
)
def _process_entity_position_update(
self,
entity_manager: EntityManager,
event: EntityPositionUpdateEvent,
client_id: str,
) -> None:
) -> Optional[Event]:
entity = entity_manager.get_entity(event.data["id"])
if entity is None:
logger.error(f"Entity(id={event.data['id']}) does not exist")
@ -60,6 +55,7 @@ class KennelEventProcessor(EventProcessor):
entity.add_component(position)
entity_manager.add_entity(entity)
return event
system_manager.add_system(WorldSystem(config.WORLD_WIDTH, config.WORLD_HEIGHT))

View File

@ -1,6 +1,6 @@
import asyncio
import uuid
from typing import Annotated, Optional
from typing import Annotated, Optional, List
from fastapi import (
Cookie,
@ -82,8 +82,8 @@ class WebSocketClient(Publishable):
def __init__(self, websocket: WebSocket):
self.websocket = websocket
async def publish(self, event: Event):
await self.websocket.send_json([event.event_type, event.data])
async def publish(self, events: List[Event]):
await self.websocket.send_json([event.to_dict() for event in events])
@app.websocket("/ws")
@ -95,27 +95,30 @@ async def websocket_endpoint(
client = WebSocketClient(websocket)
logger.info(f"Websocket connection established for session {session}")
initial_state = InitialStateEvent(
config.WORLD_WIDTH, config.WORLD_HEIGHT, kennel.entity_manager.to_dict()
)
await client.publish(initial_state)
session_entities = create_session_controllable_entities(session)
logger.info(f"Creating {len(session_entities)} entities for session {session}")
try:
network_system = system_manager.get_system(SystemType.NETWORK)
if network_system is None:
raise "Network system not found"
network_system.add_client(session, WebSocketClient(websocket))
network_system.client_downstream_event(
session,
InitialStateEvent(
config.WORLD_WIDTH, config.WORLD_HEIGHT, kennel.entity_manager.to_dict()
),
)
for entity in session_entities:
logger.info(f"Adding entity {entity.id} for session {session}")
entity_manager.add_entity(entity)
network_system.add_event(EntityBornEvent(entity))
set_controllable_event = SetControllableEvent(entity.id, session)
await client.publish(set_controllable_event)
network_system.server_global_event(EntityBornEvent(entity))
network_system.client_downstream_event(
session, SetControllableEvent(entity.id, session)
)
network_system.add_client(session, WebSocketClient(websocket))
while True:
message = await websocket.receive_json()
if not isinstance(message, list):
@ -125,20 +128,20 @@ async def websocket_endpoint(
logger.info(f"Invalid events in: {message}"[:100])
continue
for event in events:
network_system.client_event(session, event)
network_system.client_upstream_event(session, event)
except WebSocketDisconnect:
logger.info(f"Websocket connection closed by client: {session}")
except Exception as e:
logger.error("Exception occurred", exc_info=e)
finally:
logger.info("Websocket connection closed")
network_system.remove_client(session)
for entity in session_entities:
logger.info(f"Removing entity {entity.id} for session {session}")
entity_manager.remove_entity(entity.id)
network_system.add_event(Event(EventType.ENTITY_DEATH, {"id": entity.id}))
network_system.remove_client(session)
await websocket.close()
logger.info("Websocket connection closed")
@app.get("/healthcheck")

View File

@ -65,9 +65,11 @@ class KennelClient {
break;
}
}
if (events.length > 0) {
console.log(events, dt);
}
this.event_queue.clear();
this.event_publisher.publish();
}
private process_set_controllable_event(event: SetControllableEvent) {

View File

@ -7,7 +7,6 @@ export class MouseController {
constructor(
private readonly publisher: (new_movement: Vec2) => void | Promise<void>,
private readonly debounce_ms = 200,
private readonly l2_norm_threshold = 40,
) {}
public start() {
@ -29,20 +28,14 @@ export class MouseController {
}
public move(x: number, y: number) {
const new_movement = new Vec2(x, y);
if (
typeof this.last_movement !== "undefined" &&
new_movement.distance_to(this.last_movement) >= this.l2_norm_threshold
) {
this.last_movement = new Vec2(x, y);
this.publish_movement();
}
this.last_movement = new_movement;
}
private publish_movement() {
if (
typeof this.last_movement === "undefined" ||
Date.now() - this.last_event_time < this.debounce_ms
Date.now() - this.last_event_time < this.debounce_ms ||
typeof this.last_movement === "undefined"
) {
return;
}

View File

@ -66,8 +66,7 @@ export class WebSocketEventQueue implements EventQueue {
private listen_to(websocket: WebSocket) {
websocket.onmessage = ({ data }) => {
const [event_type, event_data] = JSON.parse(data);
this.queue.push({ event_type, data: event_data } as Event);
this.queue = this.queue.concat(JSON.parse(data));
};
}
}
@ -84,6 +83,9 @@ export class WebsocketEventPublisher implements EventPublisher {
}
public publish() {
if (this.queue.length === 0) {
return;
}
this.websocket.send(JSON.stringify(this.queue));
this.queue = [];
}