def sync(
    fps: int,
    post_effect: PostEffectType,
    show_grid: bool,
    unlocked_framerate: bool,
    video_res_height: int,
    video_res_width: int,
    webrtc: bool,
    *,
    client: Client,
    pool: Optional[str] = None,
    replay: Optional[str] = None,
) -> ClientConnection:
    """Pass those commands to the engine via websocket, and pass responses
    back to the client. Basically, this is a websocket proxy between the
    frontend/client and the engine.

    Every argument is forwarded unchanged to ``_get_kwargs``; the ``"url"``
    and ``"headers"`` entries of its result are used to open the connection.

    Args:
        fps, post_effect, show_grid, unlocked_framerate, video_res_height,
        video_res_width, webrtc, pool, replay: Request options handed to
            ``_get_kwargs``. Their exact meaning is defined by the API and
            is not visible in this function.
        client: Handed to ``_get_kwargs``; presumably supplies the base URL
            and credentials -- confirm against ``_get_kwargs``.

    Returns:
        Whatever ``ws_connect`` returns for the derived websocket URL.
    """
    kwargs = _get_kwargs(
        fps=fps,
        pool=pool,
        post_effect=post_effect,
        replay=replay,
        show_grid=show_grid,
        unlocked_framerate=unlocked_framerate,
        video_res_height=video_res_height,
        video_res_width=video_res_width,
        webrtc=webrtc,
        client=client,
    )

    # Swap only the leading scheme (http -> ws, https -> wss). Without the
    # count, any later "http" in the URL (e.g. inside a path segment or a
    # query value) would be rewritten as well and corrupt the request.
    # Assumes the URL starts with an http(s) scheme.
    ws_url = kwargs["url"].replace("http", "ws", 1)

    return ws_connect(  # type: ignore
        ws_url,
        additional_headers=kwargs["headers"],
        close_timeout=120,
        max_size=None,
    )
async def asyncio(
    fps: int,
    post_effect: PostEffectType,
    show_grid: bool,
    unlocked_framerate: bool,
    video_res_height: int,
    video_res_width: int,
    webrtc: bool,
    *,
    client: Client,
    pool: Optional[str] = None,
    replay: Optional[str] = None,
) -> WebSocketClientProtocol:
    """Pass those commands to the engine via websocket, and pass responses
    back to the client. Basically, this is a websocket proxy between the
    frontend/client and the engine.

    Every argument is forwarded unchanged to ``_get_kwargs``; the ``"url"``
    and ``"headers"`` entries of its result are used to open the connection.

    Args:
        fps, post_effect, show_grid, unlocked_framerate, video_res_height,
        video_res_width, webrtc, pool, replay: Request options handed to
            ``_get_kwargs``. Their exact meaning is defined by the API and
            is not visible in this function.
        client: Handed to ``_get_kwargs``; presumably supplies the base URL
            and credentials -- confirm against ``_get_kwargs``.

    Returns:
        The result of awaiting ``ws_connect_async`` for the derived
        websocket URL.
    """
    kwargs = _get_kwargs(
        fps=fps,
        pool=pool,
        post_effect=post_effect,
        replay=replay,
        show_grid=show_grid,
        unlocked_framerate=unlocked_framerate,
        video_res_height=video_res_height,
        video_res_width=video_res_width,
        webrtc=webrtc,
        client=client,
    )

    # Swap only the leading scheme (http -> ws, https -> wss). Without the
    # count, any later "http" in the URL (e.g. inside a path segment or a
    # query value) would be rewritten as well and corrupt the request.
    # Assumes the URL starts with an http(s) scheme.
    ws_url = kwargs["url"].replace("http", "ws", 1)

    return await ws_connect_async(
        ws_url,
        extra_headers=kwargs["headers"],
        close_timeout=120,
        max_size=None,
    )
class WebSocket:
    """Wrapper around a websocket connection to the API endpoint."""

    # Underlying connection that every method below delegates to.
    ws: ClientConnection

    def __iter__(self) -> Iterator[WebSocketResponse]:
        """
        Yield each incoming message as a :class:`WebSocketResponse`.

        Iterates over the underlying connection, JSON-decodes every message
        and uses the decoded mapping as keyword arguments for the response
        model. Iteration stops when the connection is closed normally; a
        :exc:`~websockets.exceptions.ConnectionClosedError` is raised after
        a protocol error or a network failure.
        """
        for raw in self.ws:
            fields = json.loads(raw)
            yield WebSocketResponse(**fields)

    def send(self, data: WebSocketRequest):
        """Dump the request model, JSON-encode it and send it."""
        payload = data.model_dump()
        encoded = json.dumps(payload)
        self.ws.send(encoded)

    def send_binary(self, data: WebSocketRequest):
        """Dump the request model, encode it as bson and send it."""
        self.ws.send(  # type: ignore
            bson.encode(data.model_dump())
        )

    def recv(self) -> WebSocketResponse:
        """Receive one message (``timeout=60``) and parse it into a response."""
        raw = self.ws.recv(timeout=60)
        fields = json.loads(raw)
        return WebSocketResponse(**fields)

    def close(self):
        """Close the underlying websocket connection."""
        self.ws.close()