跳到主要内容

Message Bus

1. 概述

在游戏拥有托管后端系统且仅使用 PGOS 的某些服务(例如 DS 托管)的场景中,游戏托管后端可能需要与 DS(专用服务器)或 VS(Virtual Server)通信,并接收 PGOS 事件。通常,DS/VS 可以通过公网直接与游戏托管后端通信。然而,这需要游戏托管后端拥有公网网关。此外,即使游戏托管后端已经拥有公网网关,游戏仍然需要进行一些适配工作。因此,PGOS 有必要提供一种名为Message Bus的即用型通信机制。

2. 架构

如果游戏想要使用Message Bus,游戏托管的后端需要集成 PgosMsgBusSDK。DS 可以通过 PgosSDK 参与通信,VS 可以通过 PGOS 后端 HTTP API 参与通信。此外,您可以通过 PGOS 门户控制台 将 PGOS 事件 绑定到Message Bus,从而将 PGOS 事件推送到游戏托管的后端。

image-20240911144656409

支持的场景:

  • DS 向游戏托管后端推送消息(无响应):(1)、(2)、(3)
  • DS 向游戏托管后端发送 RPC 请求:(1)、(2)、(3)、(4)、(5)
  • VS(HTTP API)向游戏托管后端推送消息(无响应):(6)、(2)、(3)
  • VS(HTTP API)向游戏托管后端发送 RPC 请求:(6)、(2)、(3)、(4)、(7)
  • PGOS 事件向游戏托管后端推送消息(无响应):(8)、(2)、(3)
  • 游戏托管后端向 DS 推送消息(无响应):(3)、(4)、(5)
  • 游戏托管后端向 DS 发送 RPC 请求:(3)、(4)、(5)、(1)、(2)、(3)
提示

游戏可以部署多个代理,PGOS 后端将努力维护稳定的通信管道。例如,除非代理实例离线,否则 DS 实例将始终与一个代理实例(而不是多个代理实例)通信。

对于 PGOS 事件,PGOS 后端将根据 route_key 的哈希值选择代理,该哈希值通常是业务模块的实例 ID,例如玩家 ID、队伍 ID、大厅 ID 等。这样做也是为了尽可能保持通信管道的稳定。

3. PgosMsgBusSDK 集成

PgosMsgBusSDK 以源码形式提供给游戏,依赖 gRPC。目前仅提供 C++ 版本,其他语言版本将陆续推出。您可以从 PGOS 门户的下载页面下载 SDK。SDK 包目录说明如下:

├─scripts                        # some helper scripts such as installing gRPC, generating proto codes, and building the sample.
├─source # the c++ source codes of the sample, including the PgosMsgBusSDK source codes.
│ └─generated_proto_codes # the generated gRPC c++ codes of the proto files.
│ └─pgos_msg_bus_sdk # the PgosMsgBusSDK codes, you can copy this directory to your project to integrate.
│ ├─include # the PgosMsgBusSDK header files for the SDK user.
│ └─source # the PgosMsgBusSDK implementation codes.
│ └─protos # gRPC proto files are responsible for the communication between SDK and msg bus.
├─tools # tools for build
│ └─cmake # CMake for windows & linux, convenient for users to build the sample.

由于 SDK 以源代码形式提供,集成 SDK 只需将 pgos_msg_bus_sdk 目录复制到您的 C++ 项目中即可。此外,如果您本地未安装 gRPC,则需要安装。最后,请确保使用本地 gRPC 基于 proto 文件重新生成 C++ 代码。作为参考,SDK 包中的示例项目演示了基于 CMake 的集成和构建方法,如果您感兴趣,请阅读下一章。

3.1 使用 CMake 集成 SDK

以 SDK 包中的示例项目为例:

  • 步骤 1:将 {SDK 包}\source\pgos_msg_bus_sdk\ 复制到目标项目。

image-20240909151831334

  • 步骤 2:将 SDK 和生成的 proto c++ 源代码添加到 CMakeLists.txt 文件。

您可以参考文件:{SDK 包}\CMakeLists.txt

  • 步骤 3:安装 gRPC

Linux 系统:

  • 如果您本地已安装 gRPC,则可直接使用。
  • 如果您本地未安装 gRPC,则需要自行安装。这里有一份安装指南。您也可以参考:{SDK 包}\scripts\linux_install_grpc.sh,并根据实际需求进行调整。

Windows 系统:

  • 您可以参考文件 {SDK 包}\CMakeLists.txt,其中已完成 gRPC 的安装工作。

  • 步骤 4:根据 proto 文件生成 C++ 代码。

proto 文件路径:{SDK 包}\source\pgos_msg_bus_sdk\source\protos\pgos_msg_bus.proto

请根据此文件生成 proto 源代码,目标输出目录:{SDK 包}\source\generated_proto_codes\

Linux 系统:您可以参考 {SDK 包}\scripts\linux_generate_proto_codes.sh,并根据实际需求进行调整。

Windows 系统:您可以参考 {SDK 包}\CMakeLists.txt,代码生成将在 PRE_BUILD 事件中完成。

  • 步骤 5:使用 PgosMsgBusSDK 添加您的业务。

