Executor

When a worker is created, it takes all of the executors belonging to it from the worker data and creates them one by one:

(reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e)))

Let's see how executor.clj creates an executor. The first step is to build the executor's metadata:

(defn mk-executor-data [worker executor-id]
  (let [worker-context (worker-context worker);;the worker's WorkerTopologyContext
    task-ids (executor-id->tasks executor-id);;the task ids handled by this executor
    component-id (.getComponentId worker-context (first task-ids));;the component-id of the first task
    storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id);;merge the configuration
    executor-type (executor-type worker-context component-id);;determine the executor type (spout or bolt)
    batch-transfer->worker (disruptor/disruptor-queue
                              (str "executor"  executor-id "-send-queue")
                              (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                              :claim-strategy :single-threaded
                              :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY));;create the executor's send queue
    ]
(recursive-map
 :worker worker
 :worker-context worker-context
 :executor-id executor-id
 :task-ids task-ids
 :component-id component-id
 :open-or-prepare-was-called? (atom false)
 :storm-conf storm-conf
 :receive-queue ((:executor-receive-queue-map worker) executor-id)
 :storm-id (:storm-id worker)
 :conf (:conf worker)
 :shared-executor-data (HashMap.)
 :storm-active-atom (:storm-active-atom worker)
 :batch-transfer-queue batch-transfer->worker
 :transfer-fn (mk-executor-transfer-fn batch-transfer->worker);;the function the executor uses to send tuples
 :suicide-fn (:suicide-fn worker)
 :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker))
 :type executor-type
 ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
 :stats (mk-executor-stats <> (sampling-rate storm-conf));;the executor's runtime statistics
 :interval->task->metric-registry (HashMap.)
 :task->component (:task->component worker)
 :stream->component->grouper (outbound-components worker-context component-id);;maps each output stream to its receiving components and their grouping functions
 :report-error (throttled-report-error-fn <>);;function for reporting errors
 :report-error-and-die (fn [error]
                         ((:report-error <>) error)
                         ((:suicide-fn <>)));;report the error, then kill itself
 :deserializer (KryoTupleDeserializer. storm-conf worker-context);;the tuple deserializer
 :sampler (mk-stats-sampler storm-conf);;sampler that decides when runtime statistics are recorded
 ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
 )))
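
The map above is built with recursive-map, which is why entries like :stats, :report-error and :report-error-and-die can refer to the map under construction through the symbol <>. Below is a simplified, standalone sketch of that idea (this is not Storm's actual macro, just an illustration): each value form is evaluated with <> bound to the map built so far.

;; Sketch only: lets later entries see the earlier ones via <>.
(defmacro recursive-map-sketch [& kvs]
  (reduce (fn [acc [k v]]
            `(let [~'<> ~acc]
               (assoc ~'<> ~k ~v)))
          {}
          (partition 2 kvs)))

(def demo
  (recursive-map-sketch
   :report-error (fn [e] (println "error:" e))
   :report-error-and-die (fn [e]
                           ((:report-error <>) e) ;; <> = the map built so far
                           (println "suicide"))))

((:report-error-and-die demo) "boom")
;; prints "error: boom" and then "suicide"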

Next, the executor creation process itself:

(defn mk-executor [worker executor-id]
  (let [executor-data (mk-executor-data worker executor-id);;build the executor metadata
    _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
    task-datas (->> executor-data
                    :task-ids
                    (map (fn [t] [t (task/mk-task executor-data t)]))
                    (into {})
                    (HashMap.));;take the task ids from the metadata and create each task via mk-task
    _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
    report-error-and-die (:report-error-and-die executor-data)
    component-id (:component-id executor-data)

    ;; starting the batch-transfer->worker ensures that anything publishing to that queue 
    ;; doesn't block (because it's a single threaded queue and the caching/consumer started
    ;; trick isn't thread-safe)
    system-threads [(start-batch-transfer->worker-handler! worker executor-data)];;start the executor's data-sending thread
    handlers (with-error-reaction report-error-and-die
               (mk-threads executor-data task-datas));;create the spout or bolt main-loop threads, with report-error-and-die handling any exception
    threads (concat handlers system-threads)];;all of the executor's threads
(setup-ticks! worker executor-data);;set up tick tuples

(log-message "Finished loading executor " component-id ":" (pr-str executor-id))
;; TODO: add method here to get rendered stats... have worker call that when heartbeating
(reify
  RunningExecutor;;implements the RunningExecutor interface, used to collect the executor's runtime statistics
  (render-stats [this]
    (stats/render-stats! (:stats executor-data)))
  (get-executor-id [this]
    executor-id )
  Shutdownable;;implements the Shutdownable interface; shutdown performs the following steps
  (shutdown
    [this]
    (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
    (disruptor/halt-with-interrupt! (:receive-queue executor-data));;halt the receive-queue
    (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data));;halt the batch-transfer-queue
    (doseq [t threads]
      (.interrupt t)
      (.join t));;interrupt every worker thread and wait for it to finish

    (doseq [user-context (map :user-context (vals task-datas))]
      (doseq [hook (.getHooks user-context)]
        (.cleanup hook)));;clean up the user-registered task hooks
    (.disconnect (:storm-cluster-state executor-data));;disconnect from ZooKeeper
    (when @(:open-or-prepare-was-called? executor-data)
      (doseq [obj (map :object (vals task-datas))]
        (close-component executor-data obj)));;call the spout's or bolt's close/cleanup method
    (log-message "Shut down executor " component-id ":" (pr-str executor-id)))
    )))
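
Note that mk-executor returns a single reify instance implementing both RunningExecutor and Shutdownable, so the worker can query stats and shut the executor down through one handle. A minimal, standalone illustration of the same multi-interface reify pattern (java.lang.AutoCloseable stands in for Storm's Shutdownable here):

(definterface RunningThing
  (renderStats []))

(def thing
  (reify
    RunningThing
    (renderStats [_] {:emitted 42})
    java.lang.AutoCloseable                      ;; stand-in for Shutdownable
    (close [_] (println "shutting down"))))

(.renderStats thing) ;; => {:emitted 42}
(.close thing)       ;; prints "shutting down"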

start-batch-transfer->worker-handler!

How the executor starts its data-sending thread:

(defn start-batch-transfer->worker-handler! [worker executor-data]
  (let [worker-transfer-fn (:transfer-fn worker);;the worker's transfer function
    cached-emit (MutableObject. (ArrayList.))
    storm-conf (:storm-conf executor-data)
    serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
    ]
  (disruptor/consume-loop*
    (:batch-transfer-queue executor-data);;the queue this consume loop reads from
    (disruptor/handler [o seq-id batch-end?];;the handler that processes each message
      (let [^ArrayList alist (.getObject cached-emit)]
        (.add alist o)
        (when batch-end?
          (worker-transfer-fn serializer alist);;hand the batch to worker-transfer-fn for serialization and transfer to the worker
          (.setObject cached-emit (ArrayList.));;reset cached-emit
          )))
    :kill-fn (:report-error-and-die executor-data))));;register report-error-and-die as the exception handler
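
The key point is that tuples are only buffered in cached-emit until the disruptor signals the end of a batch; then the whole ArrayList is serialized and handed to the worker's transfer function in a single call. A dependency-free sketch of that batching behaviour (hypothetical names, no disruptor involved):

(import 'java.util.ArrayList)

(defn make-batch-handler
  "flush-fn receives the accumulated ArrayList at each batch boundary."
  [flush-fn]
  (let [cached (atom (ArrayList.))]
    (fn [o batch-end?]
      (.add ^ArrayList @cached o)
      (when batch-end?
        (flush-fn @cached)
        (reset! cached (ArrayList.))))))

(def handler (make-batch-handler #(println "flushing batch:" (vec %))))
(handler 1 false)
(handler 2 false)
(handler 3 true) ;; prints "flushing batch: [1 2 3]"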

mk-threads

Since executors come in two kinds, spout and bolt, there are also two kinds of worker threads to create.

The spout type first:

(defmethod mk-threads :spout [executor-data task-datas]
  (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data;;the fields of executor-data that mk-threads uses
    ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf);;create and prepare the ISpoutWaitStrategy, used to sleep for a while when repeated nextTuple calls emit nothing
    max-spout-pending (executor-max-spout-pending storm-conf (count task-datas));;depends on the number of tasks in this spout; once more than max-spout-pending tuples are pending (neither acked nor failed) the spout stops emitting, which is how flow control is applied
    ^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))        
    last-active (atom false);;tracks the spout's active state, kept in sync with the topology's state
    spouts (ArrayList. (map :object (vals task-datas)));;one spout instance per task, collected into a list
    rand (Random. (Utils/secureRandomLong))

    pending (RotatingMap.
             2 ;; microoptimize for performance of .size method
             (reify RotatingMap$ExpiredCallback
               (expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
                 (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                   (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
                   ))));;pending stores tuples that were emitted but not yet acked or failed; when one times out, this expire callback fires and calls fail-spout-msg
    tuple-action-fn ... ;;handles tuples received by the spout (body elided here; shown in the tuple-action-fn section below)
    receive-queue (:receive-queue executor-data)
    event-handler (mk-task-receiver executor-data tuple-action-fn);;a handler that processes incoming tuples with tuple-action-fn
    has-ackers? (has-ackers? storm-conf)
    emitted-count (MutableLong. 0)
    empty-emit-streak (MutableLong. 0)

    ;; the overflow buffer is used to ensure that spouts never block when emitting
    ;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
    ;; prevents deadlock from occuring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
    ;; buffers filled up)
    ;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
    ;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple, 
    ;; preventing memory issues
    overflow-buffer (LinkedList.)];;when the send queue is full, tuples are stashed in overflow-buffer instead of blocking the emit

[(async-loop;;async-loop is called with :factory? true, so the function below is a factory that returns the function the new thread will spin on
  (fn []
    ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
    (while (not @(:storm-active-atom executor-data))
      (Thread/sleep 100));;wait until the topology becomes active

    (log-message "Opening spout " component-id ":" (keys task-datas))
    (doseq [[task-id task-data] task-datas;;iterate over each task
            :let [^ISpout spout-obj (:object task-data);;the ISpout instance
                  tasks-fn (:tasks-fn task-data);;the tasks-fn function, defined in task.clj
                  send-spout-msg ...]];;the spout's send function (body elided here; analyzed in the send-spout-msg section below)
      (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
      (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data);;register the queue metrics
                                               :receive receive-queue}
                                              storm-conf (:user-context task-data))

      (.open spout-obj;;call the spout's open method
             storm-conf;;passing storm-conf
             (:user-context task-data);;the TopologyContext
             (SpoutOutputCollector.;;and a SpoutOutputCollector
              (reify ISpoutOutputCollector;;built on an ISpoutOutputCollector implementation
                (^List emit [this ^String stream-id ^List tuple ^Object message-id]
                  (send-spout-msg stream-id tuple message-id nil)
                  );;emit delegates to send-spout-msg
                (^void emitDirect [this ^int out-task-id ^String stream-id
                                   ^List tuple ^Object message-id]
                  (send-spout-msg stream-id tuple message-id out-task-id)
                  );;so does emitDirect, with an explicit target task
                (reportError [this error]
                  (report-error error)
                  )))));;reportError delegates to report-error
    (reset! open-or-prepare-was-called? true) ;;record that open has been called
    (log-message "Opened spout " component-id ":" (keys task-datas))
    (setup-metrics! executor-data);;initialize metrics

    (disruptor/consumer-started! (:receive-queue executor-data));;mark the receive queue as started
    (fn [];;this is the function the thread actually spins on
      ;; This design requires that spouts be non-blocking
      (disruptor/consume-batch receive-queue event-handler);;consume and process tuples from receive-queue in a non-blocking way

      ;; try to clear the overflow-buffer
      (try-cause
        (while (not (.isEmpty overflow-buffer))
          (let [[out-task out-tuple] (.peek overflow-buffer)]
            (transfer-fn out-task out-tuple false nil)
            (.removeFirst overflow-buffer)))
      (catch InsufficientCapacityException e
        ));;first try to push everything in overflow-buffer through transfer-fn

      (let [active? @(:storm-active-atom executor-data)
            curr-count (.get emitted-count)]
        (if (and (.isEmpty overflow-buffer);;if overflow-buffer is empty
                 (or (not max-spout-pending);;and either max-spout-pending is not set
                     (< (.size pending) max-spout-pending)));;or fewer than max-spout-pending tuples are pending
          ;;then do the following
          (if active?;;if the topology is active
            (do
              (when-not @last-active;;and @last-active is still false
                (reset! last-active true);;flip it to true
                (log-message "Activating spout " component-id ":" (keys task-datas))
                (fast-list-iter [^ISpout spout spouts] (.activate spout)));;and call activate on each spout

              (fast-list-iter [^ISpout spout spouts] (.nextTuple spout)));;then call nextTuple on each spout; nextTuple calls emit or emitDirect, which ultimately call send-spout-msg
            (do
              (when @last-active;;if @last-active is still true
                (reset! last-active false);;flip it to false
                (log-message "Deactivating spout " component-id ":" (keys task-datas))
                (fast-list-iter [^ISpout spout spouts] (.deactivate spout)));;and call deactivate on each spout
              ;; TODO: log that it's getting throttled
              (Time/sleep 100))));;then sleep for 100ms
        (if (and (= curr-count (.get emitted-count)) active?);;if curr-count still equals emitted-count, nothing was emitted this round
          (do (.increment empty-emit-streak)
              (.emptyEmit spout-wait-strategy (.get empty-emit-streak)));;so sleep via spout-wait-strategy; the sleep grows as the empty streak grows
          (.set empty-emit-streak 0);;as soon as something is emitted the streak resets to zero
          ))
      0));;returning 0 means the loop spins with no extra sleep between iterations
  :kill-fn (:report-error-and-die executor-data)
  :factory? true
  :thread-name component-id)]))
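
The overflow-buffer handling above deserves a second look: transfer-fn first tries a non-blocking publish to the send queue and, if the queue has no capacity, parks the tuple in the local overflow buffer instead of blocking; at the top of every loop iteration the buffer is drained before nextTuple is called again. A standalone sketch of that pattern (an ArrayBlockingQueue and a boolean offer stand in for the disruptor queue and its InsufficientCapacityException):

(import '[java.util LinkedList]
        '[java.util.concurrent ArrayBlockingQueue])

(defn transfer! [^ArrayBlockingQueue q ^LinkedList overflow tuple]
  (if (.isEmpty overflow)
    (when-not (.offer q tuple)   ;; non-blocking attempt
      (.add overflow tuple))     ;; queue full: buffer locally instead of blocking
    (.add overflow tuple)))      ;; keep ordering once buffering has started

(defn drain-overflow! [^ArrayBlockingQueue q ^LinkedList overflow]
  (while (and (not (.isEmpty overflow))
              (.offer q (.peek overflow)))
    (.removeFirst overflow)))

(let [q (ArrayBlockingQueue. 2)
      overflow (LinkedList.)]
  (doseq [t [:a :b :c :d]] (transfer! q overflow t))
  (println "queued:" (vec q) "overflowed:" (vec overflow)))
;; queued: [:a :b] overflowed: [:c :d]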

tuple-action-fn

Now for some implementation details, starting with tuple-action-fn, which handles the spout's incoming tuples:

(fn [task-id ^TupleImpl tuple]
  (let [stream-id (.getSourceStreamId tuple)];;get the source stream id
    (condp = stream-id
      Constants/SYSTEM_TICK_STREAM_ID (.rotate pending);;a SYSTEM_TICK tuple is the signal to rotate pending, expiring tuples that have been cached too long
      Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple);;a METRICS_TICK tuple triggers emitting this task's metrics
      ;;anything else can only come from the ack or fail streams
      (let [id (.getValue tuple 0);;the root id
            [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)];;remove this entry from pending (the RotatingMap)
        (when spout-id;;if a spout-id was stored
          (when-not (= stored-task-id task-id);;if the current task id differs from the stored one, the ack was routed to the wrong task: throw
            (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
          ;;otherwise, the normal case
          (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
            (condp = stream-id
              ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
                                                 spout-id tuple-finished-info time-delta)
              ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
                                                   spout-id tuple-finished-info time-delta)
              )));;dispatch to ack-spout-msg for the ack stream or fail-spout-msg for the fail stream
        ;; TODO: on failure, emit tuple to failure stream
        ))))
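
The SYSTEM_TICK branch simply calls (.rotate pending). To see why a rotate expires old messages, here is a miniature rotating-bucket map in the spirit of Storm's RotatingMap (a simplified sketch, not the real class): new entries go into the newest bucket, each rotate shifts the buckets down, and whatever falls off the oldest bucket is handed to the expire callback, which for the spout is where fail-spout-msg gets called.

(defn rotating-map [num-buckets expire-fn]
  (atom {:buckets (vec (repeat num-buckets {})) :expire-fn expire-fn}))

(defn rput! [rm k v]
  (swap! rm update-in [:buckets 0] assoc k v))           ;; bucket 0 is the newest

(defn rremove! [rm k]
  (let [v (some #(get % k) (:buckets @rm))]
    (swap! rm update :buckets (fn [bs] (mapv #(dissoc % k) bs)))
    v))

(defn rotate! [rm]
  (let [{:keys [buckets expire-fn]} @rm]
    (doseq [[k v] (peek buckets)] (expire-fn k v))        ;; the oldest bucket falls off
    (swap! rm assoc :buckets (into [{}] (pop buckets)))))

(def pending* (rotating-map 2 (fn [k v] (println "expired, would fail-spout-msg:" k v))))
(rput! pending* "root-1" "msg-1")
(rotate! pending*) ;; first tick: still pending
(rotate! pending*) ;; second tick: "root-1" expires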

ack-spout-msg & fail-spout-msg

Handling the ack stream mainly means calling the ISpout's ack method and recording the related statistics; handling the fail stream likewise calls the ISpout's fail method and records statistics.
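
Neither function is shown in full here; the dependency-free sketch below (hypothetical names) captures the part that matters for tuple-action-fn above: look the root id up in pending, compute the time delta, and dispatch to the spout's ack or fail callback. The real ack-spout-msg and fail-spout-msg additionally run the task hooks and update the built-in metrics and stats.

(defprotocol SpoutLike
  (ack!  [this msg-id time-delta-ms])
  (fail! [this msg-id time-delta-ms]))

(defn handle-ack-or-fail
  "pending: atom of {root-id [task-id msg-id tuple-info start-time-ms]}."
  [spout pending root-id acked?]
  (when-let [[_task-id msg-id _tuple-info start-ms] (get @pending root-id)]
    (swap! pending dissoc root-id)
    (let [delta (when start-ms (- (System/currentTimeMillis) start-ms))]
      (if acked?
        (ack! spout msg-id delta)        ;; ack-spout-msg ends in ISpout.ack
        (fail! spout msg-id delta)))))   ;; fail-spout-msg ends in ISpout.fail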

send-spout-msg

How does the spout handle sending? Here is send-spout-msg:

send-spout-msg (fn [out-stream-id values message-id out-task-id];;out-stream-id is the tuple's stream, values its contents, message-id the message id, out-task-id the target task id
                 (.increment emitted-count);;bump the emitted counter
                 (let [out-tasks (if out-task-id;;out-task-id names the target task explicitly and is only set for direct emits
                                   (tasks-fn out-task-id out-stream-id values)
                                   (tasks-fn out-stream-id values));;use tasks-fn to compute the target task ids
                       rooted? (and message-id has-ackers?);;the tuple is tracked only if it has a message id and ackers are configured
                       root-id (if rooted? (MessageId/generateId rand));;the tracking (root) id
                       out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))];;one edge id per target task
                   (fast-list-iter [out-task out-tasks id out-ids]
                                   (let [tuple-id (if rooted?
                                                    (MessageId/makeRootId root-id id)
                                                    (MessageId/makeUnanchored));;pair the root id with this edge id to form the tuple's MessageId
                                         out-tuple (TupleImpl. worker-context
                                                               values
                                                               task-id
                                                               out-stream-id
                                                               tuple-id)];;build the TupleImpl
                                     (transfer-fn out-task
                                                  out-tuple
                                                  overflow-buffer);;hand it to transfer-fn for sending
                                     ))
                   (if rooted?;;if the tuple is tracked
                     (do
                       (.put pending root-id [task-id
                                              message-id
                                              {:stream out-stream-id :values values}
                                              (if (sampler) (System/currentTimeMillis))]);;store it in pending (the RotatingMap) until it is acked or failed
                       (task/send-unanchored task-data
                                             ACKER-INIT-STREAM-ID
                                             [root-id (bit-xor-vals out-ids) task-id]
                                             overflow-buffer));;tell the acker to start tracking this tuple tree: the root id, the XOR of all edge ids, and this task's id
                     ;;special case: the tuple carries a message-id but acking is disabled, so call ack-spout-msg right away without tracking
                     (when message-id
                       (ack-spout-msg executor-data task-data message-id
                                      {:stream out-stream-id :values values}
                                      (if (sampler) 0))))
                   (or out-tasks [])
                   ))
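
The [root-id (bit-xor-vals out-ids) task-id] message sent on ACKER-INIT-STREAM-ID is what lets the acker track a whole tuple tree with a single long per root: every edge id is XORed into the acker's state once when it is registered and once when it is acked, so the state returns to 0 exactly when the tree is complete. A tiny illustration (bit-xor-vals re-defined here for the sketch):

(defn bit-xor-vals [ids] (reduce bit-xor 0 ids))

(let [out-ids [1234 5678 9012]              ;; edge ids generated in send-spout-msg
      init    (bit-xor-vals out-ids)         ;; value sent on ACKER-INIT-STREAM-ID
      acker   (reduce bit-xor init out-ids)] ;; downstream acks XOR the same ids back in
  (println "tuple tree complete?" (zero? acker)))
;; => tuple tree complete? true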

setup-ticks!

This function periodically publishes a tick tuple to the executor's receive queue; on receiving the tick, the executor runs its timeout housekeeping (for a spout, rotating the pending map, as seen in tuple-action-fn above).

(defn setup-ticks! [worker executor-data]
  (let [storm-conf (:storm-conf executor-data)
    tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
    receive-queue (:receive-queue executor-data)
    context (:worker-context executor-data)]
    (when tick-time-secs;;only if tick-time-secs is configured
      (if (or (system-id? (:component-id executor-data));;for system-level components
              (and (not (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
                   (= :spout (:type executor-data))));;or for spouts when message timeouts are disabled
        (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data));;just log that timeouts are disabled
        (schedule-recurring;;otherwise schedule a recurring task that publishes a tick tuple to the receive queue
          (:user-timer worker)
          tick-time-secs
          tick-time-secs
          (fn []
            (disruptor/publish
              receive-queue
              [[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
          )))))))
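
A dependency-free sketch of the recurring-tick idea (hypothetical names; the real code publishes a TupleImpl on SYSTEM_TICK_STREAM_ID through the disruptor queue): every tick-time-secs seconds a marker is pushed onto the executor's receive queue, so the timeout housekeeping runs on the executor's own consumer thread rather than on the timer thread.

(import '[java.util.concurrent Executors TimeUnit LinkedBlockingQueue])

(defn schedule-ticks!
  "Pushes ::tick onto receive-queue every tick-time-secs seconds."
  [^LinkedBlockingQueue receive-queue tick-time-secs]
  (doto (Executors/newSingleThreadScheduledExecutor)
    (.scheduleAtFixedRate #(.offer receive-queue ::tick)
                          tick-time-secs tick-time-secs TimeUnit/SECONDS)))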

Next is the bolt version of mk-threads. The code is largely the same and is not reproduced in full here, except that the factory function passed to async-loop additionally does the following:

(.prepare bolt-obj
                storm-conf
                user-context
                (OutputCollector.;;build an OutputCollector
                 (reify IOutputCollector;;backed by an IOutputCollector implementation
                   (emit [this stream anchors values];;emit
                     (bolt-emit stream anchors values nil))
                   (emitDirect [this task stream anchors values];;and emitDirect
                     (bolt-emit stream anchors values task));;both delegate to bolt-emit to send the tuple
                   (^void ack [this ^Tuple tuple]
                     (let [^TupleImpl tuple tuple
                           ack-val (.getAckVal tuple)]
                       (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
                                      (task/send-unanchored task-data
                                                            ACKER-ACK-STREAM-ID
                                                             [root (bit-xor id ack-val)]);;send a message to the acker bolt via send-unanchored from task.clj
                                      ))
                     (let [delta (tuple-time-delta! tuple)]
                       (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta));;run the user's task hooks
                       (when delta
                         (builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data)
                                                            executor-stats
                                                            (.getSourceComponent tuple)                                                      
                                                            (.getSourceStreamId tuple)
                                                            delta)
                         (stats/bolt-acked-tuple! executor-stats
                                                  (.getSourceComponent tuple)
                                                  (.getSourceStreamId tuple)
                                                  delta))))
                   (^void fail [this ^Tuple tuple]
                     (fast-list-iter [root (.. tuple getMessageId getAnchors)]
                                     (task/send-unanchored task-data
                                                           ACKER-FAIL-STREAM-ID
                                                            [root]));;send a message to the acker bolt via send-unanchored from task.clj
                     (let [delta (tuple-time-delta! tuple)]
                       (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta));;run the user's task hooks
                       (when delta
                         (builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data)
                                                             executor-stats
                                                             (.getSourceComponent tuple)                                                      
                                                             (.getSourceStreamId tuple))
                         (stats/bolt-failed-tuple! executor-stats
                                                   (.getSourceComponent tuple)
                                                   (.getSourceStreamId tuple)
                                                   delta))))
                   (reportError [this error]
                     (report-error error)
                     )))))
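
It is worth spelling out why the ack path sends [root (bit-xor id ack-val)]: ack-val has accumulated (via .updateAckVal in bolt-emit, shown below) the XOR of every new edge id this bolt anchored to the incoming tuple, so a single message both checks off the consumed edge and registers the newly created ones. Extending the spout-side illustration above to the bolt side:

(let [incoming-edge 111                         ;; edge the spout registered for this bolt's input
      new-edges     [222 333]                   ;; edges this bolt anchors to its own output
      ack-val       (reduce bit-xor 0 new-edges)
      acker-state   (-> 0
                        (bit-xor incoming-edge)                   ;; spout's ACKER-INIT message
                        (bit-xor (bit-xor incoming-edge ack-val)) ;; this bolt's ACKER-ACK message
                        (bit-xor 222)                             ;; downstream acks of the new edges
                        (bit-xor 333))]
  (println "tuple tree complete?" (zero? acker-state)))
;; => tuple tree complete? true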

At the end of the bolt's mk-threads:

 (let [receive-queue (:receive-queue executor-data)
          event-handler (mk-task-receiver executor-data tuple-action-fn)]
      (disruptor/consumer-started! receive-queue);;start consuming the receive queue
      (fn []            
        (disruptor/consume-batch-when-available receive-queue event-handler);;process incoming tuples with event-handler
        0)));;spin with no extra sleep between iterations

Outgoing tuples go through the bolt-emit function:

bolt-emit (fn [stream anchors values task]
            (let [out-tasks (if task
                              (tasks-fn task stream values)
                              (tasks-fn stream values))];;compute the target task ids
              (fast-list-iter [t out-tasks]
                              (let [anchors-to-ids (HashMap.)]
                                (fast-list-iter [^TupleImpl a anchors]
                                                (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
                                                  (when (pos? (count root-ids))
                                                    (let [edge-id (MessageId/generateId rand)]
                                                      (.updateAckVal a edge-id)
                                                      (fast-list-iter [root-id root-ids]
                                                                      (put-xor! anchors-to-ids root-id edge-id))
                                                      ))));;tracking logic: generate an edge id per anchor, XOR it into the anchor's ack-val, and record it against each of the anchor's root ids
                                (transfer-fn t
                                             (TupleImpl. worker-context
                                                         values
                                                         task-id
                                                         stream
                                                         (MessageId/makeId anchors-to-ids)))));;send the tuple via transfer-fn
              (or out-tasks [])))
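
put-xor! is assumed here to behave like the sketch below: it folds the new edge id into whatever value the anchors-to-ids map already holds for that root (a missing key counts as 0), so a tuple anchored to several inputs that share a root still contributes one combined value per root.

(import 'java.util.HashMap)

(defn put-xor! [^HashMap amap k id]
  (.put amap k (bit-xor (or (.get amap k) 0) id)))

(let [anchors-to-ids (HashMap.)]
  (put-xor! anchors-to-ids "r1" 222)
  (put-xor! anchors-to-ids "r1" 333)   ;; second anchor with the same root
  (put-xor! anchors-to-ids "r2" 444)
  (println anchors-to-ids))
;; {r1=403, r2=444}   (403 = 222 XOR 333)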