函数型云托管 集成消息队列(Kafka、Pulsar)和 Redis 实现与客户端收发消息/延时消息的示例。
sequenceDiagram
autonumber
actor c as 客户端
box rgb(33,66,99) 函数型云托管 集群
participant t as 函数 trigger
participant m as 函数 message
end
participant k as Kafka
participant p as Pulsar
participant r as Redis
m --> k: 消费消息(异步)
m --> p: 消费消息(异步)
c ->> +m: 建立 Websocket 连接
note over m: 订阅客户端 channel,异步完成
m --> r: 订阅 Redis channel(异步)
c ->> +t: HTTP 请求触发函数,执行操作
t -->> -c: 返回(同步)
t ->> k: 投递消息
t ->> p: 投递延时消息
note over k: Kafka 投递消息完成后,函数 message 进行消费
note over p: Pulsar 投递延时消息完成后,函数 message 进行消费
m --> r: 收到消息后,发布到客户端 channel(异步)
m -->> c: 推送消息到指定客户端(异步)
.
|-- cloudbase-functions.json # 函数配置文件
|-- cloudrunfunctions # 函数目录
| |-- message # 函数 message
| `-- trigger # 函数 trigger
|-- ecosystem.config.js # pm2 配置文件
|-- package.json # 项目配置文件
|-- packages # 项目共享代码目录
| `-- common
|-- pnpm-workspace.yaml # pnpm 工作空间配置文件
`-- tsconfig.json # TypeScript 配置文件- 前置条件
- 安装
Node.js - 安装
pnpm - 安装
@cloudbase/functions-framework
- 进入项目根目录,安装依赖并构建
pnpm i
pnpm run build- 在项目根目录创建
.env文件,填入中间件参数
touch .env.env 文件内容:
KAFKA_BROKER=ip:port # Kafka 连接地址
KAFKA_TOPIC=your-topic # Kafka 主题名
REDIS_URL=redis://ip:port # Redis 连接地址
PULSAR_SERVICE_URL=http://xxx:8080 # Pulsar 连接地址
PULSAR_TOPIC=your-topic # Pulsar 主题名
PULSAR_SUBSCRIPTION=xxx # Pulsar 订阅名
PULSAR_TOKEN=eyJxxx # Pulsar 认证 token- 启动服务
pnpm start- 客户端向
message函数发起 websocket 长链接
# User-Agent、客户端 IP 用于关联客户端
wscat -c "ws://127.0.0.1:3000/message" -H "User-Agent:unique-client"- 客户端向
trigger函数发送消息
# User-Agent、客户端 IP 用于关联客户端
curl -v http://127.0.0.1:3000/trigger \
-H "content-type:application/json" \
-H "User-Agent:unique-client" \
-d '{"id":"Hello"}'该命令执行后,会触发两条消息推送,其中一个是实时消息,另一个是延时(3s)消息。
- 连接到
message函数的客户端收到服务端推送的消息,消息内容为客户端向trigger函数发送的消息
< {"id":"Hello"}
-
使用
pm2扩展为多节点部署8.1 全局安装
pm2npm i -g pm2
8.2 使用 pm2 启动项目
pm2 start ecosystem.config.js
