介绍

Dapr应用订阅消息是让程序之间解耦的主要方式,通过发布订阅的思想也可以实现服务之间的异步调用1。

发布订阅的概念来自于事件驱动架构(EDA)的设计思想,大部分分布式应用都会依赖这样的发布订阅解耦模式。

Dapr为了解决这种问题,提供开箱即用的消息传送抽象和实现,封装在Dapr构建基块中。业务系统只需调用跟据Dapr的要求实现订阅发布即可

使用说明

使用有两个步骤:

  1. 在应用 route/dapr/subscribe返回,本应用要订阅的消息类型、处理route等信息
  2. 实现对应route的业务逻辑

示例1 发送消息

client, err := dapr.NewClient()
if err != nil {
	panic(err)
}
// 发送的消息数据
data := map[string]interface{}{
	"id":     100,
	"value":  "Golang PubSub",
	"value2": "Golang 消息推送:" + strconv.FormatInt(time.Now().Unix(), 10),
}
if err := client.PublishEvent(ctx, "pubsub", "helloPub", data); err != nil {
	panic(err)
}

示例2 订阅 及接收消息

r := gin.Default()
client, err := dapr.NewClient()
if err != nil {
	panic(err)
}
defer client.Close()
// 返回本应用订阅的消息 主题名称 订阅名称及对应处理的route
r.GET("/dapr/subscribe", func(c *gin.Context) {
    // TODO 从 数据库 配置文件 获取
	c.JSON(http.StatusOK, []gin.H{
		{
			"topic":      "helloPub",     // 主题名称
			"pubsubname": "pubsub",       // 订阅名称
			"route":      "dapr/message", // 接收相应的 route
		},
	})
})
// 根据 `/dapr/subscribe` 的配置,接收处理即可
r.POST("/dapr/message", func(c *gin.Context) {
	data, _ := c.GetRawData()
	log.Println("dsstatus data", string(data))
    // 接收的全部数据,data为发送的数据,其他为统一的结构化数据,
    // 若route只有一个,则使用 topic pubsubname 区分消息类型
	//{
	//    "pubsubname":"pubsub",
	//    "data":{
	//        "id":100,
	//        "value":"Golang PubSub",
	//        "value2":"Golang 消息推送"
	//    },
	//    "traceid":"00-d42a8a2c653456b52db8b86f510b13b6-796d67e5bbe32b21-00",
	//    "tracestate":"",
	//    "id":"887b0e69-ddb4-4a6a-a2dd-1e09fa533fbe",
	//    "specversion":"1.0",
	//    "datacontenttype":"application/json",
	//    "source":"goapp",
	//    "type":"com.dapr.event.sent",
	//    "topic":"helloPub"
	//}
	c.JSON(http.StatusOK, gin.H{
		"success": true,
	})
})
r.Run(":8080")