详情请参阅 PgosMsgBusSDK 使用方法

  • 步骤 6:构建您的 C++ 项目

Linux 系统:您可以参考 {SDK 软件包}\scripts\linux_build_sample.sh

Windows 系统:您可以参考 {SDK 软件包}\scripts\windows_build_sample.bat,注意它依赖于 Visual Studio 2017

请根据您的实际需求进行调整。

4. PgosMsgBusSDK 使用

集成 PgosMsgBusSDK 后,您可以在项目源代码中创建 SDK 实例并调用其提供的 API。您可以参考 {SDK 包}\source\sample.cpp 中的 MyPgosMsgBusSDKManager 类,该类提供了 SDK 的简单使用示例。

4.1 获取 SDK 实例

#include "pgos_msg_bus_sdk.h"

void SomeFunction() {
// It is recommended to use the singleton instance of the 'PgosMsgBusSDK' class.
auto pmbs_sdk = pgos::PgosMsgBusSDK::Singleton();

...
}

4.2 初始化 SDK 实例

#include "pgos_msg_bus_sdk.h"

void SomeFunction() {
pgos::InitSdkParams params;
params.title_id = "your title id"; // obtained your 'title_id' from PGOS portal console.
params.title_region_id = "your title_region_id"; // obtained your 'title_region_id' from PGOS portal console.
params.server_key = "your server_key"; // obtained your 'server_key' from PGOS portal console.
params.wp_log = wp_log;
params.wp_observer = wp_observer;
if (!pgos::PgosMsgBusSDK::Singleton()->Init(params)) {
LOG_ERROR("init PgosMsgBusSDK failed.");
}

...
}

pgos::InitSdkParams 的关键字段:

  • wp_log:如果您希望输出 PgosMsgBusSDK 日志以便日后排查问题,您需要实现 IMsgBusSDKLog 接口,并将指向其实例的弱指针赋值给此字段。
// PgosMsgBusSDK log abstract class.
// If you need to output the PgosMsgBusSDK logs, please implement and set it to InitSDKParams.
class IMsgBusSDKLog {
public:
enum class LogLevel : uint8_t {Debug = 0, Info = 1, Warn = 2, Error = 3 };
virtual void Log(LogLevel level, const char* log) = 0; // Need to support multi-threaded calls.
};
  • wp_observer:如果要监听 PgosMsgBusSDK 的事件,比如收到 DS 的消息或者 SDK 失去连接,则需要实现 IMsgBusSDKEventObserver 接口,并将其实例的弱指针赋值给该字段。
// PgosMsgBusSDK event observer abstract class.
// If you need to output the PgosMsgBusSDK logs, please implement and set it to InitSDKParams.
class IMsgBusSDKEventObserver {
public:
// the event will be triggered when a push message from DS is received.
virtual void OnDsPushMsgReceived(const pgos::DsMsgEvt& evt) {};

// the event will be triggered when a RPCRequest from DS is received,
// and the receiver needs to call the 'resp_cb' to give a response to the sender.
// the 'resp_cb' can be called in the same thread or another thread.
virtual void OnDsRpcRequestReceived(const pgos::DsMsgEvt& evt,
std::function<void(const std::string& resp)>&& resp_cb) {};

// the event will be triggered when a push message from backend HTTP API is received.
virtual void OnHttpApiPushMsgReceived(const pgos::HttpApiMsgEvt& evt) {};

// the event will be triggered when a RPCRequest from backend HTTP API is received,
// and the receiver needs to call the 'resp_cb' to give a response to the sender.
// the 'resp_cb' can be called in the same thread or another thread.
virtual void OnHttpApiRpcRequestReceived(const pgos::HttpApiMsgEvt& evt,
std::function<void(const std::string& resp)>&& resp_cb) {};

// the event will be triggered when a PGOS Event push message is received.
virtual void OnPgosEventPushMsgReceived(const pgos::PgosEventMsgEvt& evt) {};

// the event will be triggered when the connection to the msg bus is lost.
// if this is not due to the 'Destroy' API being called,
// the SDK will automatically try to reconnect 3 times, and if it still fails, this event will be thrown.
virtual void OnConnectionLost(const pgos::PmbSDKConnectionLostEvt& evt) {};
};
提示

对于事件“OnDsRpcRequestReceived”和“OnHttpApiRpcRequestReceived”,如果响应的计算或检索需要大量时间,建议在单独的线程中执行该任务,以防止阻塞其他消息的发送。

4.3 调用 需要的API

目前仅提供向DS发送消息的API。

#include "pgos_msg_bus_sdk.h"

