
Nimbus

The core Nimbus code lives in backtype.storm.daemon.nimbus.clj.

Data structures

The shared Nimbus data structure is built by the nimbus-data function; it bundles the common facilities Nimbus uses:

(defn nimbus-data [conf inimbus]
  (let [forced-scheduler (.getForcedScheduler inimbus)]
    {:conf conf
     :inimbus inimbus
     :submitted-count (atom 0)
     :storm-cluster-state (cluster/mk-storm-cluster-state conf)
     :submit-lock (Object.)
     :heartbeats-cache (atom {})
     :downloaders (file-cache-map conf)
     :uploaders (file-cache-map conf)
     :uptime (uptime-computer)
     :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
     :timer (mk-timer :kill-fn (fn [t]
                                 (log-error t "Error when processing event")
                                 (halt-process! 20 "Error when processing an event")))
     :scheduler (mk-scheduler conf inimbus)}))
  1. conf holds the configuration.
  2. inimbus is the INimbus implementation passed in at launch.
  3. submitted-count counts how many topologies have been submitted.
  4. storm-cluster-state writes data to and reads data from ZooKeeper.
  5. submit-lock is the lock taken during submission.
  6. heartbeats-cache caches heartbeat data.
  7. downloaders and uploaders are the caches used when users upload or download topology jar files.
  8. uptime reports how long Nimbus has been running (see the sketch below).
  9. validator validates topologies; the default validator does nothing and exists as an extension point.
  10. timer is a timer whose implementation lives in timer.clj.
  11. scheduler is created by mk-scheduler, which takes the ForcedScheduler from inimbus into account.
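
For example, uptime is produced by uptime-computer from util.clj. A minimal sketch of the idea (the actual helper may differ in detail) is a closure over the start time:

(defn uptime-computer []
  (let [start-time (current-time-secs)]
    ;; calling ((:uptime nimbus)) yields the seconds elapsed since nimbus-data was built
    (fn [] (time-delta start-time))))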

Distributed startup

The main function is:

(defn -main []
  (-launch (standalone-nimbus)))

-launch, shown below, takes the nimbus argument and calls launch-server! with it and the Storm configuration:

(defn -launch [nimbus]
  (launch-server! (read-storm-config) nimbus))

read-storm-config is implemented in config.clj. It calls the static Java method Utils.readStormConfig to read the configuration from the config files, converts it into Clojure data structures (lists, maps) with clojurify-structure, validates it with validate-configs-with-schemas, and returns the resulting conf.

(defn read-storm-config
  []
  (let [conf (clojurify-structure (Utils/readStormConfig))]
    (validate-configs-with-schemas conf)
    conf))
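
As an illustration (the values shown are the stock defaults, assuming an unmodified storm.yaml): after clojurify-structure the configuration behaves like an ordinary Clojure map, so it can be invoked as a function of its keys:

(def conf (read-storm-config))
(conf NIMBUS-THRIFT-PORT)    ;; => 6627 by default
(conf STORM-ZOOKEEPER-PORT)  ;; => 2181 by default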

The standalone-nimbus function:

(defn standalone-nimbus []
  (reify INimbus
    (prepare [this conf local-dir]
      )
    (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
      (->> supervisors
           (mapcat (fn [^SupervisorDetails s]
                     (for [p (.getMeta s)]
                       (WorkerSlot. (.getId s) p))))
           set ))
    (assignSlots [this topology slots]
      )
    (getForcedScheduler [this]
      nil )
    (getHostName [this supervisors node-id]
      (if-let [^SupervisorDetails supervisor (get supervisors node-id)]
        (.getHost supervisor)))
    ))

standalone-nimbus reifies the Java interface INimbus. prepare and assignSlots are left empty; the other three methods are:

  1. allSlotsAvailableForScheduling takes the SupervisorDetails collection supervisors and, for each supervisor, builds a WorkerSlot from the supervisor's id and each port in its metadata, collecting everything into a set (a small illustration follows this list).
  2. getForcedScheduler returns nil, i.e. there is no forced scheduler.
  3. getHostName uses if-let to look up node-id in the supervisors map: if a SupervisorDetails is found, it is bound to supervisor and its host is returned; otherwise nothing happens.
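
A hypothetical illustration of item 1: in standalone mode a supervisor's metadata is its port list, so a supervisor "s1" exposing ports 6700 and 6701 contributes two slots:

;; (.getId s1) => "s1", (.getMeta s1) => (6700 6701)   (hypothetical values)
;; (.allSlotsAvailableForScheduling inimbus [s1] topologies #{})
;; => #{(WorkerSlot. "s1" 6700) (WorkerSlot. "s1" 6701)}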

With an INimbus implementation in hand, we can finally make the last preparations before startup. Here is launch-server!:

(defn launch-server! [conf nimbus]
  (validate-distributed-mode! conf)
  (let [service-handler (service-handler conf nimbus)
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
                    (.processor (Nimbus$Processor. service-handler)))
        server (THsHaServer. (do (set! (. options maxReadBufferBytes) (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server)))

This function performs the following steps:

  1. Validates that conf is in distributed mode; if it is configured for local mode, an exception is thrown immediately.
  2. Creates the handler via the service-handler function.
  3. Builds the Thrift processor through a threading macro (expanded below), wiring in the handler as the processor.
  4. Resets maxReadBufferBytes on options from conf, then constructs a THsHaServer from options.
  5. Registers a shutdown hook that shuts down the handler and stops the server.
  6. Logs a message.
  7. Starts the server.
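
Regarding step 3: the -> threading macro feeds each result in as the first argument of the next form, and because Thrift's THsHaServer$Args setters return this, the options pipeline above expands to roughly:

(.processor
  (.protocolFactory
    (.workerThreads
      (THsHaServer$Args.
        (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))))
      64)
    (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
  (Nimbus$Processor. service-handler))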

Now let's see how the handler is initialized. It starts with a macro defined in common.clj:

(defmacro defserverfn [name & body]
  `(let [exec-fn# (fn ~@body)]
     (defn ~name [& args#]
       (try-cause
         (apply exec-fn# args#)
         (catch InterruptedException e#
           (throw e#))
         (catch Throwable t#
           (log-error t# "Error on initialization of server " ~(str name))
           (halt-process! 13 "Error on initialization"))))))

This macro wraps the function body in a try-catch: InterruptedException is rethrown as-is, while any other Throwable is logged and the process is halted.
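
So a definition like (defserverfn service-handler [conf inimbus] ...) expands to roughly the following (gensym suffixes omitted):

(let [exec-fn (fn [conf inimbus] ...)] ;; the original function body
  (defn service-handler [& args]
    (try-cause
      (apply exec-fn args)
      (catch InterruptedException e
        (throw e)) ;; propagate interruption as-is
      (catch Throwable t ;; log everything else, then halt the process
        (log-error t "Error on initialization of server service-handler")
        (halt-process! 13 "Error on initialization")))))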

The handler is then created with this macro:

(defserverfn service-handler [conf inimbus]
  (.prepare inimbus conf (master-inimbus-dir conf))
  (log-message "Starting Nimbus with conf " conf)
  (let [nimbus (nimbus-data conf inimbus)]
    (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
    (cleanup-corrupt-topologies! nimbus)
    (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
      (transition! nimbus storm-id :startup))
    (schedule-recurring (:timer nimbus)
                    0
                    (conf NIMBUS-MONITOR-FREQ-SECS)
                    (fn []
                      (when (conf NIMBUS-REASSIGN)
                        (locking (:submit-lock nimbus)
                          (mk-assignments nimbus)))
                      (do-cleanup nimbus)
                      ))
    ;; Schedule Nimbus inbox cleaner
    (schedule-recurring (:timer nimbus)
                    0
                    (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
                    (fn []
                      (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
                      ))    
(reify Nimbus$Iface
    ...
)))

Here is what it does:

  1. First calls inimbus's prepare method; since this method is currently empty, it has no effect.
  2. Logs a message and enters the let bindings.
  3. Wraps inimbus and conf into the shared Nimbus data structure via nimbus-data, bound to nimbus.
  4. Calls the ITopologyValidator's prepare method; for DefaultTopologyValidator this is a no-op.
  5. cleanup-corrupt-topologies! removes topologies that still exist in ZooKeeper but whose local directories are gone, calling storm-cluster-state's remove-storm! for each of them.
  6. Calls transition! on every active topology, moving it to the :startup state.
  7. Uses the nimbus timer to run mk-assignments, do-cleanup, and clean-inbox periodically (the timer semantics are sketched below); these three functions are analyzed in detail later.
  8. Implements the Nimbus$Iface interface.
  9. Implements the Shutdownable interface and the DaemonCommon protocol.
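
Item 7 relies on the timer from timer.clj. The heart of schedule-recurring is a function that re-registers itself after each run; a simplified sketch (my paraphrase, not the verbatim source):

(defn schedule-recurring [timer delay-secs recur-secs f]
  (schedule timer
            delay-secs
            (fn this []
              (f)
              ;; re-enqueue on the same timer so f fires again after recur-secs
              (schedule timer recur-secs this))))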

Next, let's analyze each method of the Nimbus$Iface implementation in order.

submitTopologyWithOpts

The intent of each line is annotated directly in the source:

(^void submitTopologyWithOpts
    [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology ^SubmitOptions submitOptions]
    (try
      (assert (not-nil? submitOptions)) ;; assert that submitOptions is non-nil
      (validate-topology-name! storm-name) ;; reject topology names containing illegal characters
      (check-storm-active! nimbus storm-name false) ;; make sure no topology with the same name is already running
      (let [topo-conf (from-json serializedConf)] ;; parse the JSON conf into Clojure collections
        (try
          (validate-configs-with-schemas topo-conf) ;; validate the configuration
          (catch IllegalArgumentException ex
            (throw (InvalidTopologyException. (.getMessage ex))))) ;; rethrow invalid configs as InvalidTopologyException
        (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                   storm-name
                   topo-conf
                   topology)) ;; validate the topology with the ITopologyValidator; the default does nothing
      (swap! (:submitted-count nimbus) inc) ;; atomically increment submitted-count
      (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs)) ;; build the topology id from its name, the submission index and the current time
            storm-conf (normalize-conf
                        conf
                        (-> serializedConf
                            from-json
                            (assoc STORM-ID storm-id)
                            (assoc TOPOLOGY-NAME storm-name))
                        topology) ;; augment the user-supplied serializedConf with the topology's id and name plus the TOPOLOGY-KRYO-DECORATORS, TOPOLOGY-KRYO-REGISTER, TOPOLOGY-ACKER-EXECUTORS and TOPOLOGY-MAX-TASK-PARALLELISM settings
            total-storm-conf (merge conf storm-conf) ;; merge in conf so its defaults are filled into total-storm-conf
            topology (normalize-topology total-storm-conf topology) ;; covered in detail below
            storm-cluster-state (:storm-cluster-state nimbus)]
        (system-topology! total-storm-conf topology) ;; validate the topology via system-topology!, analyzed in detail below
        (log-message "Received topology submission for " storm-name " with conf " storm-conf)
        ;; lock protects against multiple topologies being submitted at once and
        ;; cleanup thread killing topology in b/w assignment and starting the topology
        (locking (:submit-lock nimbus) ;; everything below runs under the lock
          (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology) ;; create the topology's local directory, copy its jar, and write the serialized storm conf and topology; setup-jar behaves differently per launch mode in conf: :distributed copies the jar, :local does nothing
          (.setup-heartbeats! storm-cluster-state storm-id) ;; create the topology's heartbeat path in ZK
          (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive TopologyInitialStatus/ACTIVE :active}] ;; map the Java constants to Clojure keywords
            (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions)))) ;; start the storm
          (mk-assignments nimbus))) ;; assign resources to the submitted topology
      (catch Throwable e
        (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
        (throw e))))
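
For instance (hypothetical values), submitting a topology named "wordcount" as the third submission at Unix time 1404123456 produces the id:

(str "wordcount" "-" 3 "-" 1404123456) ;; => "wordcount-3-1404123456"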

submitTopologyWithOpts relies on the following important functions:

  1. mk-assignments performs a fresh round of scheduling for all topologies in the cluster (the data shapes involved are illustrated after the code).

    (defnk mk-assignments [nimbus :scratch-topology-id nil]
      (let [conf (:conf nimbus) ;; configuration
            storm-cluster-state (:storm-cluster-state nimbus) ;; the service used to talk to ZK
            ^INimbus inimbus (:inimbus nimbus) ;; the INimbus implementation
            ;; read all the topologies
            topology-ids (.active-storms storm-cluster-state) ;; ids of all active topologies, read from ZK
            topologies (into {} (for [tid topology-ids]
                                  {tid (read-topology-details nimbus tid)})) ;; map each topology id to its TopologyDetails
            topologies (Topologies. topologies) ;; wrap in a Topologies Java object
            ;; read all the assignments
            assigned-topology-ids (.assignments storm-cluster-state nil) ;; ids of all topologies that already have assignments
            existing-assignments (into {} (for [tid assigned-topology-ids]
                                            ;; for the topology which wants rebalance (specified by the scratch-topology-id)
                                            ;; we exclude its assignment, meaning that all the slots occupied by its assignment
                                            ;; will be treated as free slot in the scheduler code.
                                            (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
                                              {tid (.assignment-info storm-cluster-state tid nil)}))) ;; read each assigned topology's assignment info; a topology passed in via scratch-topology-id for rebalance is excluded so its slots look free
            ;; make the new assignments for topologies
            topology->executor->node+port (compute-new-topology->executor->node+port
                                            nimbus
                                            existing-assignments
                                            topologies
                                            scratch-topology-id) ;; analyzed below: computes and returns a new schedule for every topology
            now-secs (current-time-secs) ;; current time
            basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state) ;; read the supervisor info from ZK as a map of id to SupervisorDetails
            ;; construct the final Assignments by adding start-times etc into it
            new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
                                           :let [existing-assignment (get existing-assignments topology-id) ;; the topology's existing assignment
                                                 all-nodes (->> executor->node+port vals (map first) set) ;; the set of all nodes in use
                                                 node->host (->> all-nodes
                                                                 (mapcat (fn [node]
                                                                           (if-let [host (.getHostName inimbus basic-supervisor-details-map node)]
                                                                             [[node host]])))
                                                                 (into {})) ;; map each node to its host
                                                 all-node->host (merge (:node->host existing-assignment) node->host) ;; merge into the full node->host map; for duplicate nodes the fresh node->host value wins
                                                 reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port) ;; diff against the old assignment to find every reassigned executor
                                                 start-times (merge (:executor->start-time-secs existing-assignment)
                                                                    (into {}
                                                                          (for [id reassign-executors]
                                                                            [id now-secs])))]] ;; give each reassigned executor a fresh start time
                                       {topology-id (Assignment.
                                                     (master-stormdist-root conf topology-id)
                                                     (select-keys all-node->host all-nodes)
                                                     executor->node+port
                                                     start-times)}))] ;; map each topology id to its Assignment

        ;; tasks figure out what tasks to talk to by looking at topology at runtime
        ;; only log/set when there's been a change to the assignment
        (doseq [[topology-id assignment] new-assignments ;; iterate over the freshly computed assignments
                :let [existing-assignment (get existing-assignments topology-id)
                      topology-details (.getById topologies topology-id)]]
          (if (= existing-assignment assignment) ;; did the assignment change at all?
            (log-debug "Assignment for " topology-id " hasn't changed") ;; if not, just log at debug level
            (do
              (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
              (.set-assignment! storm-cluster-state topology-id assignment) ;; if it did, log and sync it to ZK
              )))
        (->> new-assignments
             (map (fn [[topology-id assignment]]
                    (let [existing-assignment (get existing-assignments topology-id)]
                      [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))])))
             (into {}) ;; map each topology id to its newly added slots, converted to WorkerSlots
             (.assignSlots inimbus topologies)) ;; finally notify inimbus of the topologies and the newly assigned slots via assignSlots
        ))
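
    To make the shapes concrete (illustrative values only): an executor id is a [first-task last-task] range, and executor->node+port maps it to the slot it occupies:

    ;; executor->node+port, illustrative values:
    {[1 3] ["supervisor-node-1" 6700]
     [4 6] ["supervisor-node-1" 6701]}
    ;; the Assignment built above additionally records the code path on nimbus, the
    ;; node->host map restricted to the nodes in use, and per-executor start times.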
    

  2. compute-new-topology->executor->node+port computes this round's assignment from the previous round's assignments:

    (defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]
      (let [conf (:conf nimbus)
            storm-cluster-state (:storm-cluster-state nimbus)
            topology->executors (compute-topology->executors nimbus (keys existing-assignments)) ;; map of topology-id -> set of executors
            ;; update the executors heartbeats first.
            _ (update-all-heartbeats! nimbus existing-assignments topology->executors) ;; update all executor heartbeats
            topology->alive-executors (compute-topology->alive-executors nimbus
                                                                         existing-assignments
                                                                         topologies
                                                                         topology->executors
                                                                         scratch-topology-id) ;; map of topology -> alive executors
            supervisor->dead-ports (compute-supervisor->dead-ports nimbus
                                                                   existing-assignments
                                                                   topology->executors
                                                                   topology->alive-executors) ;; map of supervisor -> dead ports
            topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus
                                                                                   existing-assignments
                                                                                   topology->alive-executors) ;; map of topology -> SchedulerAssignmentImpl

            missing-assignment-topologies (->> topologies
                                               .getTopologies
                                               (map (memfn getId))
                                               (filter (fn [t]
                                                         (let [alle (get topology->executors t)
                                                               alivee (get topology->alive-executors t)]
                                                           (or (empty? alle) ;; (1) no executors at all
                                                               (not= alle alivee) ;; (2) some executors are not alive
                                                               (< (-> topology->scheduler-assignment
                                                                      (get t)
                                                                      num-used-workers )
                                                                  (-> topologies (.getById t) .getNumWorkers)
                                                                  )))))) ;; (3) fewer workers in use than configured; these three conditions select the topologies with missing assignments
            all-scheduling-slots (->> (all-scheduling-slots nimbus topologies missing-assignment-topologies)
                                      (map (fn [[node-id port]] {node-id #{port}}))
                                      (apply merge-with set/union)) ;; the WorkerSlot info available for the topologies with missing assignments, as node-id -> set of ports

            supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports) ;; build the id -> SupervisorDetails map
            cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment) ;; create the Cluster object

            ;; call scheduler.schedule to schedule all the topologies
            ;; the new assignments for all the topologies are in the cluster object.
            _ (.schedule (:scheduler nimbus) topologies cluster) ;; invoke void schedule(Topologies topologies, Cluster cluster), which schedules every topology that needs it
            new-scheduler-assignments (.getAssignments cluster) ;; the scheduling result: topology-id -> SchedulerAssignment
            ;; add more information to convert SchedulerAssignment to Assignment
            new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)] ;; convert topology-id -> SchedulerAssignment into topology-id -> {executor [node port]}
        ;; print some useful information.
        (doseq [[topology-id executor->node+port] new-topology->executor->node+port ;; iterate
                :let [old-executor->node+port (-> topology-id
                                                  existing-assignments
                                                  :executor->node+port) ;; the existing executor -> node+port map
                      reassignment (filter (fn [[executor node+port]]
                                             (and (contains? old-executor->node+port executor)
                                                  (not (= node+port (old-executor->node+port executor)))))
                                           executor->node+port)]] ;; compare with the new one to find the reassigned executors
          (when-not (empty? reassignment) ;; if anything was reassigned
            (let [new-slots-cnt (count (set (vals executor->node+port)))
                  reassign-executors (keys reassignment)]
              (log-message "Reassigning " topology-id " to " new-slots-cnt " slots")
              (log-message "Reassign executors: " (vec reassign-executors))))) ;; log how many slots are in use and which executors were reassigned
        new-topology->executor->node+port)) ;; finally return the new topology -> {executor [node port]} map
  3. clean-inbox runs on the timer and deletes expired jar files (a usage sketch follows the code):

    (defn clean-inbox [dir-location seconds]
      "Deletes jar files in dir older than seconds."
      (let [now (current-time-secs) ;; current time
            pred #(and (.isFile %) (file-older-than? now seconds %)) ;; predicate matching expired files
            files (filter pred (file-seq (File. dir-location)))] ;; collect the expired files
        (doseq [f files]
          (if (.delete f) ;; delete each file
            (log-message "Cleaning inbox ... deleted: " (.getName f))
            ;; This should never happen
            (log-error "Cleaning inbox ... error deleting: " (.getName f))
        ))))
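
    A usage sketch (the inbox path is hypothetical): with NIMBUS-INBOX-JAR-EXPIRATION-SECS at its default of 3600, jars uploaded more than an hour ago are removed:

    (clean-inbox "/var/storm/nimbus/inbox" 3600)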
    
  4. do-cleanup performs periodic cleanup:

    (defn do-cleanup [nimbus]
      (let [storm-cluster-state (:storm-cluster-state nimbus)
            conf (:conf nimbus)
            submit-lock (:submit-lock nimbus)]
        (let [to-cleanup-ids (locking submit-lock ;; take the lock first
                               (cleanup-storm-ids conf storm-cluster-state))] ;; collect the topology ids that need cleanup
          (when-not (empty? to-cleanup-ids) ;; if there are any
            (doseq [id to-cleanup-ids]
              (log-message "Cleaning up " id)
              (.teardown-heartbeats! storm-cluster-state id) ;; remove heartbeats
              (.teardown-topology-errors! storm-cluster-state id) ;; remove the topology's error info
              (rmr (master-stormdist-root conf id)) ;; delete the topology's local directory
              (swap! (:heartbeats-cache nimbus) dissoc id)) ;; drop the topology id from the heartbeats cache
            ))))
    
  5. normalize-topology computes the topology's parallelism and stores the configuration as JSON inside the topology (a sketch of the parallelism calculation follows the code):

    (defn normalize-topology [storm-conf ^StormTopology topology]
      (let [ret (.deepCopy topology)] ;; deep-copy the topology
        (doseq [[_ component] (all-components ret)] ;; visit every component in the topology
          (.set_json_conf ;; write the merged conf back as JSON
            (.get_common component)
            (->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)} ;; compute the component's parallelism
                 (merge (component-conf component)) ;; merge with the component's own conf
                 to-json ))) ;; serialize to JSON and set it on the component's common field
        ret )) ;; return the deep copy of the topology
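
    component-parallelism, which produces the TOPOLOGY-TASKS value above, essentially defaults the task count to the executor parallelism hint and caps it with TOPOLOGY-MAX-TASK-PARALLELISM; a sketch under those assumptions:

    (defn- component-parallelism [storm-conf component]
      (let [storm-conf (merge storm-conf (component-conf component))
            num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
            max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)]
        (if max-parallelism
          (min max-parallelism num-tasks) ;; never exceed the configured cap
          num-tasks)))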
    
  6. system-topology! validates the topology and adds the system components and streams:

    (defn system-topology! [storm-conf ^StormTopology topology]
      (validate-basic! topology) ;; basic validation
      (let [ret (.deepCopy topology)] ;; deep copy
        (add-acker! storm-conf ret) ;; add the acker bolt
        (add-metric-components! storm-conf ret) ;; add the metric bolts
        (add-system-components! storm-conf ret) ;; add the system bolt
        (add-metric-streams! ret) ;; add the metric streams
        (add-system-streams! ret) ;; add the system streams
        (validate-structure! ret) ;; with the bolts added, validate the structure
        ret ;; return
      ))
    
  7. start-storm starts the storm:

    (defn- start-storm [nimbus storm-name storm-id topology-initial-status]
      {:pre [(#{:active :inactive} topology-initial-status)]} ;; precondition validating the topology-initial-status argument
      (let [storm-cluster-state (:storm-cluster-state nimbus)
            conf (:conf nimbus)
            storm-conf (read-storm-conf conf storm-id)
            topology (system-topology! storm-conf (read-storm-topology conf storm-id)) ;; deep-copy the topology and attach the system-level bolts
            num-executors (->> (all-components topology) (map-val num-start-executors))] ;; executor count per component
        (log-message "Activating " storm-name ": " storm-id)
        (.activate-storm! storm-cluster-state ;; save the StormBase info to ZK
                          storm-id
                          (StormBase. storm-name ;; build the StormBase
                                      (current-time-secs)
                                      {:type topology-initial-status}
                                      (storm-conf TOPOLOGY-WORKERS)
                                      num-executors))))
    

submitTopology

Another way to invoke submitTopologyWithOpts: submitTopology simply defaults the SubmitOptions to ACTIVE.

(^void submitTopology
    [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
    (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology (SubmitOptions. TopologyInitialStatus/ACTIVE)))

killTopology & killTopologyWithOpts

The methods for killing a topology:

  (^void killTopology [this ^String name] ;; kill the topology with the default KillOptions
    (.killTopologyWithOpts this name (KillOptions.)))

  (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
    (check-storm-active! nimbus storm-name true) ;; check via ZK that the topology is alive
    (let [wait-amt (if (.is_set_wait_secs options) ;; read the kill wait time, if set
                     (.get_wait_secs options)
                     )]
      (transition-name! nimbus storm-name [:kill wait-amt] true) ;; transition the state to kill via transition-name!
      ))

rebalance

Rebalances a topology, redistributing its resources:

(^void rebalance [this ^String storm-name ^RebalanceOptions options]
    (check-storm-active! nimbus storm-name true) ;; check that the topology is alive
    (let [wait-amt (if (.is_set_wait_secs options) ;; read the wait time for the state transition, if set
                     (.get_wait_secs options))
          num-workers (if (.is_set_num_workers options) ;; read the worker count from the options, if set
                        (.get_num_workers options))
          executor-overrides (if (.is_set_num_executors options) ;; read the executor counts from the options, if set
                               (.get_num_executors options)
                               {})]
      (doseq [[c num-executors] executor-overrides] ;; check every executor override
        (when (<= num-executors 0)
          (throw (InvalidTopologyException. "Number of executors must be greater than 0"))
           ))
      (transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true) ;; transition the state to rebalance
      ))

activate & deactivate

Transition a topology's state to :activate or :inactivate:

 (activate [this storm-name]
    (transition-name! nimbus storm-name :activate true)
    )

 (deactivate [this storm-name]
    (transition-name! nimbus storm-name :inactivate true))

beginFileUpload

Begins a file upload:

 (beginFileUpload [this]
    (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")] ;; build the upload path
      (.put (:uploaders nimbus) ;; register a writable channel in the uploaders cache
            fileloc
            (Channels/newChannel (FileOutputStream. fileloc)))
      (log-message "Uploading file from client to " fileloc)
      fileloc
      ))

uploadChunk

Uploads a chunk of bytes:

 (^void uploadChunk [this ^String location ^ByteBuffer chunk]
    (let [uploaders (:uploaders nimbus)
          ^WritableByteChannel channel (.get uploaders location)]
      (when-not channel
        (throw (RuntimeException.
                "File for that location does not exist (or timed out)")))
      (.write channel chunk)
      (.put uploaders location channel)
      ))

finishFileUpload

Finishes a file upload: removes the location from uploaders and closes the channel.

(^void finishFileUpload [this ^String location]
    (let [uploaders (:uploaders nimbus)
          ^WritableByteChannel channel (.get uploaders location)]
      (when-not channel
        (throw (RuntimeException.
                "File for that location does not exist (or timed out)")))
      (.close channel)
      (log-message "Finished uploading file from client: " location)
      (.remove uploaders location)
      ))

beginFileDownload & downloadChunk

Downloads mirror uploads: beginFileDownload registers the file with downloaders and returns an id. downloadChunk then reads the byte stream for that id, and once the stream is exhausted the id is removed from downloaders.

  (^String beginFileDownload [this ^String file]
    (let [is (BufferFileInputStream. file)
          id (uuid)]
      (.put (:downloaders nimbus) id is)
      id
      ))

  (^ByteBuffer downloadChunk [this ^String id]
    (let [downloaders (:downloaders nimbus)
          ^BufferFileInputStream is (.get downloaders id)]
      (when-not is
        (throw (RuntimeException.
                "Could not find input stream for that id")))
      (let [ret (.read is)]
        (.put downloaders id is)
        (when (empty? ret)
          (.remove downloaders id))
        (ByteBuffer/wrap ret)
        )))

getNimbusConf & getTopologyConf

Return the Nimbus configuration and a topology's configuration as JSON:

(^String getNimbusConf [this]
    (to-json (:conf nimbus)))

(^String getTopologyConf [this ^String id]
    (to-json (try-read-storm-conf conf id)))

getTopology & getUserTopology

Return a topology including the system bolts, and the topology exactly as the user defined it:

(^StormTopology getTopology [this ^String id]
    (system-topology! (try-read-storm-conf conf id) (try-read-storm-topology conf id)))

(^StormTopology getUserTopology [this ^String id]
    (try-read-storm-topology conf id))

getClusterInfo

Returns information about the whole cluster:

(^ClusterSummary getClusterInfo [this]
    (let [storm-cluster-state (:storm-cluster-state nimbus) ;; grab storm-cluster-state first
          supervisor-infos (all-supervisor-info storm-cluster-state) ;; collect all supervisor info
          ;; TODO: need to get the port info about supervisors...
          ;; in standalone just look at metadata, otherwise just say N/A?
          supervisor-summaries (dofor [[id info] supervisor-infos]
                                      (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
                                            ]
                                        (SupervisorSummary. (:hostname info)
                                                            (:uptime-secs info)
                                                            (count ports)
                                                            (count (:used-ports info))
                                                            id ) ;; assemble a SupervisorSummary
                                        ))
          nimbus-uptime ((:uptime nimbus))
          bases (topology-bases storm-cluster-state) ;; fetch the live topologies' info
          topology-summaries (dofor [[id base] bases]
                                    (let [assignment (.assignment-info storm-cluster-state id nil)] ;; fetch each topology's assignment info
                                      (TopologySummary. id
                                                        (:storm-name base)
                                                        (->> (:executor->node+port assignment)
                                                             keys
                                                             (mapcat executor-id->tasks)
                                                             count) 
                                                        (->> (:executor->node+port assignment)
                                                             keys
                                                             count)                                                            
                                                        (->> (:executor->node+port assignment)
                                                             vals
                                                             set
                                                             count)
                                                        (time-delta (:launch-time-secs base))
                                                        (extract-status-str base))
                                      ))] ;; assemble into a TopologySummary
      (ClusterSummary. supervisor-summaries
                       nimbus-uptime
                       topology-summaries) ;; assemble everything into a ClusterSummary
      ))

getTopologyInfo

Returns information about a single topology:

(^TopologyInfo getTopologyInfo [this ^String storm-id]
    (let [storm-cluster-state (:storm-cluster-state nimbus) ;; grab storm-cluster-state first
          task->component (storm-task-info (try-read-storm-topology conf storm-id) (try-read-storm-conf conf storm-id)) ;; build the task -> component mapping
          base (.storm-base storm-cluster-state storm-id nil) ;; fetch the StormBase
          assignment (.assignment-info storm-cluster-state storm-id nil) ;; fetch the assignment info
          beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment)) ;; fetch the heartbeats
          all-components (-> task->component reverse-map keys) ;; collect all components
          errors (->> all-components
                      (map (fn [c] [c (get-errors storm-cluster-state storm-id c)])) ;; gather the error info
                      (into {}))
          executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
                                    (let [host (-> assignment :node->host (get node))
                                          heartbeat (get beats executor)
                                          stats (:stats heartbeat)
                                          stats (if stats
                                                  (stats/thriftify-executor-stats stats))]
                                      (doto
                                          (ExecutorSummary. (thriftify-executor-id executor)
                                                            (-> executor first task->component)
                                                            host
                                                            port
                                                            (nil-to-zero (:uptime heartbeat)))
                                        (.set_stats stats))
                                      ))
          ]
      (TopologyInfo. storm-id
                     (:storm-name base)
                     (time-delta (:launch-time-secs base))
                     executor-summaries
                     (extract-status-str base)
                     errors
                     )
      ))

storm-cluster-state and ZooKeeper

Nearly all state, and essentially all cluster-wide information, lives in ZooKeeper, and Nimbus talks to ZooKeeper through storm-cluster-state. Let's dig into how storm-cluster-state actually maintains that state.

storm-cluster-state is created while nimbus-data is being built:

(defn mk-storm-cluster-state
  [cluster-state-spec]
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)]) ;; if cluster-state-spec already satisfies ClusterState use it directly, otherwise build a distributed cluster-state that talks to ZK
        assignment-info-callback (atom {})
        supervisors-callback (atom nil)
        assignments-callback (atom nil)
        storm-base-callback (atom {})
        state-id (register ;; register adds a callback to the cluster-state's callbacks; whether it fires is decided by the event type and path
                   cluster-state
                   (fn [type path]
                     (let [[subtree & args] (tokenize-path path)]
                       (condp = subtree
                         ASSIGNMENTS-ROOT (if (empty? args)
                                            (issue-callback! assignments-callback)
                                            (issue-map-callback! assignment-info-callback (first args)))
                         SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                         STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                         ;; this should never happen
                         (halt-process! 30 "Unknown callback for subtree " subtree args)))))]
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p)) ;; create each directory
    (reify
      StormClusterState
        ...
    )))

mk-storm-cluster-state implements the StormClusterState protocol:

assignments

If callback is non-nil it replaces assignments-callback; the children of ASSIGNMENTS-SUBTREE are then fetched from ZK, setting a watch when a callback was supplied:

(assignments
    [this callback]
    (when callback
      (reset! assignments-callback callback))
    (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))

assignment-info

Records the storm-id -> callback mapping in assignment-info-callback, then reads the node at (assignment-path storm-id) from ZK and deserializes it if present:

(assignment-info
    [this storm-id callback]
    (when callback
      (swap! assignment-info-callback assoc storm-id callback))
    (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))

active-storms

Lists all live topologies:

(active-storms
    [this]
    (get-children cluster-state STORMS-SUBTREE false))

heartbeat-storms

Lists the topologies that are heartbeating:

(heartbeat-storms
    [this]
    (get-children cluster-state WORKERBEATS-SUBTREE false))

error-topologies

Lists the topologies that have errors:

(error-topologies
    [this]
    (get-children cluster-state ERRORS-SUBTREE false))

get-worker-heartbeat

Fetches the heartbeat of a single worker, identified by storm-id, node, and port:

  (get-worker-heartbeat
    [this storm-id node port]
    (-> cluster-state
        (get-data (workerbeat-path storm-id node port) false)
        maybe-deserialize))

executor-beats

Reads a topology's worker heartbeats and converts them into per-executor heartbeat data:

   (executor-beats
    [this storm-id executor->node+port]
    ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
    ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
    ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
    ;; we avoid situations like that
    (let [node+port->executors (reverse-map executor->node+port)
          all-heartbeats (for [[[node port] executors] node+port->executors]
                           (->> (get-worker-heartbeat this storm-id node port)
                                (convert-executor-beats executors)
                                ))]
      (apply merge all-heartbeats)))
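
reverse-map inverts the executor->node+port map so each worker's heartbeat is read once rather than once per executor. Roughly (illustrative values; the grouped keys may come back as a seq rather than a vector):

(reverse-map {[1 2] ["node1" 6700]
              [3 4] ["node1" 6700]
              [5 6] ["node2" 6701]})
;; => {["node1" 6700] [[1 2] [3 4]], ["node2" 6701] [[5 6]]}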

supervisors

Lists all supervisors, resetting the callback if one is supplied:

 (supervisors
    [this callback]
    (when callback
      (reset! supervisors-callback callback))
    (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))

supervisor-info

Fetches one supervisor's info:

(supervisor-info
    [this supervisor-id]
    (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))

worker-heartbeat!

Writes one worker's heartbeat info:

(worker-heartbeat!
    [this storm-id node port info]
    (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))

remove-worker-heartbeat!

Deletes one worker's heartbeat info:

(remove-worker-heartbeat!
    [this storm-id node port]
    (delete-node cluster-state (workerbeat-path storm-id node port)))

setup-heartbeats! & teardown-heartbeats!

Create and tear down a topology's heartbeat directory:

(setup-heartbeats!
    [this storm-id]
    (mkdirs cluster-state (workerbeat-storm-root storm-id)))

  (teardown-heartbeats!
    [this storm-id]
    (try-cause
      (delete-node cluster-state (workerbeat-storm-root storm-id))
      (catch KeeperException e
        (log-warn-error e "Could not teardown heartbeats for " storm-id))))

teardown-topology-errors!

Deletes a topology's error info:

 (teardown-topology-errors!
    [this storm-id]
    (try-cause
      (delete-node cluster-state (error-storm-root storm-id))
      (catch KeeperException e
        (log-warn-error e "Could not teardown errors for " storm-id))))

supervisor-heartbeat!

Writes the supervisor's heartbeat info to an ephemeral node:

(supervisor-heartbeat!
    [this supervisor-id info]
    (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))

activate-storm!

Writes the StormBase info to the topology's node:

(activate-storm!
    [this storm-id storm-base]
    (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))

update-storm!

Updates a topology's StormBase info by merging the new elements into the old:

 (update-storm!
    [this storm-id new-elems]
    (let [base (storm-base this storm-id nil)
          executors (:component->executors base)
          new-elems (update new-elems :component->executors (partial merge executors))]
      (set-data cluster-state (storm-path storm-id)
                (-> base
                    (merge new-elems)
                    Utils/serialize))))

storm-base

Fetches a topology's StormBase:

(storm-base
    [this storm-id callback]
    (when callback
      (swap! storm-base-callback assoc storm-id callback))
    (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))

remove-storm-base!

Removes a topology's StormBase:

(remove-storm-base!
    [this storm-id]
    (delete-node cluster-state (storm-path storm-id)))

set-assignment!

Sets a topology's assignment:

  (set-assignment!
    [this storm-id info]
    (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))

remove-storm!

Deletes a topology's assignment node and then its StormBase:

(remove-storm!
    [this storm-id]
    (delete-node cluster-state (assignment-path storm-id))
    (remove-storm-base! this storm-id))

report-error

Reports one error for a component and prunes old entries, keeping only the ten most recent:

  (report-error
    [this storm-id component-id error]
    (let [path (error-path storm-id component-id)
          data {:time-secs (current-time-secs) :error (stringify-error error)}
          _ (mkdirs cluster-state path)
          _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
          to-kill (->> (get-children cluster-state path false)
                       (sort-by parse-error-path)
                       reverse
                       (drop 10))]
      (doseq [k to-kill]
        (delete-node cluster-state (str path "/" k)))))

errors

Fetches the error info for one component of a topology:

(errors
    [this storm-id component-id]
    (let [path (error-path storm-id component-id)
          _ (mkdirs cluster-state path)
          children (get-children cluster-state path false)
          errors (dofor [c children]
                        (let [data (-> (get-data cluster-state (str path "/" c) false)
                                       maybe-deserialize)]
                          (when data
                            (struct TaskError (:error data) (:time-secs data))
                            )))
          ]
      (->> (filter not-nil? errors)
           (sort-by (comp - :time-secs)))))

disconnect

Unregisters the callback registered at creation time and, in solo mode, closes the cluster-state:

(disconnect
    [this]
    (unregister cluster-state state-id)
    (when solo?
      (close cluster-state))))))

State transitions

Local-mode startup

When we test a topology on the local machine we write:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());

So I really wanted to know what submitTopology does. Digging into the source, it turns out that the LocalCluster class is implemented in Clojure.

The LocalCluster class declares the following gen-class in its ns form; it implements the Java interface backtype.storm.ILocalCluster:

LocalCluster.clj...

(ns backtype.storm.LocalCluster
  (:use [backtype.storm testing config])
  (:import [java.util Map])
  (:gen-class
    :init init
    :implements [backtype.storm.ILocalCluster]
    :constructors {[] [] [java.util.Map] []}
    :state state))

The init method provides two constructors:

LocalCluster.clj...

(defn -init
  ([]
   (let [ret (mk-local-storm-cluster
               :daemon-conf
               {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
     [[] ret]))
  ([^Map stateMap]
   [[] stateMap]))

The zero-argument constructor calls mk-local-storm-cluster, which returns a map carrying the cluster's components and configuration; there is also a constructor taking a java.util.Map, whose map is stored directly in the state field.

With that groundwork laid, we can finally look at the implementation of submitTopology.

LocalCluster.clj...

(defn -submitTopology
  [this name conf topology]
  (submit-local-topology
    (:nimbus (. this state)) name conf topology))

submitTopology takes three parameters: name and conf need no explanation, and topology is the StormTopology we built with TopologyBuilder. It then calls submit-local-topology, passing the nimbus held in this.state together with name, conf, and topology.

Since submit-local-topology and the other local helpers exist for local testing, they live in testing.clj:

testing.clj...

(defn submit-local-topology
  [nimbus storm-name conf topology]
  (when-not (Utils/isValidConf conf)
    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
  (.submitTopology nimbus storm-name nil (to-json conf) topology))

This function validates conf and throws IllegalArgumentException if validation fails; otherwise it converts conf to JSON and submits the topology by calling the Thrift service method submitTopology on the Java Nimbus class.