bigbully +

Worker

每一个worker都是一个进程,他的启动和停止由supervisor控制。

worker的元数据如下:

(defn worker-data [conf mq-context storm-id assignment-id port worker-id]
  (let [cluster-state (cluster/mk-distributed-cluster-state conf);;创建和zk的交互类
    storm-cluster-state (cluster/mk-storm-cluster-state cluster-state);;创建storm-cluster-state
    storm-conf (read-supervisor-storm-conf conf storm-id);;读取配置信息
    executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port));;通过zk获取executor和worker的对应关系
    transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY));;worker的消息发送队列
    executor-receive-queue-map (mk-receive-queue-map storm-conf executors);;worker的消息接受队列结合,每一个executor对应一个队列        
    receive-queue-map (->> executor-receive-queue-map
                           (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue])))
                           (into {}));;获得task对queue的映射

    topology (read-supervisor-topology conf storm-id)];;topology
(recursive-map
  :conf conf
  :mq-context (if mq-context
                  mq-context
                  (TransportFactory/makeContext storm-conf))
  :storm-id storm-id
  :assignment-id assignment-id
  :port port
  :worker-id worker-id
  :cluster-state cluster-state
  :storm-cluster-state storm-cluster-state
  :storm-active-atom (atom false)
  :executors executors
  :task-ids (->> receive-queue-map keys (map int) sort)
  :storm-conf storm-conf
  :topology topology
  :system-topology (system-topology! storm-conf topology)
  :heartbeat-timer (mk-halting-timer "heartbeat-timer");;心跳用的timer
  :refresh-connections-timer (mk-halting-timer "refresh-connections-timer");;链接维护timer
  :refresh-active-timer (mk-halting-timer "refresh-active-timer");;获取topology状态的timer
  :executor-heartbeat-timer (mk-halting-timer "executor-heartbeat-timer");;executor心跳timer
  :user-timer (mk-halting-timer "user-timer");;executor内部的timer
  :task->component (HashMap. (storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
  :component->stream->fields (component->stream->fields (:system-topology <>))
  :component->sorted-tasks (->> (:task->component <>) reverse-map (map-val sort))
  :endpoint-socket-lock (mk-rw-lock)
  :cached-node+port->socket (atom {})
  :cached-task->node+port (atom {})
  :transfer-queue transfer-queue
  :executor-receive-queue-map executor-receive-queue-map
  :short-executor-receive-queue-map (map-key first executor-receive-queue-map);;存储executor中第一个task的taskId到该executor对应的接收队列的映射关系
  :task->short-executor (->> executors
                             (mapcat (fn [e] (for [t (executor-id->tasks e)] [t (first e)])))
                             (into {})
                             (HashMap.))
  :suicide-fn (mk-suicide-fn conf)
  :uptime (uptime-computer)
  :default-shared-resources (mk-default-resources <>)
  :user-shared-resources (mk-user-resources <>)
  :transfer-local-fn (mk-transfer-local-fn <>)
  :receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT)
  :transfer-fn (mk-transfer-fn <>)
  )))

worker启动的main方法如下:

(defn -main [storm-id assignment-id port-str worker-id];;参数分别为topology-id,supervisor-id,port和worker-id
  (let [conf (read-storm-config)]
    (validate-distributed-mode! conf)
    (mk-worker conf nil storm-id assignment-id (Integer/parseInt port-str) worker-id)))

经过加载配置,验证distributed模式之后使用mk-worker函数启动worker:

