Dapr|订阅消息
介绍
Dapr应用订阅消息是让程序之间解耦的主要方式,通过发布订阅的思想也可以实现服务之间的异步调用1。
发布订阅的概念来自于事件驱动架构(EDA)的设计思想,大部分分布式应用都会依赖这样的发布订阅解耦模式。
Dapr为了解决这种问题,提供开箱即用的消息传送抽象和实现,封装在Dapr构建基块中。业务系统只需调用跟据Dapr的要求实现订阅发布即可
使用说明
使用有两个步骤:
- 在应用
route
为/dapr/subscribe
返回,本应用要订阅的消息类型、处理route
等信息 - 实现对应
route
的业务逻辑
示例1 发送消息
client, err := dapr.NewClient()
if err != nil {
panic(err)
}
// 发送的消息数据
data := map[string]interface{}{
"id": 100,
"value": "Golang PubSub",
"value2": "Golang 消息推送:" + strconv.FormatInt(time.Now().Unix(), 10),
}
if err := client.PublishEvent(ctx, "pubsub", "helloPub", data); err != nil {
panic(err)
}
示例2 订阅 及接收消息
r := gin.Default()
client, err := dapr.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
// 返回本应用订阅的消息 主题名称 订阅名称及对应处理的route
r.GET("/dapr/subscribe", func(c *gin.Context) {
// TODO 从 数据库 配置文件 获取
c.JSON(http.StatusOK, []gin.H{
{
"topic": "helloPub", // 主题名称
"pubsubname": "pubsub", // 订阅名称
"route": "dapr/message", // 接收相应的 route
},
})
})
// 根据 `/dapr/subscribe` 的配置,接收处理即可
r.POST("/dapr/message", func(c *gin.Context) {
data, _ := c.GetRawData()
log.Println("dsstatus data", string(data))
// 接收的全部数据,data为发送的数据,其他为统一的结构化数据,
// 若route只有一个,则使用 topic pubsubname 区分消息类型
//{
// "pubsubname":"pubsub",
// "data":{
// "id":100,
// "value":"Golang PubSub",
// "value2":"Golang 消息推送"
// },
// "traceid":"00-d42a8a2c653456b52db8b86f510b13b6-796d67e5bbe32b21-00",
// "tracestate":"",
// "id":"887b0e69-ddb4-4a6a-a2dd-1e09fa533fbe",
// "specversion":"1.0",
// "datacontenttype":"application/json",
// "source":"goapp",
// "type":"com.dapr.event.sent",
// "topic":"helloPub"
//}
c.JSON(http.StatusOK, gin.H{
"success": true,
})
})
r.Run(":8080")