跳转至

DG-Lab WebSocket 服务端

DGLabWSServer

DGLabWSServer(host: Union[str, Sequence[str]], port: Optional[int] = None, heartbeat_interval: float = None, **kwargs)

DG-Lab WebSocket 服务器

Parameters:

Name Type Description Default
host Union[str, Sequence[str]]

WebSocket 服务器绑定的接口

required
port Optional[int]

监听端口

None
heartbeat_interval float

心跳包发送间隔(秒)

None
kwargs

:class:websockets.server.serve 的其他参数

{}
Source code in pydglab_ws/server/server.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(
        self,
        host: Union[str, Sequence[str]],
        port: Optional[int] = None,
        heartbeat_interval: float = None,
        **kwargs
):
    self._serve = ws_serve(
        self._ws_handler,
        host=host,
        port=port,
        **kwargs
    )
    self._client_id_to_queue: Dict[UUID4, asyncio.Queue] = {}
    self._uuid_to_ws: Dict[UUID4, WebSocketServerProtocol] = {}
    self._client_id_to_target_id: Dict[UUID4, UUID4] = {}
    self._target_id_to_client_id: Dict[UUID4, UUID4] = {}
    self._message_type_to_handler: Dict[
        MessageType,
        Callable[
            [DGLabWSServer, WebSocketMessage, Optional[WebSocketServerProtocol]],
            Coroutine[Any, Any, None]
        ]
    ] = {
        MessageType.BIND: self._handle_bind,
        MessageType.MSG: self._handle_msg
    }
    self._message_type_to_callbacks: Dict[
        MessageType,
        Set[Callable[[WebSocketMessage, bool], Any]]
    ] = {
        MessageType.BIND: set(),
        MessageType.MSG: set()
    }
    self._connection_callbacks: Tuple[
        Set[Callable[[UUID4, WebSocketServerProtocol], Any]],
        Set[Callable[[UUID4, WebSocketServerProtocol], Any]]
    ] = (set(), set())
    """新连接建立时 与 连接断开时"""
    self._heartbeat_interval = heartbeat_interval
    self._heartbeat_task: Optional[Task] = None

heartbeat_interval property writable

heartbeat_interval: Optional[float]

心跳包发送间隔,可修改(秒)

heartbeat_enabled property

heartbeat_enabled: bool

是否开启了心跳包发送计时器

client_id_to_target_id property

client_id_to_target_id: Dict[UUID4, UUID4]

client_idtarget_id 的映射

target_id_to_client_id property

target_id_to_client_id: Dict[UUID4, UUID4]

target_idclient_id 的映射

uuid_to_ws property

uuid_to_ws: Dict[UUID4, WebSocketServerProtocol]

所有的 WebSocket 客户端 ID(包含终端与 App)到 WebSocket 连接对象的映射

local_client_ids property

local_client_ids: Set[UUID4]

所有的本地终端 ID

new_local_client

new_local_client(max_queue: int = 2 ** 5) -> DGLabLocalClient

创建新的本地终端 DGLabLocalClient,记录并返回

Parameters:

Name Type Description Default
max_queue int

终端消息队列最大长度

2 ** 5

Returns:

Type Description
DGLabLocalClient

创建好的本地终端对象

Source code in pydglab_ws/server/server.py
123
124
125
126
127
128
129
130
131
132
133
134
135
def new_local_client(self, max_queue: int = 2 ** 5) -> DGLabLocalClient:
    """
    创建新的本地终端 [`DGLabLocalClient`][pydglab_ws.client.local.DGLabLocalClient],记录并返回
    :param max_queue: 终端消息队列最大长度
    :return: 创建好的本地终端对象
    """
    client_id = uuid4()
    return DGLabLocalClient(
        client_id,
        self._message_handler,
        self._client_id_to_queue.setdefault,
        max_queue
    )

remove_local_client async

remove_local_client(client_id: UUID4) -> bool

移除已连接的本地终端,并通知 App 终端已掉线

Parameters:

Name Type Description Default
client_id UUID4

要移除的本地终端 DGLabLocalClient 的 ID

required

Returns:

Type Description
bool

如果该终端并没有与服务端连接,返回 False,否则返回 True

Source code in pydglab_ws/server/server.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
async def remove_local_client(self, client_id: UUID4) -> bool:
    """
    移除已连接的本地终端,并通知 App 终端已掉线

    :param client_id: 要移除的本地终端 [`DGLabLocalClient`][pydglab_ws.client.local.DGLabLocalClient] 的 ID
    :return: 如果该终端并没有与服务端连接,返回 ``False``,否则返回 ``True``
    """
    try:
        self._client_id_to_queue.pop(client_id)
    except KeyError:
        return False
    else:
        if client_id in self._client_id_to_target_id:
            target_id = self._client_id_to_target_id.pop(client_id)
            self._target_id_to_client_id.pop(target_id)
            if websocket := self._uuid_to_ws.get(target_id):
                message = WebSocketMessage(
                    type=MessageType.BREAK,
                    client_id=client_id,
                    target_id=target_id,
                    message=RetCode.CLIENT_DISCONNECTED
                )
                await self._send(message, websocket)
        return True

add_receive_callback

add_receive_callback(message_type: Literal[BIND, MSG], func: Callable[[WebSocketMessage, bool], Any]) -> bool

添加回调函数,在收到指定类型的消息后调用

Parameters:

Name Type Description Default
message_type Literal[BIND, MSG]

