dapr - 事件的传递机制
gRPC services 定义
dapr 支持在各组件之间使用 gRPC 的方式通信。这些 gRPC 的 services 定义存放在 dapr/dapr 目录下。dapr/dapr/proto 下各目录的定义如下:
packages | description |
---|---|
common | common protos that are imported by multiple packages |
internals | internal gRPC and protobuf definitions which is used for Dapr internal |
runtime | Dapr and App Callback services and its associated protobuf messages |
operator | Dapr Operator gRPC service |
placement | Dapr Placement service |
sentry | Dapr Sentry for CA service |
本例的目的在于分析事件在不同 app 间的传递原理,所以将主要参考上述 runtime 部分的实现。
runtime gRPC services 的实现分为两个部分:
- dapr(dapr/dapr/proto/runtime/v1/dapr.proto),提供了 building blocks 的 APIs 供用户应用使用。
- appcallback(dapr/dapr/proto/runtime/v1/appcallback.proto),提供了用户应用与 dapr runtime 交互的能力,使用者(包括 dapr 的 sdks)可以通过实现这些接口从 dapr runtime 处获取事件。
以 pubsub 中的事件为例,它的 rpc service 的定义部分(位于 dapr/dapr/proto/runtime/v1/dapr.proto):
1 | // Dapr service provides APIs to user application to access Dapr building blocks. |
注册 gRPC services
在 dapr runtime 初始化的时候,会创建 grpc api server(位于 dapr/pkg/runtime/runtime.go):
1 | func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error { |
在这个过程中,dapr 会注册 /dapr.proto.runtime.v1.Dapr/PublishEvent
这些调用地址(位于 dapr/pkg/proto/runtime/v1/dapr_grpc.pb.go):
1 | func RegisterDaprServer(s grpc.ServiceRegistrar, srv DaprServer) { |
实现 server
事件的传递需要由“发送”和“接收”两部分组成。那么可以对上述 gRPC services 作以下理解:
- dapr services 承担“发送”部分的定义,通常在 dapr 仓库中实现
- appcallback services 承担“接收”部分的定义,通常在 sdk 仓库中实现
“发送”的实现
dapr 中的 DaprServer
是 runtime gRPC services 中 dapr service 的 server 实现接口(位于 dapr/pkg/proto/runtime/v1/dapr_grpc.pb.go):
1 | // DaprServer is the server API for Dapr service. |
dapr 的 grpc api 是其实现(位于 dapr/pkg/grpc/api.go):
1 | type api struct { |
可以看到,在 server 中 PublishEvent
函数的工作内容如下:
判断 pubsub 相关配置是否正确(runtime 加载是否正常、有无 pubsub component、有无 topic)
判断事件数据是否为 raw 格式(通过判断事件请求的 metadata 中是否有 “rawPayload” 配置,参考:https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-raw/)
从 context 中解析 tracing 相关配置
生成用于在 building blocks 中传递的事件请求实例 req,其结构如下:
1
2
3
4
5
6
7
8// PublishRequest is the request to publish a message.
type PublishRequest struct {
Data []byte `json:"data"`
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata"`
ContentType *string `json:"contentType,omitempty"`
}如果步骤 2 判断事件数据为 raw 格式,那么会直接将 in.Data (事件请求中的数据)传递给 req.Data
如果步骤 2 判断事件数据非 raw 格式,那么会生成一个 CloudEvent 格式的 envelope 来封装 in.Data:
1
2
3
4
5
6
7
8
9envelope, err := runtime_pubsub.NewCloudEvent(&runtime_pubsub.CloudEvent{
ID: a.id,
Topic: in.Topic,
DataContentType: in.DataContentType,
Data: body,
TraceID: corID,
TraceState: traceState,
Pubsub: in.PubsubName,
})针对非 raw 格式的事件,还需要应用各 pubsub components 的 features(目前看只有一个
FeatureMessageTTL
,用于计算事件是否过期)通过以下方法发送事件请求:
1
err := a.pubsubAdapter.Publish(&req)
这里会通过 pubsub components 中的
Publish
方法实现具体的发送事件的逻辑,以 kafka 为例(位于 components-contrib/pubsub/kafka/kafka.go):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36// Publish message to Kafka cluster.
func (k *Kafka) Publish(req *pubsub.PublishRequest) error {
if k.producer == nil {
return errors.New("component is closed")
}
k.logger.Debugf("Publishing topic %v with data: %v", req.Topic, req.Data)
msg := &sarama.ProducerMessage{
Topic: req.Topic,
Value: sarama.ByteEncoder(req.Data),
}
for name, value := range req.Metadata {
if name == key {
msg.Key = sarama.StringEncoder(value)
} else {
if msg.Headers == nil {
msg.Headers = make([]sarama.RecordHeader, 0, len(req.Metadata))
}
msg.Headers = append(msg.Headers, sarama.RecordHeader{
Key: []byte(name),
Value: []byte(value),
})
}
}
partition, offset, err := k.producer.SendMessage(msg)
k.logger.Debugf("Partition: %v, offset: %v", partition, offset)
if err != nil {
return err
}
return nil
}
“接收”的实现
在 dapr runtime 启动时,会开始监听 pubsub 的事件(位于 dapr/pkg/runtime/runtime.go):
1 | func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error { |
在 beginPubSub
函数中会进行以下处理(位于 dapr/pkg/runtime/runtime.go,以 grpc 为例):
通过
ListTopicSubscriptions
获取应用监听的 pubsub components 的 topics 列表通过 pubsub components 的
Subscribe
方法从这些订阅渠道中获取事件,将其转换为var cloudEvent map[string]interface{}
格式调用
publishMessageGRPC
方法将事件发送给应用生成一个新的 envelope 实例,用于封装 msg(kafka 中的原始消息).cloudEvent(dapr 组装的事件):
1
2
3
4
5
6
7
8
9
10envelope := &runtimev1pb.TopicEventRequest{
Id: extractCloudEventProperty(cloudEvent, pubsub.IDField),
Source: extractCloudEventProperty(cloudEvent, pubsub.SourceField),
DataContentType: extractCloudEventProperty(cloudEvent, pubsub.DataContentTypeField),
Type: extractCloudEventProperty(cloudEvent, pubsub.TypeField),
SpecVersion: extractCloudEventProperty(cloudEvent, pubsub.SpecVersionField),
Topic: msg.topic,
PubsubName: msg.metadata[pubsubName],
Path: msg.path,
}根据 msg.cloudEvent(事件)中的 pubsub.DataBase64Field 以及 pubsub.DataField 字段获取事件中的数据,填充到 envelope 中
处理 msg.cloudEvent 中的 tracing 相关配置
将 msg.metadata 加载至 context 中
调用 appcallback gRPC services,此处调用
OnTopicEvent
方法1
res, err := clientV1.OnTopicEvent(ctx, envelope)
进行发送后的一些处理
OnTopicEvent
这部分的工作内容如下(位于 go-sdk/service/grpc/topic.go):
生成一个 common.TopicEvent,并将监听到的事件请求中的 in.Data 填充至其中,这个 e 实例用于在用户应用中传递事件:
1
2
3
4
5
6
7
8
9
10
11e := &common.TopicEvent{
ID: in.Id,
Source: in.Source,
Type: in.Type,
SpecVersion: in.SpecVersion,
DataContentType: in.DataContentType,
Data: data,
RawData: in.Data,
Topic: in.Topic,
PubsubName: in.PubsubName,
}使用通过调用
AddTopicEventHandler
注册的 handlers 对事件进行处理
案例
参考 dapr/go-sdk 中的 pubsub 案例:https://github.com/dapr/go-sdk/blob/main/examples/pubsub/pub/pub.go
在 go-sdk/client/client.go 中有 Client
接口,它将作为 runtime gRPC services 中 dapr service 的实现。
1 | // Client is the interface for Dapr client implementation. |
具体是实现是同文件中的 GRPCClient
结构体:
1 | // GRPCClient is the gRPC implementation of Dapr client. |
其中 protoClient 指向了 dapr client 的 pb 接口定义部分,即 dapr/pkg/proto/runtime/v1/dapr_grpc.pb.go 中的 DaprClient
。
这是用于实现 runtime gRPC services 中 dapr service 的接口。
1 | // DaprClient is the client API for Dapr service. |
生成 client
通过以下代码生成一个 GRPCClient 实例:
1 | func newClientWithConnectionAndCancelFunc( |
发送事件
发送事件的操作会调用 Client 接口中的 PublishEvent 方法。
在 GRPCClient 中的实现如下:
1 | // PublishEvent publishes data onto specific pubsub topic. |
PublishEvent 对发送数据(即 data)的处理如下:
- 直接存储 []byte 类型的数据
- 将 string 类型的数据转换成 []byte 类型后存储
- 将不满足上述两种情况的数据按 JSON 序列化后存储
之后,调用 protoClient.PublishEvent
方法发送数据,protoClient.PublishEvent 的实现为:
1 | func (c *daprClient) PublishEvent(ctx context.Context, in *PublishEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { |
处理订阅到的事件
同 “接收”的实现