(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]
  (log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
           " and conf " conf)
  (if-not (local-mode? conf)
    (redirect-stdio-to-slf4j!));;如果是分布式系统,会把打印到控制台的信息打到日志中
  ;; because in local mode, its not a separate
  ;; process. supervisor will register it in this case
  (when (= :distributed (cluster-mode conf))
    (touch (worker-pid-path conf worker-id (process-pid))));;如果是分布式,要创建保存pid的文件
      (let [worker (worker-data conf shared-mq-context storm-id assignment-id port worker-id);;构建worker-data元数据
        heartbeat-fn #(do-heartbeat worker);;定义了一个函数,会构建WorkerHeartBeat,储存心跳信息到localState,并做清理
        ;; do this here so that the worker process dies if this fails
        ;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
        _ (heartbeat-fn);;执行一次心跳
        executors (atom nil);;创建executors对象
        ;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
        ;; to the supervisor
        _ (schedule-recurring (:heartbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn);;定时执行worker心跳
        _ (schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK-HEARTBEAT-FREQUENCY-SECS) #(do-executor-heartbeats worker :executors @executors));;定时执行executor心跳

        refresh-connections (mk-refresh-connections worker)

        _ (refresh-connections nil);;启动queue链接维护定时任务
        _ (refresh-storm-active worker nil);;启动获取topology状态的定时任务

        _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e)));;创建executor并保存在executors中
        receive-thread-shutdown (launch-receive-thread worker);;启动消息接收线程

        transfer-tuples (mk-transfer-tuples-handler worker)
        transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples);;启动queue的发送线程                                       
        shutdown* ...;;关闭方法
    ret (reify
         Shutdownable
         (shutdown
          [this]
          (shutdown*));;实现了Shutdownable接口,在关闭时会执行shutdown*方法
         DaemonCommon
         (waiting? [this]
           (and
             (timer-waiting? (:heartbeat-timer worker))
             (timer-waiting? (:refresh-connections-timer worker))
             (timer-waiting? (:refresh-active-timer worker))
             (timer-waiting? (:executor-heartbeat-timer worker))
             (timer-waiting? (:user-timer worker))
             ));;实现了DaemonCommon接口,会等待几个定时任务执行完毕
         )]

    (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) refresh-connections);;定时执行链接维护的任务
    (schedule-recurring (:refresh-active-timer worker) 0 (conf TASK-REFRESH-POLL-SECS) (partial refresh-storm-active worker));;定时执行获取topology状态的任务

    (log-message "Worker has topology config " (:storm-conf worker))
    (log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
    ret;;返回了实现了Shutdownable和DaemonCommon的对象
))

来具体看看以下几个方法如何实现的:

mk-refresh-connections

这个方法用来维护与通信框架的链接(ZMQ或NETTY)

