Worker
2014-12-05
每一个worker都是一个进程,他的启动和停止由supervisor控制。
worker的元数据如下:
(defn worker-data [conf mq-context storm-id assignment-id port worker-id]
(let [cluster-state (cluster/mk-distributed-cluster-state conf);;创建和zk的交互类
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state);;创建storm-cluster-state
storm-conf (read-supervisor-storm-conf conf storm-id);;读取配置信息
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port));;通过zk获取executor和worker的对应关系
transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY));;worker的消息发送队列
executor-receive-queue-map (mk-receive-queue-map storm-conf executors);;worker的消息接受队列结合,每一个executor对应一个队列
receive-queue-map (->> executor-receive-queue-map
(mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
(into {}));;获得task对queue的映射
topology (read-supervisor-topology conf storm-id)];;topology
(recursive-map
:conf conf
:mq-context (if mq-context
mq-context
(TransportFactory/makeContext storm-conf))
:storm-id storm-id
:assignment-id assignment-id
:port port
:worker-id worker-id
:cluster-state cluster-state
:storm-cluster-state storm-cluster-state
:storm-active-atom (atom false)
:executors executors
:task-ids (->> receive-queue-map keys (map int) sort)
:storm-conf storm-conf
:topology topology
:system-topology (system-topology! storm-conf topology)
:heartbeat-timer (mk-halting-timer "heartbeat-timer");;心跳用的timer
:refresh-connections-timer (mk-halting-timer "refresh-connections-timer");;链接维护timer
:refresh-active-timer (mk-halting-timer "refresh-active-timer");;获取topology状态的timer
:executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer");;executor心跳timer
:user-timer (mk-halting-timer "user-timer");;executor内部的timer
:task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
:component->stream->fields (component->stream->fields (:system-topology <>))
:component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort))
:endpoint-socket-lock (mk-rw-lock)
:cached-node+port->socket (atom {})
:cached-task->node+port (atom {})
:transfer-queue transfer-queue
:executor-receive-queue-map executor-receive-queue-map
:short-executor-receive-queue-map (map-key first executor-receive-queue-map);;存储executor中第一个task的taskId到该executor对应的接收队列的映射关系
:task->short-executor (->> executors
(mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)])))
(into {})
(HashMap.))
:suicide-fn (mk-suicide-fn conf)
:uptime (uptime-computer)
:default-shared-resources (mk-default-resources <>)
:user-shared-resources (mk-user-resources <>)
:transfer-local-fn (mk-transfer-local-fn <>)
:receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT)
:transfer-fn (mk-transfer-fn <>)
)))
worker启动的main方法如下:
(defn -main [storm-id assignment-id port-str worker-id];;参数分别为topology-id,supervisor-id,port和worker-id
(let [conf (read-storm-config)]
(validate-distributed-mode! conf)
(mk-worker conf nil storm-id assignment-id (Integer/parseInt port-str) worker-id)))
经过加载配置,验证distributed模式之后使用mk-worker函数启动worker:
(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]
(log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
" and conf " conf)
(if-not (local-mode? conf)
(redirect-stdio-to-slf4j!));;如果是分布式系统,会把打印到控制台的信息打到日志中
;; because in local mode, its not a separate
;; process. supervisor will register it in this case
(when (= :distributed (cluster-mode conf))
(touch (worker-pid-path conf worker-id (process-pid))));;如果是分布式,要创建保存pid的文件
(let [worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id);;构建worker-data元数据
heartbeat-fn #(do-heartbeat worker);;定义了一个函数,会构建WorkerHeartBeat,储存心跳信息到localState,并做清理
;; do this here so that the worker process dies if this fails
;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
_ (heartbeat-fn);;执行一次心跳
executors (atom nil);;创建executors对象
;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
;; to the supervisor
_ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn);;定时执行worker心跳
_ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors));;定时执行executor心跳
refresh-connections (mk-refresh-connections worker)
_ (refresh-connections nil);;启动queue链接维护定时任务
_ (refresh-storm-active worker nil);;启动获取topology状态的定时任务
_ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e)));;创建executor并保存在executors中
receive-thread-shutdown (launch-receive-thread worker);;启动消息接收线程
transfer-tuples (mk-transfer-tuples-handler worker)
transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples);;启动queue的发送线程
shutdown* ...;;关闭方法
ret (reify
Shutdownable
(shutdown
[this]
(shutdown*));;实现了Shutdownable接口,在关闭时会执行shutdown*方法
DaemonCommon
(waiting? [this]
(and
(timer-waiting? (:heartbeat-timer worker))
(timer-waiting? (:refresh-connections-timer worker))
(timer-waiting? (:refresh-active-timer worker))
(timer-waiting? (:executor-heartbeat-timer worker))
(timer-waiting? (:user-timer worker))
));;实现了DaemonCommon接口,会等待几个定时任务执行完毕
)]
(schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections);;定时执行链接维护的任务
(schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker));;定时执行获取topology状态的任务
(log-message "Worker has topology config " (:storm-conf worker))
(log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
ret;;返回了实现了Shutdownable和DaemonCommon的对象
))
来具体看看以下几个方法如何实现的:
mk-refresh-connections
这个方法用来维护与通信框架的链接(ZMQ或NETTY)
(defn mk-refresh-connections [worker]
(let [outbound-tasks (worker-outbound-tasks worker);;获取worker中的task
conf (:conf worker)
storm-cluster-state (:storm-cluster-state worker)
storm-id (:storm-id worker)]
(fn this;;创建一个函数名为this
([];;无参的调用方式会调用有参的函数重载,并创建一个用来定时执行refresh-connections任务的函数作为callback
(this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
([callback];;有参的重载
(let [assignment (.assignment-info storm-cluster-state storm-id callback);;从zk上获得assignment信息,并把callback作为zk的回调
my-assignment (-> assignment
:executor->node+port
to-task->node+port
(select-keys outbound-tasks)
(#(map-val endpoint->string %)));;获得接受该worker消息的taskId对node+port的映射
;; we dont need a connection for the local tasks anymore
needed-assignment (->> my-assignment
(filter-key (complement (-> worker :task-ids set))));;通过过滤移除所有和当前work关联的task
needed-connections (-> needed-assignment vals set);;目标worker的集合
needed-tasks (-> needed-assignment keys);;目标worker的taskid的集合
current-connections (set (keys @(:cached-node+port->socket worker)));;获取当前链接中所有node+port的列表
new-connections (set/difference needed-connections current-connections);;判断哪些需要链接
remove-connections (set/difference current-connections needed-connections)];;判断哪些需要关闭
(swap! (:cached-node+port->socket worker)
#(HashMap. (merge (into {} %1) %2))
(into {}
(dofor [endpoint-str new-connections
:let [[node port] (string->endpoint endpoint-str)]]
[endpoint-str
(.connect
^IContext (:mq-context worker)
storm-id
((:node->host assignment) node)
port)
]
)));;调用IContext的connect方法对new-connections中的每一个node+port创建新的链接并加入到cached-node+port->socket中
(write-locked (:endpoint-socket-lock worker)
(reset! (:cached-task->node+port worker)
(HashMap. my-assignment)));;把cached-task->node+port更新为my-assignment
(doseq [endpoint remove-connections]
(.close (get @(:cached-node+port->socket worker) endpoint)));;需要关闭的链接进行关闭
(apply swap!
(:cached-node+port->socket worker)
#(HashMap. (apply dissoc (into {} %1) %&))
remove-connections);;从cached-node+port->socket中移除关闭了的链接
(let [missing-tasks (->> needed-tasks
(filter (complement my-assignment)))]
(when-not (empty? missing-tasks)
(log-warn "Missing assignment for following tasks: " (pr-str missing-tasks));;查找这次操作遗漏的task,打出日志,之后会在下次定时任务执行时执行
)))))))
refresh-storm-active
获取topology的最新状态。定时任务来获取storm-base中topology的信息,并保存在storm-active-atom中
(defn refresh-storm-active
([worker]
(refresh-storm-active worker (fn [& ignored] (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
([worker callback]
(let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
(reset!
(:storm-active-atom worker)
(= :active (-> base :status :type))
))
))
与通信相关的几个重要函数:
mk-transfer-local-fn
这个函数在创建worker-data时使用,返回的结果被赋值到transfer-local-fn上。这个函数用来产生并发送消息到executor的接收队列。同一worker内部的executor会通过这个函数来传递消息。
(defn mk-transfer-local-fn [worker]
(let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker);;存储executor中第一个task的taskId到该executor对应的接收队列的映射关系
task->short-executor (:task->short-executor worker);;存储从该worker总的taskId到executor中第一个task的taskId的映射关系
task-getter (comp #(get task->short-executor %) fast-first)];;task-getter通过comp合并了后两个函数,返回传入消息的第一个元素(即taskId,第二个元素是消息体),然后从task->short-executor中去除executor中第一个task的taskId
(fn [tuple-batch];;接下来定义返回的函数体
(let [grouped (fast-group-by task-getter tuple-batch)];;把接收到的一组消息tuple-batch按照taskId到executor中第一个task的taskId的规则进行分组
(fast-map-iter [[short-executor pairs] grouped]
(let [q (short-executor-receive-queue-map short-executor)];;获取每个executor对应的queue
(if q;;如果队列不为空
(disruptor/publish q pairs);;接下来把收到的消息发到每一个队列中
(log-warn "Received invalid messages for unknown tasks. Dropping... ")
)))))))
mk-transfer-fn
在创建worker-data时还调用了mk-transfer-fn函数,他与mk-transfer-local-fn类似,他生产的函数用于executor的数据发送。这个函数分为两种情况
- worker内的executor消息发送,不需要跨进程发送
不同worker的executor消息发送,需要跨进程发送
(defn mk-transfer-fn [worker] (let [local-tasks (-> worker :task-ids set);;获取本地taskId local-transfer (:transfer-local-fn worker);;获得本地消息传输消息 ^DisruptorQueue transfer-queue (:transfer-queue worker);;worker对应的消息发送队列 task->node+port (:cached-task->node+port worker)];;worker缓存的task对node+port的映射关系 (fn [^KryoTupleSerializer serializer tuple-batch];;方法体 (let [local (ArrayList.);;定义一个ArrayList用来缓存消息 remoteMap (HashMap.)] (fast-list-iter [[task tuple :as pair] tuple-batch];;遍历接收到的消息 (if (local-tasks task);;如果task为当前worker的task (.add local pair);;则把消息放入到local中 ;;Using java objects directly to avoid performance issues in java code (let [node+port (get @task->node+port task)] (when (not (.get remoteMap node+port)) (.put remoteMap node+port (ArrayList.))) (let [remote (.get remoteMap node+port)] (.add remote (TaskMessage. task (.serialize serializer tuple))) ))));;把消息根据当前worker中的taskId进行分组,存放在remoteMap中 (local-transfer local);;调用本地消息传输的函数来传输消息 (disruptor/publish transfer-queue remoteMap);;把分好组的消息发送到 transfer-queue中 ))))
mk-transfer-tuples-handler
mk-transfer-tuples-handler函数处理了消息是如何从 transfer-queue 接收的:
(defn mk-transfer-tuples-handler [worker]
(let [^DisruptorQueue transfer-queue (:transfer-queue worker);;获取接收队列
drainer (TransferDrainer.);;构造一个容器用来缓存发送的消息
node+port->socket (:cached-node+port->socket worker);;获得node+port对socket的映射
task->node+port (:cached-task->node+port worker);;获得task对node+port的映射
endpoint-socket-lock (:endpoint-socket-lock worker);;获得readwrite锁
]
(disruptor/clojure-handler;;定义了一个clojure-handler
(fn [packets _ batch-end?];;packets代表这一组消息,batche-end?代表该batch是否结束
(.add drainer packets);;把packets放入drainer容器中
(when batch-end?;;如果batch结束了
(read-locked endpoint-socket-lock;;加读锁
(let [node+port->socket @node+port->socket]
(.send drainer node+port->socket)));;通过drainer的send方法发送一批消息到socket
(.clear drainer))))));;清空drainer容器
这个函数会被定时调用:
transfer-tuples (mk-transfer-tuples-handler worker)
transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)
shutdown*
shutdown*函数定义了worker在终结自己时要执行的操作:
(fn []
(log-message "Shutting down worker " storm-id " " assignment-id " " port)
(doseq [[_ socket] @(:cached-node+port->socket worker)];;遍历socket
;; this will do best effort flushing since the linger period
;; was set on creation
(.close socket));;逐一关闭
(log-message "Shutting down receive thread")
(receive-thread-shutdown);;关闭消息接收线程
(log-message "Shut down receive thread")
(log-message "Terminating messaging context")
(log-message "Shutting down executors")
(doseq [executor @executors] (.shutdown executor));;关闭executor
(log-message "Shut down executors")
;;this is fine because the only time this is shared is when it's a local context,
;;in which case it's a noop
(.term ^IContext (:mq-context worker));;IContext调用term方法关闭链接
(log-message "Shutting down transfer thread")
(disruptor/halt-with-interrupt! (:transfer-queue worker));;disruptor关闭
(.interrupt transfer-thread);;关闭消息发送线程
(.join transfer-thread);;等待关闭结束
(log-message "Shut down transfer thread")
(cancel-timer (:heartbeat-timer worker))
(cancel-timer (:refresh-connections-timer worker))
(cancel-timer (:refresh-active-timer worker))
(cancel-timer (:executor-heartbeat-timer worker))
(cancel-timer (:user-timer worker));;停止所有定时任务
(close-resources worker);;关闭worker的线程池资源
;; TODO: here need to invoke the "shutdown" method of WorkerHook
(.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id assignment-id port);;从zk上移除当前worker的心跳信息
(log-message "Disconnecting from storm cluster state context")
(.disconnect (:storm-cluster-state worker));;与zk断开链接
(.close (:cluster-state worker))
(log-message "Shut down worker " storm-id " " assignment-id " " port))