void SomeFunction() {
auto pmbs_sdk = pgos::PgosMsgBusSDK::Singleton();

// call API: PushMsgToDS
std::string msg = "data_sent_via_PushMsgToDS_API";
auto ret = pmbs_sdk->PushMsgToDS(battle_session_id, msg);
LOG_INFO("RunSomeBusiness: PushMsgToDS, bs_id=(%s), msg=(%s), err_code=(%u), err_msg=(%s).",
battle_session_id.c_str(), msg.c_str(), ret.err_code, ret.err_msg.c_str());

// call API: RPCRequestToDS
msg = "data_sent_via_RPCRequestToDS_API";
uint32_t timeout_ms = 10 * 1000;
auto rsp = pmbs_sdk->RPCRequestToDS(battle_session_id, msg, timeout_ms);
LOG_INFO("RunSomeBusiness: RPCRequestToDS, bs_id=(%s), msg=(%s), err_code=(%u), err_msg=(%s), rsp=(%s).",
battle_session_id.c_str(), msg.c_str(), rsp.err_code, rsp.err_msg.c_str(), rsp.respone.c_str());

// call API: AsyncRPCRequestToDS
msg = "data_sent_via_AsyncRPCRequestToDS_API";
bool invoke_ret = pmbs_sdk->AsyncRPCRequestToDS(battle_session_id, msg, timeout_ms,
[battle_session_id, msg](const pgos::RpcRespone& rsp) {
LOG_INFO("RunSomeBusiness: AsyncRPCRequestToDS, bs_id=(%s), msg=(%s), err_code=(%u), err_msg=(%s), rsp=(%s).",
battle_session_id.c_str(), msg.c_str(), rsp.err_code, rsp.err_msg.c_str(), rsp.respone.c_str());
});
LOG_INFO("RunSomeBusiness: AsyncRPCRequestToDS invoke %s", invoke_ret? "success": "failed");

...
}

4.4 关闭 SDK 实例

关闭 PgosMsgBusSDK 实例,如果可能,应在进程关闭时调用此方法。注意:此方法应与 Init API 在同一线程中调用。

#include "pgos_msg_bus_sdk.h"

void OnProcessShutdown() {
pgos::PgosMsgBusSDK::Singleton()->Destroy();

...
}

4.5 关键错误处理

错误码Handling Suggestion
kPmbSdkNotInit (600000)SDK 尚未启动,请在发送消息前进行初始化。
kPmbSdkInvalidParameter (600001)API 输入参数或其某个字段值无效,请查看错误消息以了解更多详细信息。
kPmbSdkBadAlloc (600002)无法为 sdk 分配内存,这意味着发生了操作系统错误。
kPmbSdkWriteTimeout (600003)已达到指定的超时时间,但消息未完成写入操作。如有必要,可以稍后重试发送。
kPmbSdkReadTimeout (600004)已达到指定的超时时间,消息已写入,但尚未收到对等方的响应。如有必要,可以稍后重试发送。
kPmbSdkUnavailable (600005)sdk 实例不可用,通常是由于与 msg 总线的连接问题造成的。
kPmbSdkQueueIsFull (600006)写入队列已达到其限制(100),通常是由于短时间内写入操作过多或网络问题造成的。如有必要,可以稍后重试发送。
kPmbSdkConnectionLost (600007)与 msg 总线的连接丢失。
kPmbSdkDestroyed (600008)SDK 已被破坏。

5. 在门户中查看

此处的“终端”特指持有 PgosMsgBusSDK 实例的游戏后端服务器进程,该进程由游戏托管、启动和关闭。登录 Web 门户,进入控制台,然后前往 Extension > Message Bus,即可查看终端的统计信息和日志。

5.1 端点列表

点击“Endpoint”选项卡,您将获得活跃终端的列表:

image-20240913114438374

端点列表字段说明:

  • 实例 ID:端点的实例 ID。
  • 终端 IP:端点的公网 IP 地址。
  • 消息总数:端点的入口/出口消息总数。
  • 最新消息数:端点最新入口/出口消息数。
  • 连接时间:表示端点与 PGOS 后端建立消息总线通道的时间。

5.2 Message Bus 通道

Message Bus通道是终端与 PGOS 后端之间的虚拟连接。 点击终端的“实例 ID”,您将获得所连接终端的Message Bus通道详情:

image-20240913114510447

它主要分为三个部分:基本信息、推送消息统计和 RPC 消息统计。

5.2.1 基本信息

它列出了此Message Bus通道终端对应的实例 ID、状态、公网 IP 地址、首次连接时间和上次连接时间(如有重连)。

5.2.2 推送消息统计

列表字段说明:

  • 来源:指定推送消息的发送者。
  • 已发送总条数:指定已发送的推送消息条数。
  • 中继总条数:指定已成功中继的推送消息条数。
  • 过去一分钟已发送条数:指定过去一分钟内已发送的推送消息条数。
  • 过去一分钟中继条数:指定过去一分钟内已成功中继的推送消息条数。

5.2.3 RPC 消息统计

列表字段说明:

  • 来源:指定推送消息的发送者。
  • 总发送数:指定已发送的 RPC 消息数。
  • 总中继数:指定已成功中继的 RPC 消息数。
  • 总响应数:指定已成功获得响应的 RPC 消息数。
  • 最后一分钟发送数:指定最后一分钟发送的 RPC 消息数。
  • 最后一分钟中继数:指定最后一分钟成功中继的 RPC 消息数。
  • 最后一分钟响应数:指定最后一分钟成功获得响应的 RPC 消息数。

5.3 终端日志

点击“Log”选项卡,您将获得所有终端行为的简要日志列表:

image-20240913114638116