(defn mk-refresh-connections [worker]
  (let [outbound-tasks (worker-outbound-tasks worker);;获取worker中的task
    conf (:conf worker)
    storm-cluster-state (:storm-cluster-state worker)
    storm-id (:storm-id worker)]
    (fn this;;创建一个函数名为this
      ([];;无参的调用方式会调用有参的函数重载,并创建一个用来定时执行refresh-connections任务的函数作为callback
        (this (fn [& ignored] (schedule (:refresh-connections-timer worker) 0 this))))
      ([callback];;有参的重载
        (let [assignment (.assignment-info storm-cluster-state storm-id callback);;从zk上获得assignment信息,并把callback作为zk的回调
              my-assignment (-> assignment
                            :executor->node+port
                            to-task->node+port
                            (select-keys outbound-tasks)
                            (#(map-val endpoint->string %)));;获得接受该worker消息的taskId对node+port的映射
              ;; we dont need a connection for the local tasks anymore
              needed-assignment (->> my-assignment
                                  (filter-key (complement (-> worker :task-ids set))));;通过过滤移除所有和当前work关联的task
              needed-connections (-> needed-assignment vals set);;目标worker的集合
              needed-tasks (-> needed-assignment keys);;目标worker的taskid的集合

              current-connections (set (keys @(:cached-node+port->socket worker)));;获取当前链接中所有node+port的列表
              new-connections (set/difference needed-connections current-connections);;判断哪些需要链接
              remove-connections (set/difference current-connections needed-connections)];;判断哪些需要关闭
              (swap! (:cached-node+port->socket worker)
                 #(HashMap. (merge (into {} %1) %2))
                 (into {}
                   (dofor [endpoint-str new-connections
                           :let [[node port] (string->endpoint endpoint-str)]]
                     [endpoint-str
                      (.connect
                       ^IContext (:mq-context worker)
                       storm-id
                       ((:node->host assignment) node)
                       port)
                      ]
                     )));;调用IContext的connect方法对new-connections中的每一个node+port创建新的链接并加入到cached-node+port->socket中
          (write-locked (:endpoint-socket-lock worker)
            (reset! (:cached-task->node+port worker)
                    (HashMap. my-assignment)));;把cached-task->node+port更新为my-assignment
          (doseq [endpoint remove-connections]
            (.close (get @(:cached-node+port->socket worker) endpoint)));;需要关闭的链接进行关闭
          (apply swap!
                 (:cached-node+port->socket worker)
                 #(HashMap. (apply dissoc (into {} %1) %&))
                 remove-connections);;从cached-node+port->socket中移除关闭了的链接

          (let [missing-tasks (->> needed-tasks
                                   (filter (complement my-assignment)))]
            (when-not (empty? missing-tasks)
              (log-warn "Missing assignment for following tasks: " (pr-str missing-tasks));;查找这次操作遗漏的task,打出日志,之后会在下次定时任务执行时执行
              )))))))

refresh-storm-active

获取topology的最新状态。定时任务来获取storm-base中topology的信息,并保存在storm-active-atom中

(defn refresh-storm-active
  ([worker]
(refresh-storm-active worker (fn [& ignored] (schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker)))))
  ([worker callback]
    (let [base (.storm-base (:storm-cluster-state worker) (:storm-id worker) callback)]
     (reset!
      (:storm-active-atom worker)
      (= :active (-> base :status :type))
    ))
 ))

与通信相关的几个重要函数:

mk-transfer-local-fn

这个函数在创建worker-data时使用,返回的结果被赋值到transfer-local-fn上。这个函数用来产生并发送消息到executor的接收队列。同一worker内部的executor会通过这个函数来传递消息。

(defn mk-transfer-local-fn [worker]
  (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker);;存储executor中第一个task的taskId到该executor对应的接收队列的映射关系
    task->short-executor (:task->short-executor worker);;存储从该worker总的taskId到executor中第一个task的taskId的映射关系
    task-getter (comp #(get task->short-executor %) fast-first)];;task-getter通过comp合并了后两个函数,返回传入消息的第一个元素(即taskId,第二个元素是消息体),然后从task->short-executor中去除executor中第一个task的taskId
    (fn [tuple-batch];;接下来定义返回的函数体
      (let [grouped (fast-group-by task-getter tuple-batch)];;把接收到的一组消息tuple-batch按照taskId到executor中第一个task的taskId的规则进行分组
        (fast-map-iter [[short-executor pairs] grouped]
          (let [q (short-executor-receive-queue-map short-executor)];;获取每个executor对应的queue
            (if q;;如果队列不为空
              (disruptor/publish q pairs);;接下来把收到的消息发到每一个队列中
              (log-warn "Received invalid messages for unknown tasks. Dropping... ")
          )))))))

mk-transfer-fn

在创建worker-data时还调用了mk-transfer-fn函数,他与mk-transfer-local-fn类似,他生产的函数用于executor的数据发送。这个函数分为两种情况

  1. worker内的executor消息发送,不需要跨进程发送
  2. 不同worker的executor消息发送,需要跨进程发送

    (defn mk-transfer-fn [worker]
      (let [local-tasks (-> worker :task-ids set);;获取本地taskId
        local-transfer (:transfer-local-fn worker);;获得本地消息传输消息
        ^DisruptorQueue transfer-queue (:transfer-queue worker);;worker对应的消息发送队列
        task->node+port (:cached-task->node+port worker)];;worker缓存的task对node+port的映射关系
        (fn [^KryoTupleSerializer serializer tuple-batch];;方法体
          (let [local (ArrayList.);;定义一个ArrayList用来缓存消息
                remoteMap (HashMap.)]
            (fast-list-iter [[task tuple :as pair] tuple-batch];;遍历接收到的消息
              (if (local-tasks task);;如果task为当前worker的task
                (.add local pair);;则把消息放入到local中
    
                ;;Using java objects directly to avoid performance issues in java code
                (let [node+port (get @task->node+port task)]
                  (when (not (.get remoteMap node+port))
                    (.put remoteMap node+port (ArrayList.)))
                  (let [remote (.get remoteMap node+port)]
                    (.add remote (TaskMessage. task (.serialize serializer tuple)))
                 ))));;把消息根据当前worker中的taskId进行分组,存放在remoteMap中
    
            (local-transfer local);;调用本地消息传输的函数来传输消息
            (disruptor/publish transfer-queue remoteMap);;把分好组的消息发送到 transfer-queue中
      ))))
    

mk-transfer-tuples-handler

mk-transfer-tuples-handler函数处理了消息是如何从 transfer-queue 接收的:

(defn mk-transfer-tuples-handler [worker]
  (let [^DisruptorQueue transfer-queue (:transfer-queue worker);;获取接收队列
    drainer (TransferDrainer.);;构造一个容器用来缓存发送的消息
    node+port->socket (:cached-node+port->socket worker);;获得node+port对socket的映射
    task->node+port (:cached-task->node+port worker);;获得task对node+port的映射
    endpoint-socket-lock (:endpoint-socket-lock worker);;获得readwrite锁
    ]
(disruptor/clojure-handler;;定义了一个clojure-handler
  (fn [packets _ batch-end?];;packets代表这一组消息,batche-end?代表该batch是否结束
    (.add drainer packets);;把packets放入drainer容器中

    (when batch-end?;;如果batch结束了
      (read-locked endpoint-socket-lock;;加读锁
        (let [node+port->socket @node+port->socket]
          (.send drainer node+port->socket)));;通过drainer的send方法发送一批消息到socket
      (.clear drainer))))));;清空drainer容器

这个函数会被定时调用:

 transfer-tuples (mk-transfer-tuples-handler worker)

 transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)  

