Analysis of the Sensor Logic in Argo Events

Overview

This article briefly analyzes how the Sensor CRD in the Argo Events project listens for events and routes them into the trigger stage.

Sensor CRD

The Sensor CRD has two places where logic is applied to event sources. The first is event filtering:

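Each dependency (i.e. an event source) can have its own filters. They screen CloudEvents-formatted events on their content and metadata. In the example below, an event passes only if the string at path bucket in its data matches one of the listed values.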

```yaml
spec:
  dependencies:
    - name: test-dep
      eventSourceName: webhook
      eventName: example
      filters:
        data:
          - path: bucket
            type: string
            value:
              - argo-workflow-input
              - argo-workflow-input1
```
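
The following minimal sketch makes the data filter above concrete by matching one string data filter against a JSON payload. It is a simplification, not the Argo Events implementation. `dataFilter` and `matchDataFilter` are hypothetical names. The path is treated as a single top-level key, whereas the real filter resolves nested paths. A value matches only by exact string equality; the real filter may interpret the listed values differently, for example as patterns.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// dataFilter is a hypothetical, simplified stand-in for one entry under filters.data.
type dataFilter struct {
    Path  string   // treated as a single top-level JSON key (assumption)
    Type  string   // only "string" is handled in this sketch
    Value []string // the filter passes if the field equals any of these values
}

// matchDataFilter reports whether a JSON payload satisfies f.
func matchDataFilter(payload []byte, f dataFilter) (bool, error) {
    if f.Type != "string" {
        return false, fmt.Errorf("type %q is not supported in this sketch", f.Type)
    }
    var body map[string]interface{}
    if err := json.Unmarshal(payload, &body); err != nil {
        return false, err
    }
    s, ok := body[f.Path].(string)
    if !ok {
        return false, nil // field missing or not a string: no match
    }
    for _, v := range f.Value {
        if s == v {
            return true, nil
        }
    }
    return false, nil
}

func main() {
    f := dataFilter{Path: "bucket", Type: "string", Value: []string{"argo-workflow-input", "argo-workflow-input1"}}
    ok, err := matchDataFilter([]byte(`{"bucket":"argo-workflow-input1"}`), f)
    fmt.Println(ok, err) // true <nil>
}
```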

The second place works at the level of event sources:

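Each trigger can have conditions. The trigger logic runs only when the boolean expression over the dependencies named in conditions evaluates to true. A dependency counts as true once an event for it has been received. Otherwise the trigger does not fire. In the example below, the trigger fires only after events for both test-dep and test-dep-foo have arrived.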

```yaml
spec:
  dependencies:
    - name: test-dep
      eventSourceName: webhook
      eventName: example
    - name: test-dep-foo
      eventSourceName: webhook
      eventName: example-foo
  triggers:
    - template:
        conditions: "test-dep && test-dep-foo"
        name: webhook-workflow-trigger
```
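
The semantics of conditions can be illustrated with a small self-written evaluator. In it, a dependency name is true once an event for it has been received. This is a sketch, not how Argo Events parses conditions; the implementation section shows that it relies on the expr and govaluate packages. `evalConditions` is a hypothetical name. Only `&&`, `||` and parentheses are supported, and `&&` binds tighter than `||`, as in Go.

```go
package main

import (
    "fmt"
    "strings"
)

// parser is a tiny recursive-descent evaluator for conditions strings.
type parser struct {
    src      string
    pos      int
    received map[string]bool // dependency name -> an event has been received
}

func (p *parser) skipSpaces() {
    for p.pos < len(p.src) && p.src[p.pos] == ' ' {
        p.pos++
    }
}

// parseBinary parses operand (op operand)* and folds the results with combine.
func (p *parser) parseBinary(op string, operand func() (bool, error), combine func(a, b bool) bool) (bool, error) {
    left, err := operand()
    if err != nil {
        return false, err
    }
    for {
        p.skipSpaces()
        if !strings.HasPrefix(p.src[p.pos:], op) {
            return left, nil
        }
        p.pos += len(op)
        right, err := operand()
        if err != nil {
            return false, err
        }
        left = combine(left, right)
    }
}

func (p *parser) parseOr() (bool, error) {
    return p.parseBinary("||", p.parseAnd, func(a, b bool) bool { return a || b })
}

func (p *parser) parseAnd() (bool, error) {
    return p.parseBinary("&&", p.parsePrimary, func(a, b bool) bool { return a && b })
}

// parsePrimary handles a parenthesized expression or a dependency name.
func (p *parser) parsePrimary() (bool, error) {
    p.skipSpaces()
    if p.pos < len(p.src) && p.src[p.pos] == '(' {
        p.pos++
        v, err := p.parseOr()
        if err != nil {
            return false, err
        }
        p.skipSpaces()
        if p.pos >= len(p.src) || p.src[p.pos] != ')' {
            return false, fmt.Errorf("missing ')' at offset %d", p.pos)
        }
        p.pos++
        return v, nil
    }
    start := p.pos
    for p.pos < len(p.src) && isNameByte(p.src[p.pos]) {
        p.pos++
    }
    if start == p.pos {
        return false, fmt.Errorf("expected a dependency name at offset %d", start)
    }
    return p.received[p.src[start:p.pos]], nil // unknown names count as false
}

// isNameByte accepts the ASCII characters allowed in names in this sketch.
func isNameByte(c byte) bool {
    return c == '-' || c == '_' || ('a' <= c && c <= 'z') || ('A' <= c && c <= 'Z') || ('0' <= c && c <= '9')
}

// evalConditions reports whether conditions holds for the received set.
func evalConditions(conditions string, received map[string]bool) (bool, error) {
    p := &parser{src: conditions, received: received}
    v, err := p.parseOr()
    if err != nil {
        return false, err
    }
    p.skipSpaces()
    if p.pos != len(p.src) {
        return false, fmt.Errorf("unexpected input at offset %d", p.pos)
    }
    return v, nil
}

func main() {
    received := map[string]bool{"test-dep": true}
    ok, _ := evalConditions("test-dep && test-dep-foo", received)
    fmt.Println(ok) // false: test-dep-foo has not fired yet
    received["test-dep-foo"] = true
    ok, _ = evalConditions("test-dep && test-dep-foo", received)
    fmt.Println(ok) // true
}
```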

Implementation

The listenEvents function in argo-events/sensors/listener.go listens for and processes events.

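  1. First, getDependencyExpression processes the trigger's .template.conditions. It returns a dependency expression: a string that names the event sources the trigger should listen on and says how they are combined (a dependency is simply an event source).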

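    It begins by rewriting the logical operators in conditions. Each operator and parenthesis becomes a string literal joined with +. Hyphens in names are replaced with underscores so that every name is a valid identifier for the expr library. Evaluating the resulting program with parameters therefore substitutes each name with its mapped value. The result is a new expression string that contains only dependency names: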

    ```go
    // Translate original expression which might contain group names
    // to an expression only contains dependency names
    translate := func(originalExpr string, parameters map[string]string) (string, error) {
        originalExpr = strings.ReplaceAll(originalExpr, "&&", " + \"&&\" + ")
        originalExpr = strings.ReplaceAll(originalExpr, "||", " + \"||\" + ")
        originalExpr = strings.ReplaceAll(originalExpr, "-", "_")
        originalExpr = strings.ReplaceAll(originalExpr, "(", "\"(\"+")
        originalExpr = strings.ReplaceAll(originalExpr, ")", "+\")\"")

        program, err := expr.Compile(originalExpr, expr.Env(parameters))
        if err != nil {
            logger.Errorw("Failed to compile original dependency expression", zap.Error(err))
            return "", err
        }
        result, err := expr.Run(program, parameters)
        if err != nil {
            logger.Errorw("Failed to parse original dependency expression", zap.Error(err))
            return "", err
        }
        newExpr := fmt.Sprintf("%v", result)
        newExpr = strings.ReplaceAll(newExpr, "\"(\"", "(")
        newExpr = strings.ReplaceAll(newExpr, "\")\"", ")")
        return newExpr, nil
    }
    ```
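
    The net effect of the concatenation trick above is simple. Every name is replaced with its entry in parameters, looked up after turning - into _. Operators and parentheses are kept, and whitespace is dropped. The sketch below reproduces that effect with a plain scanner instead of the expr library. `translateConditions` is a hypothetical name. It assumes that parameters maps each underscored name to the expression it stands for; for a hypothetical group, that is its dependencies.

    ```go
    package main

    import (
        "fmt"
        "strings"
    )

    // isNameByte accepts the ASCII characters allowed in names in this sketch.
    func isNameByte(c byte) bool {
        return c == '-' || c == '_' || ('a' <= c && c <= 'z') || ('A' <= c && c <= 'Z') || ('0' <= c && c <= '9')
    }

    // translateConditions replaces each name with parameters[name with '-' -> '_'],
    // keeps operators and parentheses, and drops spaces, which is the net effect
    // of the expr-based string concatenation in translate.
    func translateConditions(conditions string, parameters map[string]string) (string, error) {
        var out strings.Builder
        for i := 0; i < len(conditions); {
            c := conditions[i]
            if !isNameByte(c) {
                if c != ' ' {
                    out.WriteByte(c) // '&', '|', '(' or ')'
                }
                i++
                continue
            }
            j := i
            for j < len(conditions) && isNameByte(conditions[j]) {
                j++
            }
            key := strings.ReplaceAll(conditions[i:j], "-", "_")
            v, ok := parameters[key]
            if !ok {
                return "", fmt.Errorf("unknown name %q", conditions[i:j])
            }
            out.WriteString(v)
            i = j
        }
        return out.String(), nil
    }

    func main() {
        parameters := map[string]string{
            "group_a": "(dep1&&dep2)", // hypothetical group standing for two dependencies
            "dep3":    "dep3",
        }
        s, err := translateConditions("group-a || dep3", parameters)
        fmt.Println(s, err) // (dep1&&dep2)||dep3 <nil>
    }
    ```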

    It then simplifies the logical expression:

    ```go
    // NewBoolExpression returns a Minifier instance
    // It is used to simplify boolean expressions.
    // For example, "(a || b || c) && (a && b)" can be simplified as "a && b"
    // It is achieved by using Quine–McCluskey algorithm.
    // See https://en.wikipedia.org/wiki/Quine%E2%80%93McCluskey_algorithm
    func NewBoolExpression(expression string) (Minifier, error) {...}
    ```

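
    The example in this comment can be checked by brute force, because a correct simplification must agree with the original on every input. The sketch below does not implement Quine-McCluskey. It only uses a hypothetical helper, `equivalent`, to enumerate all 2^3 truth assignments and confirm that (a || b || c) && (a && b) and a && b are equivalent.

    ```go
    package main

    import "fmt"

    // equivalent reports whether f and g agree on all 2^3 assignments of (a, b, c).
    func equivalent(f, g func(a, b, c bool) bool) bool {
        for mask := 0; mask < 8; mask++ {
            a, b, c := mask&1 != 0, mask&2 != 0, mask&4 != 0
            if f(a, b, c) != g(a, b, c) {
                return false
            }
        }
        return true
    }

    func main() {
        original := func(a, b, c bool) bool { return (a || b || c) && (a && b) }
        simplified := func(a, b, c bool) bool { return a && b }
        fmt.Println(equivalent(original, simplified)) // true
    }
    ```

    The reduction is intuitive: a && b already implies a || b || c, so the first clause adds nothing.
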
  2. Build a triggerMapping that maps each dependencyExpression to the triggers that use it.

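    A dependencyExpression can be thought of as the set of event sources combined by logical operators. Triggers whose expressions are identical end up under the same key.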

    ```go
    triggerMapping := make(map[string][]v1alpha1.Trigger)
    for _, trigger := range sensor.Spec.Triggers {
        depExpr, err := sensorCtx.getDependencyExpression(ctx, trigger)
        if err != nil {
            logger.Errorw("failed to get dependency expression", zap.Error(err))
            return err
        }
        triggers, ok := triggerMapping[depExpr]
        if !ok {
            triggers = []v1alpha1.Trigger{}
        }
        triggers = append(triggers, trigger)
        triggerMapping[depExpr] = triggers
    }
    ```

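
    Each key of triggerMapping later gets its own subscribeFunc (step 3), so triggers with the same expression share one subscription. The sketch below builds the same shape of mapping with a hypothetical simplified `trigger` type. It also shows that appending to a missing map key works directly, because the zero value is a nil slice. The `if !ok` branch in the loop above is therefore not strictly needed.

    ```go
    package main

    import "fmt"

    // trigger is a hypothetical stand-in for v1alpha1.Trigger with only what this sketch needs.
    type trigger struct {
        Name    string
        DepExpr string // assumed to be the result of getDependencyExpression
    }

    // groupByExpression builds the same shape of mapping as triggerMapping.
    func groupByExpression(triggers []trigger) map[string][]trigger {
        m := make(map[string][]trigger)
        for _, t := range triggers {
            // a missing key yields a nil slice, and append on nil allocates,
            // so no explicit "if !ok" initialization is needed
            m[t.DepExpr] = append(m[t.DepExpr], t)
        }
        return m
    }

    func main() {
        m := groupByExpression([]trigger{
            {Name: "t1", DepExpr: "test-dep&&test-dep-foo"},
            {Name: "t2", DepExpr: "test-dep&&test-dep-foo"},
            {Name: "t3", DepExpr: "test-dep"},
        })
        fmt.Println(len(m), len(m["test-dep&&test-dep-foo"])) // 2 2
    }
    ```
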
  3. For each entry in triggerMapping, a subscribeFunc handles the trigger logic.

    It calls ebDriver.SubscribeEventSources, whose purpose and parameters are documented as follows:

    ```go
    // SubscribeEventSources is used to subscribe multiple event source dependencies
    // Parameter - ctx, context
    // Parameter - conn, eventbus connection
    // Parameter - group, NATS Streaming queue group or Kafka consumer group
    // Parameter - closeCh, channel to indicate to close the subscription
    // Parameter - dependencyExpr, example: "(dep1 || dep2) && dep3"
    // Parameter - dependencies, array of dependencies information
    // Parameter - filter, a function used to filter the message
    // Parameter - action, a function to be triggered after all conditions meet
    SubscribeEventSources(
        ctx context.Context,
        conn Connection,
        group string,
        closeCh <-chan struct{},
        dependencyExpr string,
        dependencies []Dependency,
        filter func(string, cloudevents.Event) bool,
        action func(map[string]cloudevents.Event),
    ) error
    ```

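    The filterFunc passed as the filter parameter screens events according to .spec.dependencies.filters. An event for an unknown dependency is rejected, a dependency without filters accepts every event, and a filter error counts as a rejection: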

    ```go
    filterFunc := func(depName string, event cloudevents.Event) bool {
        dep, ok := depMapping[depName]
        if !ok {
            return false
        }
        if dep.Filters == nil {
            return true
        }
        e := convertEvent(event)
        result, err := sensordependencies.Filter(e, dep.Filters)
        if err != nil {
            logger.Errorw("failed to apply filters", zap.Error(err))
            return false
        }
        return result
    }
    ```
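
    The decision order in filterFunc can be exercised in isolation. The sketch replaces cloudevents.Event, the dependency type and sensordependencies.Filter with hypothetical simplified stand-ins (`event`, `dependency`, a predicate field). It therefore models only the control flow, not the real filter semantics.

    ```go
    package main

    import (
        "errors"
        "fmt"
    )

    // event and dependency are hypothetical stand-ins for cloudevents.Event and the
    // sensor dependency type; Filters models dep.Filters as an optional predicate.
    type event struct{ Data map[string]string }

    type dependency struct {
        Filters func(event) (bool, error) // nil means no filters are configured
    }

    // errBadFilter simulates a filter that fails to evaluate.
    var errBadFilter = errors.New("bad filter")

    // makeFilterFunc follows the same decision order as filterFunc.
    func makeFilterFunc(depMapping map[string]dependency) func(string, event) bool {
        return func(depName string, e event) bool {
            dep, ok := depMapping[depName]
            if !ok {
                return false // event for an unknown dependency: reject
            }
            if dep.Filters == nil {
                return true // no filters: accept every event
            }
            result, err := dep.Filters(e)
            if err != nil {
                return false // a failing filter counts as "no match"
            }
            return result
        }
    }

    func main() {
        filter := makeFilterFunc(map[string]dependency{
            "open":        {},
            "bucket-only": {Filters: func(e event) (bool, error) { return e.Data["bucket"] == "argo-workflow-input", nil }},
            "broken":      {Filters: func(event) (bool, error) { return true, errBadFilter }},
        })
        e := event{Data: map[string]string{"bucket": "argo-workflow-input"}}
        fmt.Println(filter("open", e), filter("bucket-only", e), filter("broken", e), filter("unknown", e))
        // true true false false
    }
    ```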

    actionFunc, passed as the action parameter, runs the actual trigger logic:

    ```go
    actionFunc := func(events map[string]cloudevents.Event) {
        if err := sensorCtx.triggerActions(cctx, sensor, events, triggers); err != nil {
            logger.Errorw("failed to trigger actions", zap.Error(err))
        }
    }
    ```

    subscribeFunc then starts a goroutine that calls ebDriver.SubscribeEventSources with filterFunc as the filter and actionFunc as the action:

    ```go
    subscribeFunc := func() {
        wg1.Add(1)
        go func() {
            defer wg1.Done()
            // release the lock when goroutine exits
            defer atomic.StoreUint32(&subLock, 0)

            logger.Infof("started subscribing to events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID)

            // a local err keeps this goroutine from writing to an err variable shared with the enclosing scope
            err := ebDriver.SubscribeEventSources(cctx, conn, group, closeSubCh, depExpression, deps, filterFunc, actionFunc)
            if err != nil {
                logger.Errorw("failed to subscribe to eventbus", zap.Any("clientID", clientID), zap.Error(err))
                return
            }
        }()
    }
    ```

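
    The excerpt only shows the release side of subLock: atomic.StoreUint32(&subLock, 0) runs when the goroutine exits. A common way to pair it, assumed here rather than taken from the source, is to acquire the lock with atomic.CompareAndSwapUint32 before calling subscribeFunc. That way, at most one subscription goroutine runs at a time even if subscribeFunc is invoked repeatedly. `subscriber` and `trySubscribe` are hypothetical names.

    ```go
    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // subscriber is a hypothetical harness around the subLock / wg1 pattern.
    type subscriber struct {
        subLock uint32         // 1 while a subscription goroutine is running
        wg      sync.WaitGroup // plays the role of wg1
    }

    // trySubscribe starts run in a goroutine unless one is already active.
    // Acquiring with CompareAndSwapUint32 is an assumption; the excerpt only shows the release.
    func (s *subscriber) trySubscribe(run func()) bool {
        if !atomic.CompareAndSwapUint32(&s.subLock, 0, 1) {
            return false // a subscription goroutine is still running
        }
        s.wg.Add(1)
        go func() {
            defer s.wg.Done()                       // runs last, after the lock is released
            defer atomic.StoreUint32(&s.subLock, 0) // release the lock when the goroutine exits
            run()
        }()
        return true
    }

    func main() {
        s := &subscriber{}
        block := make(chan struct{})
        fmt.Println(s.trySubscribe(func() { <-block })) // true
        fmt.Println(s.trySubscribe(func() {}))          // false: the first one is still running
        close(block)
        s.wg.Wait()
        fmt.Println(s.trySubscribe(func() {})) // true: the lock was released
        s.wg.Wait()
    }
    ```
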
  4. Inside SubscribeEventSources, a subscriber is created to receive events from NATS Streaming.

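    The subscription is created with stan.StartAt(pb.StartPosition_NewOnly), so only events published after the subscription is first created are processed. It also uses manual ack mode with a 1-second AckWait, which means the server keeps redelivering any message that has not been acked. MaxInflight limits the number of unacknowledged messages in flight to the number of dependencies plus two.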

    ```go
    sub, err := nsc.stanConn.QueueSubscribe(n.subject, group, func(m *stan.Msg) {
        n.processEventSourceMsg(m, msgHolder, filter, action, log)
    },
        stan.DurableName(durableName),
        stan.SetManualAckMode(),
        stan.StartAt(pb.StartPosition_NewOnly),
        stan.AckWait(1*time.Second),
        stan.MaxInflight(len(msgHolder.depNames)+2),
    )
    ```

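    This brings in an important struct, eventSourceMessageHolder. It caches the latest event received for each dependency and holds the state needed to decide when to trigger; the field comments explain what each field means. In particular, expr is the compiled dependency expression, and parameters records which dependencies are currently satisfied.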

    ```go
    // eventSourceMessageHolder is a struct used to hold the message information of subscribed dependencies
    type eventSourceMessageHolder struct {
        // time that all conditions meet
        lastMeetTime int64
        // timestamp of last msg when all the conditions meet
        latestGoodMsgTimestamp int64
        expr                   *govaluate.EvaluableExpression
        depNames               []string
        // Mapping of [eventSourceName + eventName]dependencyName
        sourceDepMap map[string]string
        parameters   map[string]interface{}
        msgs         map[string]*eventSourceMessage
        // A sync map used to cache the message IDs, it is used to guarantee Exactly Once triggering
        smap *sync.Map
    }
    ```

    processEventSourceMsg uses eventSourceMessageHolder to process each event in several steps:

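    The first step applies when a message for the same dependency is already held. If the incoming message has the same timestamp, it is a redelivery of the held message, and only its lastDeliveredTime is refreshed to the current time. If the incoming timestamp is earlier, it is a redelivery of an older message, which is acked, has its ID cached, and is dropped.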

    ```go
    if existingMsg, ok := msgHolder.msgs[depName]; ok {
        if m.Timestamp == existingMsg.timestamp {
            // Re-delivered latest message, update delivery timestamp and return
            existingMsg.lastDeliveredTime = now
            msgHolder.msgs[depName] = existingMsg
            return
        } else if m.Timestamp < existingMsg.timestamp {
            // Re-delivered old message, ack and return
            msgHolder.ackAndCache(m, event.ID())
            log.Debugw("Dropping this message because later ones also satisfy", "eventID", event.ID())
            return
        }
    }
    ```

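    Otherwise, either no message is held yet for this dependency or the incoming one is newer. A new eventSourceMessage is created to store the event, and the dependency is marked as satisfied in parameters.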

    ```go
    msgHolder.msgs[depName] = &eventSourceMessage{seq: m.Sequence, timestamp: m.Timestamp, event: event, lastDeliveredTime: now}
    msgHolder.parameters[depName] = true
    ```
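
    The timestamp rules above can be condensed into one function. This sketch uses hypothetical trimmed-down types (`heldMsg`, `holder`) and a hypothetical method name, `accept`. Acking, the message-ID cache and logging are left out.

    ```go
    package main

    import "fmt"

    // heldMsg is a hypothetical, trimmed-down eventSourceMessage.
    type heldMsg struct {
        timestamp         int64
        lastDeliveredTime int64
    }

    // holder is a hypothetical, trimmed-down eventSourceMessageHolder.
    type holder struct {
        msgs       map[string]*heldMsg
        parameters map[string]interface{}
    }

    // accept applies the timestamp rules described above and reports whether
    // the message was stored as the new latest message for depName.
    func (h *holder) accept(depName string, ts, now int64) bool {
        if existing, ok := h.msgs[depName]; ok {
            if ts == existing.timestamp {
                existing.lastDeliveredTime = now // redelivery of the held message
                return false
            }
            if ts < existing.timestamp {
                return false // older redelivery: the real code acks it and caches its ID
            }
        }
        h.msgs[depName] = &heldMsg{timestamp: ts, lastDeliveredTime: now}
        h.parameters[depName] = true // the dependency now counts as satisfied
        return true
    }

    func main() {
        h := &holder{msgs: map[string]*heldMsg{}, parameters: map[string]interface{}{}}
        fmt.Println(h.accept("test-dep", 100, 1)) // true: first message
        fmt.Println(h.accept("test-dep", 100, 2)) // false: redelivery, lastDeliveredTime is now 2
        fmt.Println(h.accept("test-dep", 90, 3))  // false: older message dropped
        fmt.Println(h.accept("test-dep", 110, 4)) // true: newer message replaces the held one
    }
    ```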

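    Next, any held message that has not been redelivered for more than 10 minutes is treated as stale and reset, on the assumption that it has already been removed on the NATS side. If anything stale was found, processing of this message stops here without evaluating the conditions.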

    ```go
    hasStale := false
    for k, v := range msgHolder.msgs {
        // Since the message is not acked, the server will keep re-sending it.
        // If a message being held didn't get re-delivered in the last 10 minutes, treat it as stale.
        if (now - v.lastDeliveredTime) > 10*60 {
            msgHolder.reset(k)
            hasStale = true
        }
    }
    if hasStale {
        return
    }
    ```
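
    The stale check can be isolated as below. The sketch assumes that the timestamps are Unix seconds, as the 10*60 comparison suggests. It models msgHolder.reset as a plain map delete, which is an assumption because the excerpt does not show reset. `dropStale` is a hypothetical name.

    ```go
    package main

    import "fmt"

    // staleAfter mirrors the 10-minute window in the original code, in seconds.
    const staleAfter int64 = 10 * 60

    // dropStale removes entries whose last delivery is more than staleAfter
    // seconds before now and reports whether any were removed.
    // lastDelivered maps dependency name -> lastDeliveredTime (Unix seconds, assumed).
    func dropStale(lastDelivered map[string]int64, now int64) bool {
        hasStale := false
        for dep, t := range lastDelivered {
            if now-t > staleAfter {
                delete(lastDelivered, dep) // stands in for msgHolder.reset(dep); deleting during range is safe in Go
                hasStale = true
            }
        }
        return hasStale
    }

    func main() {
        lastDelivered := map[string]int64{"test-dep": 0, "test-dep-foo": 500}
        removed := dropStale(lastDelivered, 700)
        fmt.Println(removed, len(lastDelivered)) // true 1
    }
    ```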

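    The dependency expression is then evaluated against msgHolder.parameters. If it is true, all conditions are met and this event will fire the trigger. msgHolder.latestGoodMsgTimestamp is set to the timestamp of this message and msgHolder.lastMeetTime to the current time. The held events are then copied into a messages map keyed by dependency name.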

    ```go
    result, err := msgHolder.expr.Evaluate(msgHolder.parameters)
    if err != nil {
        log.Errorf("failed to evaluate dependency expression: %v", err)
        // TODO: how to handle this situation?
        return
    }
    if result != true {
        return
    }
    msgHolder.latestGoodMsgTimestamp = m.Timestamp
    msgHolder.lastMeetTime = time.Now().Unix()
    // Trigger actions
    messages := make(map[string]cloudevents.Event)
    for k, v := range msgHolder.msgs {
        messages[k] = *v.event
    }
    ```

    Finally, the trigger logic is run asynchronously in a new goroutine:

    ```go
    go action(messages)
    ```
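
    Copying the held events into messages before `go action(messages)` is one reason the action can safely run concurrently with later calls to processEventSourceMsg: it never touches the holder's live map. The sketch below shows that pattern with plain strings instead of cloudevents.Event. `fireAsync` is a hypothetical name. It returns a done channel only so that the example can wait for the goroutine, which the original fire-and-forget call does not do.

    ```go
    package main

    import "fmt"

    // fireAsync snapshots held into a new map and runs action on the snapshot in a
    // new goroutine, like the messages copy followed by go action(messages).
    // Events are plain strings here instead of cloudevents.Event (assumption).
    // The returned channel is closed when action returns; it exists only so callers can wait.
    func fireAsync(held map[string]string, action func(map[string]string)) <-chan struct{} {
        messages := make(map[string]string, len(held))
        for k, v := range held {
            messages[k] = v
        }
        done := make(chan struct{})
        go func() {
            defer close(done)
            action(messages)
        }()
        return done
    }

    func main() {
        held := map[string]string{"test-dep": "e1", "test-dep-foo": "e2"}
        var got map[string]string
        done := fireAsync(held, func(m map[string]string) { got = m })
        held["test-dep"] = "e3" // later changes to the holder do not reach the snapshot
        <-done
        fmt.Println(got["test-dep"], len(got)) // e1 2
    }
    ```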