消息类型,仅支持 MessageType.BIND, MessageType.MSG

required
func Callable[[WebSocketMessage, bool], Any]

回调函数,传入消息数据和服务端处理结果,支持异步函数

required

Returns:

Type Description
bool

是否有找到消息类型

Source code in pydglab_ws/server/server.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def add_receive_callback(
        self,
        message_type: Literal[MessageType.BIND, MessageType.MSG],
        func: Callable[[WebSocketMessage, bool], Any]
) -> bool:
    """
    添加回调函数,在收到指定类型的消息后调用
    :param message_type: 消息类型,仅支持 ``MessageType.BIND``, ``MessageType.MSG``
    :param func: 回调函数,传入消息数据和服务端处理结果,支持异步函数
    :return: 是否有找到消息类型
    """
    try:
        self._message_type_to_callbacks[message_type].add(func)
    except KeyError:
        return False
    else:
        return True

remove_receive_callback

remove_receive_callback(message_type: Literal[BIND, MSG], func: Callable[[WebSocketMessage, bool], Any]) -> bool

移除在收到指定类型的消息后调用的回调函数

Parameters:

Name Type Description Default
message_type Literal[BIND, MSG]

消息类型,仅支持 MessageType.BIND, MessageType.MSG

required
func Callable[[WebSocketMessage, bool], Any]

回调函数,传入消息数据和服务端处理结果,支持异步函数

required

Returns:

Type Description
bool

是否有找到消息类型和回调函数

Source code in pydglab_ws/server/server.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def remove_receive_callback(
        self,
        message_type: Literal[MessageType.BIND, MessageType.MSG],
        func: Callable[[WebSocketMessage, bool], Any]
) -> bool:
    """
    移除在收到指定类型的消息后调用的回调函数
    :param message_type: 消息类型,仅支持 ``MessageType.BIND``, ``MessageType.MSG``
    :param func: 回调函数,传入消息数据和服务端处理结果,支持异步函数
    :return: 是否有找到消息类型和回调函数
    """
    try:
        self._message_type_to_callbacks[message_type].remove(func)
    except KeyError:
        return False
    else:
        return True

add_connection_callback

add_connection_callback(mode: Literal['new_connect', 'disconnect'], func: Callable[[UUID4, WebSocketServerProtocol], Any]) -> bool

添加回调函数,在新的 WebSocket 连接建立时(新客户端)或连接断开时调用

Parameters:

Name Type Description Default
mode Literal['new_connect', 'disconnect']

类型,new_connect - 新连接时,处理消息之前;disconnect - 连接断开时

required
func Callable[[UUID4, WebSocketServerProtocol], Any]

回调函数,传入 终端 / App 的 clientId / targetId 和该客户端的 WebSocket 连接对象,支持异步函数

required

Returns:

Type Description
bool

mode 参数不合法时返回 False,否则返回 True

Source code in pydglab_ws/server/server.py
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
def add_connection_callback(
        self,
        mode: Literal["new_connect", "disconnect"],
        func: Callable[[UUID4, WebSocketServerProtocol], Any]
) -> bool:
    """
    添加回调函数,在新的 WebSocket 连接建立时(新客户端)或连接断开时调用
    :param mode: 类型,``new_connect`` - 新连接时,处理消息之前;``disconnect`` - 连接断开时
    :param func: 回调函数,传入 终端 / App 的 ``clientId`` / ``targetId`` 和该客户端的 WebSocket 连接对象,支持异步函数
    :return: ``mode`` 参数不合法时返回 ``False``,否则返回 ``True``
    """
    new_connect_set, disconnect_set = self._connection_callbacks
    if mode == "new_connect":
        new_connect_set.add(func)
    elif mode == "disconnect":
        disconnect_set.add(func)
    else:
        return False
    return True

remove_connection_callback

remove_connection_callback(mode: Literal['new_connect', 'disconnect'], func: Callable[[UUID4, WebSocketServerProtocol], Any]) -> bool

删除在新的 WebSocket 连接建立时(新客户端)或连接断开时调用的回调函数

Parameters:

Name Type Description Default
mode Literal['new_connect', 'disconnect']

类型,new_connect - 新连接时,处理消息之前;disconnect - 连接断开时

required
func Callable[[UUID4, WebSocketServerProtocol], Any]

回调函数,传入 终端 / App 的 clientId / targetId 和该客户端的 WebSocket 连接对象,支持异步函数

required

Returns:

Type Description
bool

mode 参数是否合法且是否找到了回调函数

Source code in pydglab_ws/server/server.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
def remove_connection_callback(
        self,
        mode: Literal["new_connect", "disconnect"],
        func: Callable[[UUID4, WebSocketServerProtocol], Any]
) -> bool:
    """
    删除在新的 WebSocket 连接建立时(新客户端)或连接断开时调用的回调函数
    :param mode: 类型,``new_connect`` - 新连接时,处理消息之前;``disconnect`` - 连接断开时
    :param func: 回调函数,传入 终端 / App 的 ``clientId`` / ``targetId`` 和该客户端的 WebSocket 连接对象,支持异步函数
    :return: ``mode`` 参数是否合法且是否找到了回调函数
    """
    new_connect_set, disconnect_set = self._connection_callbacks
    try:
        if mode == "new_connect":
            new_connect_set.remove(func)
        elif mode == "disconnect":
            disconnect_set.remove(func)
        else:
            return False
    except KeyError:
        return False
    else:
        return True