shutdown*

shutdown*函数定义了worker在终结自己时要执行的操作:

(fn []
                (log-message "Shutting down worker " storm-id " " assignment-id " " port)
                (doseq [[_ socket] @(:cached-node+port->socket worker)];;遍历socket
                  ;; this will do best effort flushing since the linger period
                  ;; was set on creation
                  (.close socket));;逐一关闭
                (log-message "Shutting down receive thread")
                (receive-thread-shutdown);;关闭消息接收线程
                (log-message "Shut down receive thread")
                (log-message "Terminating messaging context")
                (log-message "Shutting down executors")
                (doseq [executor @executors] (.shutdown executor));;关闭executor
                (log-message "Shut down executors")

                ;;this is fine because the only time this is shared is when it's a local context,
                ;;in which case it's a noop
                (.term ^IContext (:mq-context worker));;IContext调用term方法关闭链接
                (log-message "Shutting down transfer thread")
                (disruptor/halt-with-interrupt! (:transfer-queue worker));;disruptor关闭

                (.interrupt transfer-thread);;关闭消息发送线程
                (.join transfer-thread);;等待关闭结束
                (log-message "Shut down transfer thread")
                (cancel-timer (:heartbeat-timer worker))
                (cancel-timer (:refresh-connections-timer worker))
                (cancel-timer (:refresh-active-timer worker))
                (cancel-timer (:executor-heartbeat-timer worker))
                (cancel-timer (:user-timer worker));;停止所有定时任务

                (close-resources worker);;关闭worker的线程池资源

                ;; TODO: here need to invoke the "shutdown" method of WorkerHook

                (.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id assignment-id port);;从zk上移除当前worker的心跳信息
                (log-message "Disconnecting from storm cluster state context")
                (.disconnect (:storm-cluster-state worker));;与zk断开链接
                (.close (:cluster-state worker))
                (log-message "Shut down worker " storm-id " " assignment-id " " port))