Executor
2014-12-06
在创建worker的时候会从worker中获取所有executor,逐一创建。
(reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e)))
来看看executor.clj中是如何创建executor的,首先需要定义executor的元数据:
(defn mk-executor-data [worker executor-id]
(let [worker-context (worker-context worker);;worker的上下文WorkerTopologyContext
task-ids (executor-id->tasks executor-id);;当前executor对应的taskId
component-id (.getComponentId worker-context (first task-ids));;获取第一个task对应的component-id
storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id);;合并配置项
executor-type (executor-type worker-context component-id);;获取executor-typeh
batch-transfer->worker (disruptor/disruptor-queue
(str "executor" executor-id "-send-queue")
(storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
:claim-strategy :single-threaded
:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY));;创建executor发送队列
]
(recursive-map
:worker worker
:worker-context worker-context
:executor-id executor-id
:task-ids task-ids
:component-id component-id
:open-or-prepare-was-called? (atom false)
:storm-conf storm-conf
:receive-queue ((:executor-receive-queue-map worker) executor-id)
:storm-id (:storm-id worker)
:conf (:conf worker)
:shared-executor-data (HashMap.)
:storm-active-atom (:storm-active-atom worker)
:batch-transfer-queue batch-transfer->worker
:transfer-fn (mk-executor-transfer-fn batch-transfer->worker);;executor中消息发送的函数
:suicide-fn (:suicide-fn worker)
:storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker))
:type executor-type
;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
:stats (mk-executor-stats <> (sampling-rate storm-conf));;executor的运行统计信息
:interval->task->metric-registry (HashMap.)
:task->component (:task->component worker)
:stream->component->grouper (outbound-components worker-context component-id);;从流到接收组件及分组函数的映射
:report-error (throttled-report-error-fn <>);;报告错误的函数
:report-error-and-die (fn [error]
((:report-error <>) error)
((:suicide-fn <>)));;报告错误然后杀死自己的函数
:deserializer (KryoTupleDeserializer. storm-conf worker-context);;反序列化器
:sampler (mk-stats-sampler storm-conf);;运行统计采样率
;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
)))
接下来看看创建executor的过程
(defn mk-executor [worker executor-id]
(let [executor-data (mk-executor-data worker executor-id);;创建executor元数据
_ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
task-datas (->> executor-data
:task-ids
(map (fn [t] [t (task/mk-task executor-data t)]))
(into {})
(HashMap.));;从元数据中获取taskId,并通过mk-task函数创建task
_ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
report-error-and-die (:report-error-and-die executor-data)
component-id (:component-id executor-data)
;; starting the batch-transfer->worker ensures that anything publishing to that queue
;; doesn't block (because it's a single threaded queue and the caching/consumer started
;; trick isn't thread-safe)
system-threads [(start-batch-transfer->worker-handler! worker executor-data)];;启动executor数据发送线程
handlers (with-error-reaction report-error-and-die
(mk-threads executor-data task-datas));;创建spout或bolt的主循环线程,并用report-error-and-die处理异常
threads (concat handlers system-threads)];;获得所有的线程
(setup-ticks! worker executor-data);;安装ticks
(log-message "Finished loading executor " component-id ":" (pr-str executor-id))
;; TODO: add method here to get rendered stats... have worker call that when heartbeating
(reify
RunningExecutor;;实现RunningExecutor接口,用来手机executor的运行统计
(render-stats [this]
(stats/render-stats! (:stats executor-data)))
(get-executor-id [this]
executor-id )
Shutdownable;;实现Shutdownable接口,在关闭时会做以下处理
(shutdown
[this]
(log-message "Shutting down executor " component-id ":" (pr-str executor-id))
(disruptor/halt-with-interrupt! (:receive-queue executor-data));;关闭receive-queue
(disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data));;关闭batch-transfer-queue
(doseq [t threads]
(.interrupt t)
(.join t));;中断所有工作线程,并等待结束
(doseq [user-context (map :user-context (vals task-datas))]
(doseq [hook (.getHooks user-context)]
(.cleanup hook)));;清理用户的关闭钩子
(.disconnect (:storm-cluster-state executor-data));;断开与zk的链接
(when @(:open-or-prepare-was-called? executor-data)
(doseq [obj (map :object (vals task-datas))]
(close-component executor-data obj)));;调用spout或bolt的close方法
(log-message "Shut down executor " component-id ":" (pr-str executor-id)))
)))
start-batch-transfer->worker-handler!
executor如何启动数据发送线程的:
(defn start-batch-transfer->worker-handler! [worker executor-data]
(let [worker-transfer-fn (:transfer-fn worker);;获得worker的transfer函数
cached-emit (MutableObject. (ArrayList.))
storm-conf (:storm-conf executor-data)
serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
]
(disruptor/consume-loop*
(:batch-transfer-queue executor-data);;接收消息的queue
(disruptor/handler [o seq-id batch-end?];;创建消息处理handler
(let [^ArrayList alist (.getObject cached-emit)]
(.add alist o)
(when batch-end?
(worker-transfer-fn serializer alist);;调用worker-transfer-fn函数把对消息进行处理
(.setObject cached-emit (ArrayList.));;重置cached-emit
)))
:kill-fn (:report-error-and-die executor-data))));;设置report-error-and-die为异常处理函数
mk-threads
因为executor分为两种spout和bolt,所以在创建工作线程时,也分为两种类型。
首先来看spout类型:
(defmethod mk-threads :spout [executor-data task-datas]
(let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data;;mk-threads函数将使用的executor-data中的数据
^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf);;创建并prepare ISpoutWaitStrategy,它用来在nextTuple调用多次没有输出的时候休眠一段时间
max-spout-pending (executor-max-spout-pending storm-conf (count task-datas));;结果和spout中的task数量有关,如果超过max-spout-pending的消息没有被ack或fail,则spout会进入等待状态,并利用这个参数进行流量控制
^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))
last-active (atom false);;标示spout的状态,这个状态与topology的状态一致
spouts (ArrayList. (map :object (vals task-datas)));;每个task是一个spout,组成集合
rand (Random. (Utils/secureRandomLong))
pending (RotatingMap.
2 ;; microoptimize for performance of .size method
(reify RotatingMap$ExpiredCallback
(expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
))));;pending用来储存发出去但是还没有ack或fail的消息,如果超时会触发这里的expire函数,调用fail-spout-msg
tuple-action-fn ;;用来处理spout收到的消息
receive-queue (:receive-queue executor-data)
event-handler (mk-task-receiver executor-data tuple-action-fn);;创建一个handler用tuple-action-fn来处理tuple
has-ackers? (has-ackers? storm-conf)
emitted-count fail-spout-msg(MutableLong. 0)
empty-emit-streak (MutableLong. 0)
;; the overflow buffer is used to ensure that spouts never block when emitting
;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
;; prevents deadlock from occuring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
;; buffers filled up)
;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple,
;; preventing memory issues
overflow-buffer (LinkedList.)];;当发送队列被填满时,会把消息存在 overflow-buffer 中,而不是阻塞发消息
[(async-loop;;因为async-loop被设置为工厂模式,所以以下函数被用来生成开启新线程自旋执行用的函数
(fn []
;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
(while (not @(:storm-active-atom executor-data))
(Thread/sleep 100));;等待,直到topology进入活跃状态
(log-message "Opening spout " component-id ":" (keys task-datas))
(doseq [[task-id task-data] task-datas;;遍历每一个task
:let [^ISpout spout-obj (:object task-data);;获得ISpout spout-obj
tasks-fn (:tasks-fn task-data);;获得tasks-fn函数,这个函数在task中定义
send-spout-msg ;;spout发送消息的函数,下文会具体分析]]
(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
(builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data);;注册metrics
:receive receive-queue}
storm-conf (:user-context task-data))
(.open spout-obj;;调用spout-obj的open方法
storm-conf;;传入storm-conf
(:user-context task-data);;传入TopologyContext
(SpoutOutputCollector.;;构建一个SpoutOutputCollector
(reify ISpoutOutputCollector;;并实现ISpoutOutputCollector接口
(^List emit [this ^String stream-id ^List tuple ^Object message-id]
(fail-spout-msgsend-spout-msg stream-id tuple message-id nil)
);;emit函数会调用send-spout-msg发送数据
(^void emitDirect [this ^int out-task-id ^String stream-id
^List tuple ^Object message-id]
(send-spout-msg stream-id tuple message-id out-task-id)
);;emitDirect函数会调用send-spout-msg发送数据
(reportError [this error]async
(report-error error)
)))));;reportError函数会调用report-error发送数据
(reset! open-or-prepare-was-called? true) ;;设置open的状态
(log-message "Opened spout " component-id ":" (keys task-datas))
(setup-metrics! executor-data);;初始化metrics
(disruptor/consumer-started! (:receive-queue executor-data));;打开接收消息队列
(fn [];;这个函数是真正用来自旋执行的
;; This design requires that spouts be non-blocking
(disruptor/consume-batch receive-queue event-handler);;用非阻塞的方式接收receive-queue里的消息并进行处理
;; try to clear the overflow-buffer
(try-cause
(while (not (.isEmpty overflow-buffer))
(let [[out-task out-tuple] (.peek overflow-buffer)]
(transfer-fn out-task out-tuple false nil)
(.removeFirst overflow-buffer)))
(catch InsufficientCapacityException e
));;优先使用transfer-fn发送 overflow-buffer的全部数据
(let [active? @(:storm-active-atom executor-data)
curr-count (.get emitted-count)]
(if (and (.isEmpty overflow-buffer);;如果overflow-buffer为空
(or (not max-spout-pending);;并且未设置max-spout-pending
(< (.size pending) max-spout-pending)));;或pending的量小于max-spout-pending
;;则进行以下逻辑
(if active?;;如果topology处于激活状态
(do
(when-not @last-active;;如果@last-active为false
(reset! last-active true);;重置为true
(log-message "Activating spout " component-id ":" (keys task-datas))
(fast-list-iter [^ISpout spout spouts] (.activate spout)));;依次触发spout的activate方法方法
(fast-list-iter [^ISpout spout spouts] (.nextTuple spout)));;依次调用nextTuple方法,nextTuple方法的实现中会调用emit或emitDirect函数,最终调用的是send-spout-msg函数
(do
(when @last-active;;如果@last-active为true时
(reset! last-active false);;重设为false
(log-message "Deactivating spout " component-id ":" (keys task-datas))
(fast-list-iter [^ISpout spout spouts] (.deactivate spout)));;依次调用ISpout的deactivate方法
;; TODO: log that it's getting throttled
(Time/sleep 100))));;休眠100ms
(if (and (= curr-count (.get emitted-count)) active?);;如果curr-count与emitted-count相等,则证明消息还没发出去
(do (.increment empty-emit-streak)
(.emptyEmit spout-wait-strategy (.get empty-emit-streak)));;会使用spout-wait-strategy来进行休眠,休眠时间逐渐增长
(.set empty-emit-streak 0);;发送成功后,休眠时间归零
))
0));;这里返回0表示自旋时的时间间隔为0
:kill-fn (:report-error-and-die executor-data)
:factory? true
:thread-name component-id)]))
tuple-action-fn
来看一些实现细节,比如tuple-action-fn这个函数,他用来做消息的输入处理:
(fn [task-id ^TupleImpl tuple]
(let [stream-id (.getSourceStreamId tuple)];;获取stream-id
(condp = stream-id
Constants/SYSTEM_TICK_STREAM_ID (.rotate pending);;如果消息来自于SYSTEM_TICK_STREAM_ID,代表接到清理rotateMap中缓存消息的信号,使用pending的rotate函数时缓存的消息过期
Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple);;如果消息来自于METRICS_TICK_STREAM_ID,则发送给处理统计信息的bolt
;;其他的来源只会是ack或fail流
(let [id (.getValue tuple 0);;获取id
[stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)];;从rotateMap中remove这条消息
(when spout-id;;如果spout-id存在
(when-not (= stored-task-id task-id);;当当前taskId与rotateMap中储存的taskId不同时,证明接收错了,抛异常
(throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
;;正常情况下
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(condp = stream-id
ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta)
ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta)
)));;分别调用ack-spout-msg或fail-spout-msg处理ack或fail流
;; TODO: on failure, emit tuple to failure stream
))))
ack-spout-msg & fail-spout-msg
处理ack流的具体逻辑,主要是调用ISpout的ack函数,并添加相关统计信息。 处理fail流的具体逻辑,主要是调用ISpout的fail函数,并添加相关统计信息。
send-spout-msg
spout如何处理发送逻辑呢:
send-spout-msg (fn [out-stream-id values message-id out-task-id];;几个参数的含义:out-stream-id是消息的stream-id,values是消息内容,message-id是消息id,out-task-id是发送目的地的taskId
(.increment emitted-count);;发送统计数+1
(let [out-tasks (if out-task-id;;这个参数用来直接指明发送目的地的taskId,直接发送时会用到
(tasks-fn out-task-id out-stream-id values)
(tasks-fn out-stream-id values));;调用tasks-fn获得目标task的id
rooted? (and message-id has-ackers?);;如果生成了消息id,并且conf中配置了对消息进行ack机制,则会对消息进行跟踪
root-id (if rooted? (MessageId/generateId rand));;创建跟踪id
out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))];;对每个发送目的地task也相应的创建跟踪id
(fast-list-iter [out-task out-tasks id out-ids]
(let [tuple-id (if rooted? (MessageId/makeRootId root-id id) (MessageId/makeUnanchored));;把root-id和目的地的root-id匹配成tuple-id
out-tuple (TupleImpl. worker-context values task-id out-stream-id tuple-id)];;创建TupleImpl
(transfer-fn out-task
out-tuple
overflow-buffer);;调用transfer-fn函数进行发送
))
(if rooted?;;如果是跟踪状态
(do
(.put pending root-id [task-id
message-id
{:stream out-stream-id :values values}
(if (sampler) (System/currentTimeMillis))]);;放入rotateMap中等待被ack
(task/send-unanchored task-data
ACKER-INIT-STREAM-ID
[root-id (bit-xor-vals out-ids) task-id]
overflow-buffer));;向ACKER-INIT-STREAM发送一条消息,用来在接收方处确认ack的来源是否正确
;;下面这个方法是对发送消息带有message-id但系统中没有ack机制的特殊处理,直接调用ack-spout-msg函数,不进行跟踪
(when message-id
(ack-spout-msg executor-data task-data message-id
{:stream out-stream-id :values values}
(if (sampler) 0))))
(or out-tasks [])
))
setup-ticks!
这个函数用来定期向该executor的接收消息队列发送Tick消息,executor在收到tick消息之后,会执行发送队列的超时操作。
(defn setup-ticks! [worker executor-data]
(let [storm-conf (:storm-conf executor-data)
tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
receive-queue (:receive-queue executor-data)
context (:worker-context executor-data)]
(when tick-time-secs;;如果设置了tick-time-secs
(if (or (system-id? (:component-id executor-data));;如果是系统级的组件
(and (not (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
(= :spout (:type executor-data))));;或是spout节点没有设置超时
(log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data));;打印日志
(schedule-recurring;;否则定期执行向接收队列中发送tick的tuple的任务
(:user-timer worker)
tick-time-secs
tick-time-secs
(fn []
(disruptor/publish
receive-queue
[[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
)))))))
接下来看看bolt类型的mk-threads,代码就不贴了,基本相同,除了在调用async-loop的工厂函数中会处理以下逻辑:
(.prepare bolt-obj
storm-conf
user-context
(OutputCollector.;;创建了OutputCollector类
(reify IOutputCollector;;实现IOutputCollector接口
(emit [this stream anchors values];;emit函数
(bolt-emit stream anchors values nil))
(emitDirect [this task stream anchors values];;emitDirect函数
(bolt-emit stream anchors values task));;都直接调用bolt-emit进行消息发送
(^void ack [this ^Tuple tuple]
(let [^TupleImpl tuple tuple
ack-val (.getAckVal tuple)]
(fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
(task/send-unanchored task-data
ACKER-ACK-STREAM-ID
[root (bit-xor id ack-val)]);;使用task.clj中的send-unanchored想acker bolt发送消息
))
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta));;用户的扩展
(when delta
(builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data)
executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta)
(stats/bolt-acked-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
(^void fail [this ^Tuple tuple]
(fast-list-iter [root (.. tuple getMessageId getAnchors)]
(task/send-unanchored task-data
ACKER-FAIL-STREAM-ID
[root]));;使用task.clj中的send-unanchored想acker bolt发送消息
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta));;用户的扩展
(when delta
(builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data)
executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple))
(stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
(reportError [this error]
(report-error error)
)))))
在mk-threads的最后:
(let [receive-queue (:receive-queue executor-data)
event-handler (mk-task-receiver executor-data tuple-action-fn)]
(disruptor/consumer-started! receive-queue);;开启接收
(fn []
(disruptor/consume-batch-when-available receive-queue event-handler);;用event-handler接收消息
0)));;自旋等待时长为0
消息的发送会经过bolt-emit函数:
bolt-emit (fn [stream anchors values task]
(let [out-tasks (if task
(tasks-fn task stream values)
(tasks-fn stream values))];;获得消息目的地的taskId
(fast-list-iter [t out-tasks]
(let [anchors-to-ids (HashMap.)]
(fast-list-iter [^TupleImpl a anchors]
(let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
(when (pos? (count root-ids))
(let [edge-id (MessageId/generateId rand)]
(.updateAckVal a edge-id)
(fast-list-iter [root-id root-ids]
(put-xor! anchors-to-ids root-id edge-id))
))));;处理跟踪逻辑
(transfer-fn t
(TupleImpl. worker-context
values
task-id
stream
(MessageId/makeId anchors-to-ids)))));;调用transfer-fn发送消息
(or out-tasks [])))