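An Analysis of the Sensor Logic in Argo Events
Overview
This article briefly analyzes how the Sensor CRD in the Argo Events project listens for events and routes them into the trigger stage.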
Sensor CRD
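Two parts of the Sensor CRD apply logic to event sources. The first filters individual events:
Each dependency (i.e. event source) can have filters, which screen CloudEvents-formatted events by their content and metadata.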
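As a rough illustration of what a data filter does, the Go sketch below checks one field of a JSON payload against a list of accepted values. It is a simplified, hypothetical example: DataFilter and matchDataFilter are not Argo Events APIs, the path is assumed to be a dot-separated key into nested JSON objects, and only exact string matches are supported, whereas the project's own filters cover more cases.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// DataFilter is a hypothetical, trimmed-down filter: the value found at Path
// must equal one of Values.
type DataFilter struct {
	Path   string   // dot-separated path into the JSON payload, e.g. "body.action"
	Values []string // accepted string values
}

// lookup walks a dot-separated path through nested JSON objects.
func lookup(data map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = data
	for _, key := range strings.Split(path, ".") {
		obj, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		if cur, ok = obj[key]; !ok {
			return nil, false
		}
	}
	return cur, true
}

// matchDataFilter reports whether the JSON payload passes the filter.
func matchDataFilter(payload []byte, f DataFilter) (bool, error) {
	var data map[string]interface{}
	if err := json.Unmarshal(payload, &data); err != nil {
		return false, err
	}
	v, ok := lookup(data, f.Path)
	if !ok {
		return false, nil // a missing field never matches
	}
	s, ok := v.(string)
	if !ok {
		return false, nil // this sketch only compares strings
	}
	for _, want := range f.Values {
		if s == want {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	f := DataFilter{Path: "body.action", Values: []string{"opened", "reopened"}}
	ok, err := matchDataFilter([]byte(`{"body":{"action":"opened"}}`), f)
	fmt.Println(ok, err) // true <nil>
	ok, _ = matchDataFilter([]byte(`{"body":{"action":"closed"}}`), f)
	fmt.Println(ok) // false
}
```

An event that fails its dependency's filter is simply ignored, so it never counts towards the trigger conditions described next.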
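The second works at the level of event sources:
Each trigger can have conditions. The trigger logic is entered only when the conditions, evaluated over the event sources they reference (an event source counts as true once an event from it has been received), yield true; otherwise the trigger does not fire.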
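To show how a conditions string decides whether a trigger fires, here is a minimal recursive-descent evaluator for expressions made of dependency names, &&, || and parentheses. It is a sketch under the assumption that a dependency is true once an event for it has been received; evaluate and its helpers are hypothetical, and Argo Events itself relies on expression libraries for this (expr and govaluate both appear in the implementation code).

```go
package main

import (
	"fmt"
	"strings"
)

// tokenize pads operators and parentheses with spaces and splits on whitespace,
// so "(dep01||dep02) && dep03" becomes ["(", "dep01", "||", "dep02", ")", "&&", "dep03"].
func tokenize(s string) []string {
	r := strings.NewReplacer("(", " ( ", ")", " ) ", "&&", " && ", "||", " || ")
	return strings.Fields(r.Replace(s))
}

// parser evaluates while it parses; seen holds the dependencies that have received an event.
type parser struct {
	toks []string
	pos  int
	seen map[string]bool
}

func (p *parser) peek() string {
	if p.pos < len(p.toks) {
		return p.toks[p.pos]
	}
	return ""
}

// parseOr handles the lowest-precedence operator: and-term { "||" and-term }.
func (p *parser) parseOr() (bool, error) {
	v, err := p.parseAnd()
	for err == nil && p.peek() == "||" {
		p.pos++
		var r bool
		r, err = p.parseAnd() // always parse the right side to consume its tokens
		v = v || r
	}
	return v, err
}

// parseAnd binds tighter than ||: atom { "&&" atom }.
func (p *parser) parseAnd() (bool, error) {
	v, err := p.parseAtom()
	for err == nil && p.peek() == "&&" {
		p.pos++
		var r bool
		r, err = p.parseAtom()
		v = v && r
	}
	return v, err
}

// parseAtom is either a dependency name or a parenthesised sub-expression.
func (p *parser) parseAtom() (bool, error) {
	tok := p.peek()
	switch tok {
	case "", "&&", "||", ")":
		return false, fmt.Errorf("unexpected token %q", tok)
	case "(":
		p.pos++
		v, err := p.parseOr()
		if err != nil {
			return false, err
		}
		if p.peek() != ")" {
			return false, fmt.Errorf("missing closing parenthesis")
		}
		p.pos++
		return v, nil
	}
	p.pos++
	return p.seen[tok], nil
}

// evaluate reports whether expr holds given the dependencies seen so far.
func evaluate(expr string, seen map[string]bool) (bool, error) {
	toks := tokenize(expr)
	p := &parser{toks: toks, seen: seen}
	v, err := p.parseOr()
	if err == nil && p.pos != len(toks) {
		err = fmt.Errorf("unexpected token %q", p.peek())
	}
	return v, err
}

func main() {
	cond := "(dep01 || dep02) && dep03"
	ok, _ := evaluate(cond, map[string]bool{"dep01": true})
	fmt.Println(ok) // false: dep03 has not received an event yet
	ok, _ = evaluate(cond, map[string]bool{"dep02": true, "dep03": true})
	fmt.Println(ok) // true
}
```

As in most languages, && binds tighter than || here, so "a || b && c" is read as "a || (b && c)".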
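Implementation
The listenEvents function in argo-events/sensors/listener.go listens for and handles events.
It first calls getDependencyExpression on each trigger's .template.conditions. The result is a dependency expression (a string) that tells the trigger which event sources to listen to; "dependency" here simply means an event source.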
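Conceptually, this step rewrites a conditions string that may mention group names into one that mentions only dependency names. The sketch below shows that rewriting in isolation, assuming that each (hypothetical) group expands to its member dependencies joined with &&; expandGroups and the groups map are illustrative only, and the project builds the result with the expr library instead, as the following code shows.

```go
package main

import (
	"fmt"
	"strings"
)

// isIdent reports whether c may appear in a dependency or group name.
func isIdent(c byte) bool {
	return c == '-' || c == '_' || (c >= '0' && c <= '9') || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
}

// expandGroups replaces every name found in groups with a parenthesised &&
// of its member dependencies; all other names and operators are copied as-is.
func expandGroups(expr string, groups map[string][]string) string {
	var b strings.Builder
	for i := 0; i < len(expr); {
		if !isIdent(expr[i]) {
			b.WriteByte(expr[i]) // operators, spaces and parentheses
			i++
			continue
		}
		j := i
		for j < len(expr) && isIdent(expr[j]) {
			j++
		}
		name := expr[i:j]
		if members, ok := groups[name]; ok {
			b.WriteString("(" + strings.Join(members, " && ") + ")")
		} else {
			b.WriteString(name)
		}
		i = j
	}
	return b.String()
}

func main() {
	groups := map[string][]string{"g1": {"dep01", "dep02"}}
	fmt.Println(expandGroups("g1 || dep-03", groups)) // (dep01 && dep02) || dep-03
}
```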
Handling the logical operators in conditions:
```go
// Translate original expression which might contain group names
// to an expression only contains dependency names
translate := func(originalExpr string, parameters map[string]string) (string, error) {
	// Turn operators and parentheses into string literals so the whole
	// expression becomes a string concatenation, e.g. a && b -> a + "&&" + b.
	// Evaluating it substitutes every name with its value in parameters.
	originalExpr = strings.ReplaceAll(originalExpr, "&&", " + \"&&\" + ")
	originalExpr = strings.ReplaceAll(originalExpr, "||", " + \"||\" + ")
	// '-' would be parsed as subtraction, so names are normalized to use '_'.
	originalExpr = strings.ReplaceAll(originalExpr, "-", "_")
	originalExpr = strings.ReplaceAll(originalExpr, "(", "\"(\"+")
	originalExpr = strings.ReplaceAll(originalExpr, ")", "+\")\"")
	program, err := expr.Compile(originalExpr, expr.Env(parameters))
	if err != nil {
		logger.Errorw("Failed to compile original dependency expression", zap.Error(err))
		return "", err
	}
	result, err := expr.Run(program, parameters)
	if err != nil {
		logger.Errorw("Failed to parse original dependency expression", zap.Error(err))
		return "", err
	}
	newExpr := fmt.Sprintf("%v", result)
	newExpr = strings.ReplaceAll(newExpr, "\"(\"", "(")
	newExpr = strings.ReplaceAll(newExpr, "\")\"", ")")
	return newExpr, nil
}
```
The translated expression is then simplified with NewBoolExpression:
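Whatever algorithm performs this step, a simplification is only valid if the simplified expression agrees with the original for every combination of received and missing dependencies. A brute-force check of the example "(a || b || c) && (a && b)" → "a && b" makes that criterion concrete; equivalent is a hypothetical helper, and this illustrates only the correctness requirement, not how Quine–McCluskey works.

```go
package main

import "fmt"

// equivalent reports whether two 3-variable boolean functions agree on all
// 2^3 = 8 assignments of a, b and c.
func equivalent(f, g func(a, b, c bool) bool) bool {
	for mask := 0; mask < 8; mask++ {
		a, b, c := mask&1 != 0, mask&2 != 0, mask&4 != 0
		if f(a, b, c) != g(a, b, c) {
			return false
		}
	}
	return true
}

func main() {
	original := func(a, b, c bool) bool { return (a || b || c) && (a && b) }
	simplified := func(a, b, _ bool) bool { return a && b }
	fmt.Println(equivalent(original, simplified)) // true
}
```

The check passes because a && b already implies a || b || c, so the first clause adds nothing.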
```go
// NewBoolExpression returns a Minifier instance
// It is used to simplify boolean expressions.
// For example, "(a || b || c) && (a && b)" can be simplified as "a && b"
// It is achieved by using Quine–McCluskey algorithm.
// See https://en.wikipedia.org/wiki/Quine%E2%80%93McCluskey_algorithm
func NewBoolExpression(expression string) (Minifier, error) {...}
```
Next, a triggerMapping is built that maps each dependencyExpression to the triggers that use it.
A dependencyExpression can be thought of as the set of event sources obtained by combining dependencies with logical operators.
```go
triggerMapping := make(map[string][]v1alpha1.Trigger)
for _, trigger := range sensor.Spec.Triggers {
	depExpr, err := sensorCtx.getDependencyExpression(ctx, trigger)
	if err != nil {
		logger.Errorw("failed to get dependency expression", zap.Error(err))
		return err
	}
	triggers, ok := triggerMapping[depExpr]
	if !ok {
		triggers = []v1alpha1.Trigger{}
	}
	triggers = append(triggers, trigger)
	triggerMapping[depExpr] = triggers
}
```
Each entry in triggerMapping is then handled by a subscribeFunc, which drives the trigger logic.
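The loop above amounts to grouping triggers by their dependency expression, so that triggers with identical conditions share one subscription. A self-contained sketch of the same grouping, with a hypothetical Trigger type whose DepExpr field stands in for v1alpha1.Trigger plus the result of getDependencyExpression:

```go
package main

import "fmt"

// Trigger is a hypothetical stand-in for v1alpha1.Trigger.
type Trigger struct {
	Name    string
	DepExpr string // stands in for the result of getDependencyExpression
}

// groupByExpression maps each dependency expression to the triggers that use
// it, preserving the order in which the triggers were declared.
func groupByExpression(triggers []Trigger) map[string][]Trigger {
	mapping := make(map[string][]Trigger)
	for _, t := range triggers {
		// Appending to a missing key starts from a nil slice.
		mapping[t.DepExpr] = append(mapping[t.DepExpr], t)
	}
	return mapping
}

func main() {
	m := groupByExpression([]Trigger{
		{Name: "t1", DepExpr: "dep01 && dep02"},
		{Name: "t2", DepExpr: "dep03"},
		{Name: "t3", DepExpr: "dep01 && dep02"},
	})
	fmt.Println(len(m), len(m["dep01 && dep02"])) // 2 2
}
```

Because append on a nil slice allocates as needed, the explicit existence check in the original loop is not strictly required; it only makes the intent explicit.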
Within it, ebDriver.SubscribeEventSources is called; its purpose and parameters are documented as follows:
```go
// SubscribeEventSources is used to subscribe multiple event source dependencies
// Parameter - ctx, context
// Parameter - conn, eventbus connection
// Parameter - group, NATS Streaming queue group or Kafka consumer group
// Parameter - closeCh, channel to indicate to close the subscription
// Parameter - dependencyExpr, example: "(dep1 || dep2) && dep3"
// Parameter - dependencies, array of dependencies information
// Parameter - filter, a function used to filter the message
// Parameter - action, a function to be triggered after all conditions meet
SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{},
	dependencyExpr string, dependencies []Dependency,
	filter func(string, cloudevents.Event) bool,
	action func(map[string]cloudevents.Event)) error
```
The filterFunc passed as the filter argument screens events according to .spec.dependencies.filters:
```go
filterFunc := func(depName string, event cloudevents.Event) bool {
	dep, ok := depMapping[depName]
	if !ok {
		return false
	}
	if dep.Filters == nil {
		return true
	}
	e := convertEvent(event)
	result, err := sensordependencies.Filter(e, dep.Filters)
	if err != nil {
		logger.Errorw("failed to apply filters", zap.Error(err))
		return false
	}
	return result
}
```
actionFunc performs the actual trigger logic:
```go
actionFunc := func(events map[string]cloudevents.Event) {
	if err := sensorCtx.triggerActions(cctx, sensor, events, triggers); err != nil {
		logger.Errorw("failed to trigger actions", zap.Error(err))
	}
}
```
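SubscribeEventSources only receives these two callbacks, so their contract can be modelled without an event bus: each message goes through filter, accepted events are remembered per dependency, and action receives the latest event of each dependency once the expression holds. The sketch below is a toy, single-goroutine stand-in: Event, Message and dispatch are hypothetical names, the expression is passed as a Go function, there is no acknowledgement, redelivery or timestamp handling, and it simply clears its state after each firing.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for cloudevents.Event.
type Event struct {
	ID   string
	Data string
}

// Message pairs an event with the dependency it was received for.
type Message struct {
	DepName string
	Event   Event
}

// dispatch feeds messages through filter, records accepted events per
// dependency, and calls action whenever ready reports the expression holds.
func dispatch(msgs []Message,
	filter func(string, Event) bool,
	ready func(map[string]bool) bool,
	action func(map[string]Event)) {
	latest := make(map[string]Event)
	seen := make(map[string]bool)
	for _, m := range msgs {
		if !filter(m.DepName, m.Event) {
			continue // filtered out: does not count towards the expression
		}
		latest[m.DepName] = m.Event
		seen[m.DepName] = true
		if ready(seen) {
			// Hand over a copy so later messages cannot change what action sees.
			snapshot := make(map[string]Event, len(latest))
			for k, v := range latest {
				snapshot[k] = v
			}
			action(snapshot)
			latest = make(map[string]Event)
			seen = make(map[string]bool)
		}
	}
}

func main() {
	msgs := []Message{
		{DepName: "dep01", Event: Event{ID: "1", Data: "ignore"}},
		{DepName: "dep01", Event: Event{ID: "2", Data: "ok"}},
		{DepName: "dep02", Event: Event{ID: "3", Data: "ok"}},
	}
	filter := func(dep string, e Event) bool { return e.Data == "ok" }
	ready := func(seen map[string]bool) bool { return seen["dep01"] && seen["dep02"] }
	action := func(events map[string]Event) {
		fmt.Println("fired with", events["dep01"].ID, events["dep02"].ID)
	}
	dispatch(msgs, filter, ready, action) // prints: fired with 2 3
}
```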
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16subscribeFunc := func() {
wg1.Add(1)
go func() {
defer wg1.Done()
// release the lock when goroutine exits
defer atomic.StoreUint32(&subLock, 0)
logger.Infof("started subscribing to events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID)
err = ebDriver.SubscribeEventSources(cctx, conn, group, closeSubCh, depExpression, deps, filterFunc, actionFunc)
if err != nil {
logger.Errorw("failed to subscribe to eventbus", zap.Any("clientID", clientID), zap.Error(err))
return
}
}()
}在 SubscribeEventSources 中,会创建一个订阅者来获取 NATS Streaming 中的事件
而只有在这个订阅者注册后出现的事件才会被处理
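The "new only" behaviour comes from the stan.StartAt(pb.StartPosition_NewOnly) option in the call that follows. A toy in-memory log illustrates the semantics: a subscriber records the log's length when it subscribes and only ever sees entries appended afterwards. Log, Subscription and their methods are hypothetical and unrelated to the NATS Streaming client API; queue groups, acknowledgements and redelivery are not modelled. Note also that with a durable subscription the start position matters when the durable is first created; a durable that already exists resumes from where it left off.

```go
package main

import "fmt"

// Log is a hypothetical append-only message log.
type Log struct {
	entries []string
}

// Subscription remembers where in the log it reads next.
type Subscription struct {
	log  *Log
	next int // index of the next entry this subscriber will receive
}

// Publish appends a message to the log.
func (l *Log) Publish(msg string) { l.entries = append(l.entries, msg) }

// SubscribeNewOnly starts at the current end of the log, so anything
// published before this call is never delivered to the subscriber.
func (l *Log) SubscribeNewOnly() *Subscription {
	return &Subscription{log: l, next: len(l.entries)}
}

// Poll returns the entries published since the previous Poll.
func (s *Subscription) Poll() []string {
	out := append([]string(nil), s.log.entries[s.next:]...)
	s.next = len(s.log.entries)
	return out
}

func main() {
	l := &Log{}
	l.Publish("before")
	sub := l.SubscribeNewOnly()
	l.Publish("after-1")
	l.Publish("after-2")
	fmt.Println(sub.Poll()) // [after-1 after-2]
}
```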
```go
sub, err := nsc.stanConn.QueueSubscribe(n.subject, group, func(m *stan.Msg) {
	n.processEventSourceMsg(m, msgHolder, filter, action, log)
}, stan.DurableName(durableName),
	stan.SetManualAckMode(),
	stan.StartAt(pb.StartPosition_NewOnly),
	stan.AckWait(1*time.Second),
	stan.MaxInflight(len(msgHolder.depNames)+2))
```
This brings us to a key struct, eventSourceMessageHolder. It caches the events received from the event sources and carries the related metadata (the comment on each field explains its meaning).
```go
// eventSourceMessageHolder is a struct used to hold the message information of subscribed dependencies
type eventSourceMessageHolder struct {
	// time that all conditions meet
	lastMeetTime int64
	// timestamp of last msg when all the conditions meet
	latestGoodMsgTimestamp int64
	expr *govaluate.EvaluableExpression
	depNames []string
	// Mapping of [eventSourceName + eventName]dependencyName
	sourceDepMap map[string]string
	parameters map[string]interface{}
	msgs map[string]*eventSourceMessage
	// A sync map used to cache the message IDs, it is used to guarantee Exact Once triggering
	smap *sync.Map
}
```
processEventSourceMsg uses eventSourceMessageHolder to run each event through the following steps.
If a message for this dependency is already held, the incoming message is compared with it. If both have the same timestamp, the incoming one is a redelivery of the held message, so only its last delivery time is updated. If the incoming message is older, it is a redelivery of an outdated message, so it is acknowledged and dropped. In both cases processing stops there.
```go
if existingMsg, ok := msgHolder.msgs[depName]; ok {
	if m.Timestamp == existingMsg.timestamp {
		// Re-delivered latest message, update delivery timestamp and return
		existingMsg.lastDeliveredTime = now
		msgHolder.msgs[depName] = existingMsg
		return
	} else if m.Timestamp < existingMsg.timestamp {
		// Re-delivered old message, ack and return
		msgHolder.ackAndCache(m, event.ID())
		log.Debugw("Dropping this message because later ones also satisfy", "eventID", event.ID())
		return
	}
}
```
If the incoming message is newer than the held one (or nothing is held yet for this dependency), a new eventSourceMessage is created to store it, and the dependency is marked as received in parameters.
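```go
msgHolder.msgs[depName] = &eventSourceMessage{seq: m.Sequence, timestamp: m.Timestamp, event: event, lastDeliveredTime: now}
msgHolder.parameters[depName] = true
```
Held events that have not been redelivered for more than 10 minutes are then cleared. Because held messages are not acknowledged, the server keeps redelivering them, so one that stops arriving is presumed to have been removed on the NATS side. If anything was cleared, processing of the current message stops.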
```go
hasStale := false
for k, v := range msgHolder.msgs {
	// Since the message is not acked, the server will keep re-sending it.
	// If a message being held didn't get re-delivered in the last 10 minutes, treat it as stale.
	if (now - v.lastDeliveredTime) > 10*60 {
		msgHolder.reset(k)
		hasStale = true
	}
}
if hasStale {
	return
}
```
Next, the dependency expression is evaluated against msgHolder.parameters. If it is true, this event completes the conditions and will fire the trigger: msgHolder.latestGoodMsgTimestamp is set to the timestamp of this message, msgHolder.lastMeetTime is set to the current time, and the held events are collected for the action.
```go
result, err := msgHolder.expr.Evaluate(msgHolder.parameters)
if err != nil {
	log.Errorf("failed to evaluate dependency expression: %v", err)
	// TODO: how to handle this situation?
	return
}
if result != true {
	return
}
msgHolder.latestGoodMsgTimestamp = m.Timestamp
msgHolder.lastMeetTime = time.Now().Unix()
// Trigger actions
messages := make(map[string]cloudevents.Event)
for k, v := range msgHolder.msgs {
	messages[k] = *v.event
}
```
Finally, the trigger logic runs in a new goroutine:
```go
go action(messages)
```
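Putting the steps together, the per-message bookkeeping can be modelled in a few dozen lines. This is a simplified, hypothetical model rather than the project's code: holder, heldMsg and onMessage only mirror the roles of eventSourceMessageHolder and processEventSourceMsg, timestamps are plain integers in seconds, acknowledgement is reduced to a counter, and the dependency expression is a Go function instead of a govaluate expression.

```go
package main

import "fmt"

// staleAfter mirrors the 10-minute threshold: a held message that has not
// been redelivered for this many seconds is dropped.
const staleAfter = 10 * 60

type heldMsg struct {
	timestamp         int64 // publish timestamp of the message
	lastDeliveredTime int64 // when the server last (re)delivered it
}

// holder is a hypothetical, trimmed-down eventSourceMessageHolder.
type holder struct {
	msgs                   map[string]*heldMsg
	parameters             map[string]bool
	latestGoodMsgTimestamp int64
	acked                  int // stands in for acknowledging messages to the server
	expr                   func(map[string]bool) bool
}

func newHolder(deps []string, expr func(map[string]bool) bool) *holder {
	h := &holder{msgs: map[string]*heldMsg{}, parameters: map[string]bool{}, expr: expr}
	for _, d := range deps {
		h.parameters[d] = false
	}
	return h
}

// onMessage processes one (re)delivery of a message for dep, published at ts
// and received at now. It returns the held timestamps and true when the
// trigger should fire.
func (h *holder) onMessage(dep string, ts, now int64) (map[string]int64, bool) {
	if existing, ok := h.msgs[dep]; ok {
		if ts == existing.timestamp {
			// Redelivery of the held message: refresh its delivery time.
			existing.lastDeliveredTime = now
			return nil, false
		}
		if ts < existing.timestamp {
			// Redelivery of an older message: acknowledge and drop it.
			h.acked++
			return nil, false
		}
	}
	// A newer message (or the first one) for this dependency.
	h.msgs[dep] = &heldMsg{timestamp: ts, lastDeliveredTime: now}
	h.parameters[dep] = true

	// Drop held messages the server has stopped redelivering.
	hasStale := false
	for k, v := range h.msgs {
		if now-v.lastDeliveredTime > staleAfter {
			delete(h.msgs, k)
			h.parameters[k] = false
			hasStale = true
		}
	}
	if hasStale {
		return nil, false
	}

	// Fire only when the dependency expression holds.
	if !h.expr(h.parameters) {
		return nil, false
	}
	h.latestGoodMsgTimestamp = ts
	fired := make(map[string]int64, len(h.msgs))
	for k, v := range h.msgs {
		fired[k] = v.timestamp
	}
	return fired, true
}

func main() {
	h := newHolder([]string{"dep01", "dep02"}, func(p map[string]bool) bool { return p["dep01"] && p["dep02"] })
	_, ok := h.onMessage("dep01", 100, 1000)
	fmt.Println(ok) // false: dep02 has not arrived yet
	_, ok = h.onMessage("dep01", 100, 1005) // redelivery of the held message
	fmt.Println(ok) // false
	fired, ok := h.onMessage("dep02", 200, 1010)
	fmt.Println(ok, fired) // true map[dep01:100 dep02:200]
}
```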