Dapr - How events are delivered

gRPC service definitions

Dapr components can communicate with each other over gRPC. The gRPC service definitions live in the dapr/dapr repository; the directories under dapr/dapr/proto are as follows:

Package     Description
common      Common protos that are imported by multiple packages
internals   Internal gRPC and protobuf definitions used by Dapr internally
runtime     Dapr and App Callback services and their associated protobuf messages
operator    Dapr Operator gRPC service
placement   Dapr Placement service
sentry      Dapr Sentry for CA service

The goal here is to analyze how events are passed between apps, so the discussion mainly follows the runtime part listed above.

The runtime gRPC services consist of two parts:

  • dapr (dapr/dapr/proto/runtime/v1/dapr.proto) provides the building-block APIs for user applications.
  • appcallback (dapr/dapr/proto/runtime/v1/appcallback.proto) lets user applications interact with the Dapr runtime; implementers (including the Dapr SDKs) receive events from the Dapr runtime by implementing these interfaces.

Taking pubsub events as an example, the relevant part of the rpc service definition is shown below (in dapr/dapr/proto/runtime/v1/dapr.proto):

// Dapr service provides APIs to user application to access Dapr building blocks.
service Dapr {
...

// Publishes events to the specific topic.
rpc PublishEvent(PublishEventRequest) returns (google.protobuf.Empty) {}

...
}

// PublishEventRequest is the message to publish event data to pubsub topic
message PublishEventRequest {
// The name of the pubsub component
string pubsub_name = 1;

// The pubsub topic
string topic = 2;

// The data which will be published to topic.
bytes data = 3;

// The content type for the data (optional).
string data_content_type = 4;

// The metadata passing to pub components
//
// metadata property:
// - key : the key of the message.
map<string, string> metadata = 5;
}

Registering the gRPC services

When the Dapr runtime initializes, it creates the gRPC API server (in dapr/pkg/runtime/runtime.go):

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
...
// Create and start internal and external gRPC servers
grpcAPI := a.getGRPCAPI()

err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
...
}

During this process, Dapr registers call addresses such as /dapr.proto.runtime.v1.Dapr/PublishEvent (in dapr/pkg/proto/runtime/v1/dapr_grpc.pb.go):

func RegisterDaprServer(s grpc.ServiceRegistrar, srv DaprServer) {
s.RegisterService(&Dapr_ServiceDesc, srv)
}

// Dapr_ServiceDesc is the grpc.ServiceDesc for Dapr service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Dapr_ServiceDesc = grpc.ServiceDesc{
ServiceName: "dapr.proto.runtime.v1.Dapr",
HandlerType: (*DaprServer)(nil),
Methods: []grpc.MethodDesc{
...
{
MethodName: "PublishEvent",
Handler: _Dapr_PublishEvent_Handler,
},
...
},
Streams: []grpc.StreamDesc{
{
StreamName: "SubscribeConfigurationAlpha1",
Handler: _Dapr_SubscribeConfigurationAlpha1_Handler,
ServerStreams: true,
},
},
Metadata: "dapr/proto/runtime/v1/dapr.proto",
}

func _Dapr_PublishEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PublishEventRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DaprServer).PublishEvent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dapr.proto.runtime.v1.Dapr/PublishEvent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DaprServer).PublishEvent(ctx, req.(*PublishEventRequest))
}
return interceptor(ctx, in, info, handler)
}
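
The call address in the handler above is the service name and the method name joined with slashes. The following is a minimal sketch of that naming rule using only the standard library; fullMethod and splitFullMethod are hypothetical helpers written for illustration and are not part of gRPC or Dapr.

```go
package main

import (
	"fmt"
	"strings"
)

// fullMethod is a hypothetical helper that joins a service name and a method
// name in the "/service/method" form seen in the generated handler.
func fullMethod(service, method string) string {
	return "/" + service + "/" + method
}

// splitFullMethod is the hypothetical inverse. It returns ok=false when the
// input does not have the "/service/method" form.
func splitFullMethod(full string) (service, method string, ok bool) {
	if !strings.HasPrefix(full, "/") {
		return "", "", false
	}
	i := strings.LastIndex(full, "/")
	if i <= 0 || i == len(full)-1 {
		return "", "", false
	}
	return full[1:i], full[i+1:], true
}

func main() {
	full := fullMethod("dapr.proto.runtime.v1.Dapr", "PublishEvent")
	fmt.Println(full)

	service, method, ok := splitFullMethod(full)
	fmt.Println(service, method, ok)
}
```

The same string appears again on the client side, where it is passed to Invoke, so both ends agree on the address without any extra configuration.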

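Implementing the server

Delivering an event takes two parts, “sending” and “receiving”. The gRPC services above can therefore be understood as follows:

  • the dapr service defines the “sending” part and is usually implemented in the dapr repository
  • the appcallback service defines the “receiving” part and is usually implemented in the SDK repositories

The “sending” implementation

DaprServer is the server interface for the dapr service of the runtime gRPC services (in dapr/pkg/proto/runtime/v1/dapr_grpc.pb.go):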

// DaprServer is the server API for Dapr service.
// All implementations should embed UnimplementedDaprServer
// for forward compatibility
type DaprServer interface {
...
// Publishes events to the specific topic.
PublishEvent(context.Context, *PublishEventRequest) (*emptypb.Empty, error)
...
}

Dapr's gRPC API implements it (in dapr/pkg/grpc/api.go):

type api struct {
actor actors.Actors
directMessaging messaging.DirectMessaging
appChannel channel.AppChannel
stateStores map[string]state.Store
transactionalStateStores map[string]state.TransactionalStore
secretStores map[string]secretstores.SecretStore
secretsConfiguration map[string]config.SecretsScope
configurationStores map[string]configuration.Store
configurationSubscribe map[string]bool
configurationSubscribeLock sync.Mutex
pubsubAdapter runtime_pubsub.Adapter
id string
sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
tracingSpec config.TracingSpec
accessControlList *config.AccessControlList
appProtocol string
extendedMetadata sync.Map
components []components_v1alpha.Component
shutdown func()
}

func (a *api) PublishEvent(ctx context.Context, in *runtimev1pb.PublishEventRequest) (*emptypb.Empty, error) {
if a.pubsubAdapter == nil {
err := status.Error(codes.FailedPrecondition, messages.ErrPubsubNotConfigured)
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}

pubsubName := in.PubsubName
if pubsubName == "" {
err := status.Error(codes.InvalidArgument, messages.ErrPubsubEmpty)
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}

thepubsub := a.pubsubAdapter.GetPubSub(pubsubName)
if thepubsub == nil {
err := status.Errorf(codes.InvalidArgument, messages.ErrPubsubNotFound, pubsubName)
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}

topic := in.Topic
if topic == "" {
err := status.Errorf(codes.InvalidArgument, messages.ErrTopicEmpty, pubsubName)
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}

rawPayload, metaErr := contrib_metadata.IsRawPayload(in.Metadata)
if metaErr != nil {
err := status.Errorf(codes.InvalidArgument, messages.ErrMetadataGet, metaErr.Error())
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}

span := diag_utils.SpanFromContext(ctx)
// Populate W3C traceparent to cloudevent envelope
corID := diag.SpanContextToW3CString(span.SpanContext())
// Populate W3C tracestate to cloudevent envelope
traceState := diag.TraceStateToW3CString(span.SpanContext())

body := []byte{}
if in.Data != nil {
body = in.Data
}

data := body

if !rawPayload {
envelope, err := runtime_pubsub.NewCloudEvent(&runtime_pubsub.CloudEvent{
ID: a.id,
Topic: in.Topic,
DataContentType: in.DataContentType,
Data: body,
TraceID: corID,
TraceState: traceState,
Pubsub: in.PubsubName,
})
if err != nil {
err = status.Errorf(codes.InvalidArgument, messages.ErrPubsubCloudEventCreation, err.Error())
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}

features := thepubsub.Features()
pubsub.ApplyMetadata(envelope, features, in.Metadata)

data, err = jsoniter.ConfigFastest.Marshal(envelope)
if err != nil {
err = status.Errorf(codes.InvalidArgument, messages.ErrPubsubCloudEventsSer, topic, pubsubName, err.Error())
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}
}

req := pubsub.PublishRequest{
PubsubName: pubsubName,
Topic: topic,
Data: data,
Metadata: in.Metadata,
}

err := a.pubsubAdapter.Publish(&req)
if err != nil {
nerr := status.Errorf(codes.Internal, messages.ErrPubsubPublishMessage, topic, pubsubName, err.Error())
if errors.As(err, &runtime_pubsub.NotAllowedError{}) {
nerr = status.Errorf(codes.PermissionDenied, err.Error())
}

if errors.As(err, &runtime_pubsub.NotFoundError{}) {
nerr = status.Errorf(codes.NotFound, err.Error())
}
apiServerLogger.Debug(nerr)
return &emptypb.Empty{}, nerr
}
return &emptypb.Empty{}, nil
}

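As the code shows, PublishEvent on the server does the following:

  1. Checks that the pubsub configuration is valid (the runtime loaded correctly, the pubsub component exists, and a topic is given)

  2. Determines whether the event data is raw (by checking whether the request metadata contains the “rawPayload” setting; see https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-raw/)

  3. Extracts the tracing information from the context

  4. Builds req, the event request instance that is passed on to the building blocks. Its structure is: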

    // PublishRequest is the request to publish a message.
    type PublishRequest struct {
    Data []byte `json:"data"`
    PubsubName string `json:"pubsubname"`
    Topic string `json:"topic"`
    Metadata map[string]string `json:"metadata"`
    ContentType *string `json:"contentType,omitempty"`
    }

    If step 2 finds the event data to be raw, in.Data (the data in the event request) is passed to req.Data unchanged

    If step 2 finds the event data is not raw, a CloudEvent envelope is created to wrap in.Data:

    envelope, err := runtime_pubsub.NewCloudEvent(&runtime_pubsub.CloudEvent{
    ID: a.id,
    Topic: in.Topic,
    DataContentType: in.DataContentType,
    Data: body,
    TraceID: corID,
    TraceState: traceState,
    Pubsub: in.PubsubName,
    })

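    For non-raw events, the features of the pubsub component are also applied (at the time of writing there appears to be only one, FeatureMessageTTL, which is used to work out whether an event has expired)

  5. Sends the event request with: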

    err := a.pubsubAdapter.Publish(&req)

    The Publish method of the pubsub component carries out the actual sending. Taking Kafka as an example (in components-contrib/pubsub/kafka/kafka.go):

    // Publish message to Kafka cluster.
    func (k *Kafka) Publish(req *pubsub.PublishRequest) error {
    if k.producer == nil {
    return errors.New("component is closed")
    }
    k.logger.Debugf("Publishing topic %v with data: %v", req.Topic, req.Data)

    msg := &sarama.ProducerMessage{
    Topic: req.Topic,
    Value: sarama.ByteEncoder(req.Data),
    }

    for name, value := range req.Metadata {
    if name == key {
    msg.Key = sarama.StringEncoder(value)
    } else {
    if msg.Headers == nil {
    msg.Headers = make([]sarama.RecordHeader, 0, len(req.Metadata))
    }
    msg.Headers = append(msg.Headers, sarama.RecordHeader{
    Key: []byte(name),
    Value: []byte(value),
    })
    }
    }

    partition, offset, err := k.producer.SendMessage(msg)

    k.logger.Debugf("Partition: %v, offset: %v", partition, offset)

    if err != nil {
    return err
    }

    return nil
    }
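
The raw versus non-raw decision in steps 2 and 4 can be sketched with the standard library alone. This is a simplified illustration, not Dapr's implementation: isRawPayload and buildPayload are hypothetical names, the envelope is a reduced map rather than Dapr's real CloudEvent, and reading the flag from the "rawPayload" metadata key as a boolean string is an assumption based on the documentation linked above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// isRawPayload reports whether the request metadata asks for raw delivery.
// Assumption: the flag is stored under "rawPayload" as a boolean string.
func isRawPayload(metadata map[string]string) (bool, error) {
	v, ok := metadata["rawPayload"]
	if !ok || v == "" {
		return false, nil
	}
	return strconv.ParseBool(v)
}

// buildPayload returns the bytes that would be handed to the pubsub component.
// Raw data is passed through unchanged; anything else is wrapped in a reduced,
// illustrative envelope and serialized as JSON.
func buildPayload(data []byte, contentType, topic, pubsubName string, metadata map[string]string) ([]byte, error) {
	raw, err := isRawPayload(metadata)
	if err != nil {
		return nil, fmt.Errorf("invalid rawPayload metadata: %w", err)
	}
	if raw {
		return data, nil
	}
	envelope := map[string]interface{}{
		"specversion":     "1.0",
		"topic":           topic,
		"pubsubname":      pubsubName,
		"datacontenttype": contentType,
		"data":            string(data),
	}
	return json.Marshal(envelope)
}

func main() {
	raw, err := buildPayload([]byte("hello"), "text/plain", "orders", "kafka-pubsub",
		map[string]string{"rawPayload": "true"})
	fmt.Println(string(raw), err)

	wrapped, err := buildPayload([]byte("hello"), "text/plain", "orders", "kafka-pubsub", nil)
	fmt.Println(string(wrapped), err)
}
```

The point of the sketch is that the component only ever sees a byte slice; whether those bytes are the caller's data or a JSON envelope around it is decided before Publish is called.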

The “receiving” implementation

When the Dapr runtime starts, it begins listening for pubsub events (in dapr/pkg/runtime/runtime.go):

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
...
a.startSubscribing()
...
}

func (a *DaprRuntime) startSubscribing() {
for name, pubsub := range a.pubSubs {
if err := a.beginPubSub(name, pubsub); err != nil {
log.Errorf("error occurred while beginning pubsub %s: %s", name, err)
}
}
}

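beginPubSub does the following (in dapr/pkg/runtime/runtime.go, taking gRPC as the example):

  • Calls ListTopicSubscriptions to get the list of pubsub component topics the application subscribes to

  • Uses the Subscribe method of the pubsub components to receive events from those subscriptions and converts each one into var cloudEvent map[string]interface{}

  • Calls publishMessageGRPC to send the event to the application

    • Creates a new envelope instance to wrap msg.cloudEvent, where msg is the original message from Kafka and cloudEvent is the event assembled by Dapr: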

      envelope := &runtimev1pb.TopicEventRequest{
      Id: extractCloudEventProperty(cloudEvent, pubsub.IDField),
      Source: extractCloudEventProperty(cloudEvent, pubsub.SourceField),
      DataContentType: extractCloudEventProperty(cloudEvent, pubsub.DataContentTypeField),
      Type: extractCloudEventProperty(cloudEvent, pubsub.TypeField),
      SpecVersion: extractCloudEventProperty(cloudEvent, pubsub.SpecVersionField),
      Topic: msg.topic,
      PubsubName: msg.metadata[pubsubName],
      Path: msg.path,
      }
    • Reads the event data from the pubsub.DataBase64Field and pubsub.DataField fields of msg.cloudEvent and fills it into envelope

    • Handles the tracing information in msg.cloudEvent

    • Loads msg.metadata into the context

    • Calls the appcallback gRPC service, here the OnTopicEvent method

      res, err := clientV1.OnTopicEvent(ctx, envelope)
    • Performs some post-send processing
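
The property and data extraction described in the steps above can be sketched as follows. This is a simplified stand-in rather than the runtime's code: extractProperty and extractData are hypothetical names, and the field names "data_base64" and "data" are the ones used by the CloudEvents JSON format, which is assumed here to be what pubsub.DataBase64Field and pubsub.DataField refer to.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// extractProperty returns the string stored under key, or "" when the key is
// missing or holds a non-string value.
func extractProperty(cloudEvent map[string]interface{}, key string) string {
	if v, ok := cloudEvent[key]; ok {
		if s, ok := v.(string); ok {
			return s
		}
	}
	return ""
}

// extractData prefers the base64 field and falls back to the plain data field.
// Non-string data is re-serialized as JSON (a simplification for this sketch).
func extractData(cloudEvent map[string]interface{}) ([]byte, error) {
	if v, ok := cloudEvent["data_base64"]; ok {
		s, ok := v.(string)
		if !ok {
			return nil, fmt.Errorf("data_base64 is not a string")
		}
		return base64.StdEncoding.DecodeString(s)
	}
	switch d := cloudEvent["data"].(type) {
	case nil:
		return nil, nil
	case string:
		return []byte(d), nil
	default:
		return json.Marshal(d)
	}
}

func main() {
	msg := []byte(`{"id":"abc","topic":"orders","datacontenttype":"application/json","data":{"n":1}}`)

	// Decode the wire message into the generic map form used by the runtime.
	var cloudEvent map[string]interface{}
	if err := json.Unmarshal(msg, &cloudEvent); err != nil {
		panic(err)
	}

	data, err := extractData(cloudEvent)
	fmt.Println(extractProperty(cloudEvent, "id"), string(data), err)
}
```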

OnTopicEvent does the following (in go-sdk/service/grpc/topic.go):

  • Creates a common.TopicEvent and fills it with in.Data from the received event request; this instance e is what carries the event inside the user application:

    e := &common.TopicEvent{
    ID: in.Id,
    Source: in.Source,
    Type: in.Type,
    SpecVersion: in.SpecVersion,
    DataContentType: in.DataContentType,
    Data: data,
    RawData: in.Data,
    Topic: in.Topic,
    PubsubName: in.PubsubName,
    }
  • Processes the event with the handlers registered through AddTopicEventHandler
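
The register-then-dispatch pattern behind these two steps can be sketched without the SDK. Everything below is hypothetical and reduced for illustration: topicEvent stands in for common.TopicEvent, the handler type drops the context argument that the SDK's handlers also receive, and keying handlers by "pubsub/topic" is an assumption of this sketch, not a statement about how go-sdk stores them.

```go
package main

import (
	"errors"
	"fmt"
)

// topicEvent is a reduced stand-in for the SDK's common.TopicEvent.
type topicEvent struct {
	PubsubName string
	Topic      string
	Data       []byte
}

// handler processes one event and reports whether delivery should be retried.
type handler func(e *topicEvent) (retry bool, err error)

// registry maps a "pubsub/topic" key to the handler registered for it.
type registry struct {
	handlers map[string]handler
}

func newRegistry() *registry {
	return &registry{handlers: map[string]handler{}}
}

// add registers h for the given pubsub component and topic.
func (r *registry) add(pubsubName, topic string, h handler) error {
	if pubsubName == "" || topic == "" {
		return errors.New("pubsub name and topic are required")
	}
	if h == nil {
		return errors.New("handler is required")
	}
	r.handlers[pubsubName+"/"+topic] = h
	return nil
}

// dispatch looks up the handler for the event and invokes it.
func (r *registry) dispatch(e *topicEvent) (bool, error) {
	h, ok := r.handlers[e.PubsubName+"/"+e.Topic]
	if !ok {
		return false, fmt.Errorf("no handler for pubsub %q topic %q", e.PubsubName, e.Topic)
	}
	return h(e)
}

func main() {
	r := newRegistry()
	_ = r.add("messages", "neworder", func(e *topicEvent) (bool, error) {
		fmt.Printf("got %s from %s/%s\n", e.Data, e.PubsubName, e.Topic)
		return false, nil
	})
	retry, err := r.dispatch(&topicEvent{PubsubName: "messages", Topic: "neworder", Data: []byte("ping")})
	fmt.Println(retry, err)
}
```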

Example

See the pubsub example in dapr/go-sdk: https://github.com/dapr/go-sdk/blob/main/examples/pubsub/pub/pub.go

go-sdk/client/client.go defines the Client interface, which serves as the client-side implementation of the dapr service of the runtime gRPC services.

// Client is the interface for Dapr client implementation.
type Client interface {
...

// PublishEvent publishes data onto topic in specific pubsub component.
PublishEvent(ctx context.Context, pubsubName, topicName string, data interface{}, opts ...PublishEventOption) error

...
}

The concrete implementation is the GRPCClient struct in the same file:

// GRPCClient is the gRPC implementation of Dapr client.
type GRPCClient struct {
connection *grpc.ClientConn
ctxCancelFunc context.CancelFunc
protoClient pb.DaprClient
authToken string
mux sync.Mutex
}

protoClient points at the protobuf interface definition for the dapr client, that is, DaprClient in dapr/pkg/proto/runtime/v1/dapr_grpc.pb.go

This is the client interface for the dapr service of the runtime gRPC services.

// DaprClient is the client API for Dapr service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type DaprClient interface {
...
}

Creating the client

A GRPCClient instance is created with the following code:

func newClientWithConnectionAndCancelFunc(
conn *grpc.ClientConn,
cancelFunc context.CancelFunc,
) Client {
return &GRPCClient{
connection: conn,
ctxCancelFunc: cancelFunc,
protoClient: pb.NewDaprClient(conn),
authToken: os.Getenv(apiTokenEnvVarName),
}
}

Sending events

Sending an event calls the PublishEvent method of the Client interface.

GRPCClient implements it as follows:

// PublishEvent publishes data onto specific pubsub topic.
func (c *GRPCClient) PublishEvent(ctx context.Context, pubsubName, topicName string, data interface{}, opts ...PublishEventOption) error {
if pubsubName == "" {
return errors.New("pubsubName name required")
}
if topicName == "" {
return errors.New("topic name required")
}

request := &pb.PublishEventRequest{
PubsubName: pubsubName,
Topic: topicName,
}
for _, opt := range opts {
opt(request)
}

if data != nil {
switch d := data.(type) {
case []byte:
request.Data = d
case string:
request.Data = []byte(d)
default:
var err error
request.DataContentType = "application/json"
request.Data, err = json.Marshal(d)
if err != nil {
return errors.WithMessage(err, "error serializing input struct")
}
}
}

_, err := c.protoClient.PublishEvent(c.withAuthToken(ctx), request)
if err != nil {
return errors.Wrapf(err, "error publishing event unto %s topic", topicName)
}

return nil
}

PublishEvent handles the data to send (data) as follows:

  • []byte data is stored as is
  • string data is converted to []byte and stored
  • anything else is serialized to JSON and stored
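
The three cases above can be isolated into a small function to make the behaviour easy to try out. This is a sketch under stated assumptions: encodeData and the order type are hypothetical names introduced here, and the function returns the content type instead of setting it on a request as the SDK code does.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// order is a hypothetical payload type used only for this example.
type order struct {
	ID int `json:"id"`
}

// encodeData mirrors the type switch in PublishEvent: bytes pass through,
// strings are converted, and every other value is serialized as JSON.
// contentType is empty when the caller is expected to decide it.
func encodeData(data interface{}) (payload []byte, contentType string, err error) {
	switch d := data.(type) {
	case nil:
		return nil, "", nil
	case []byte:
		return d, "", nil
	case string:
		return []byte(d), "", nil
	default:
		payload, err = json.Marshal(d)
		if err != nil {
			return nil, "", fmt.Errorf("error serializing input: %w", err)
		}
		return payload, "application/json", nil
	}
}

func main() {
	for _, v := range []interface{}{[]byte("raw"), "text", order{ID: 7}} {
		payload, contentType, err := encodeData(v)
		fmt.Printf("%q %q %v\n", payload, contentType, err)
	}
}
```

One consequence worth noting is that only the JSON branch sets a content type, so callers that pass []byte or string need to supply the content type themselves if the subscriber depends on it.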

It then calls protoClient.PublishEvent to send the data. protoClient.PublishEvent is implemented as:

func (c *daprClient) PublishEvent(ctx context.Context, in *PublishEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/PublishEvent", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

Handling subscribed events

The “receiving” implementation