Nimbus
2014-11-24
The core Nimbus code lives in backtype.storm.daemon.nimbus.clj.
Data Structures
Nimbus's shared data structure is produced by the nimbus-data function, which bundles together the common objects Nimbus uses:
(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf conf
:inimbus inimbus
:submitted-count (atom 0)
:storm-cluster-state (cluster/mk-storm-cluster-state conf)
:submit-lock (Object.)
:heartbeats-cache (atom {})
:downloaders (file-cache-map conf)
:uploaders (file-cache-map conf)
:uptime (uptime-computer)
:validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
:timer (mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
}))
- conf holds the configuration.
- inimbus is the INimbus implementation passed in at launch (standalone-nimbus in distributed mode).
- submitted-count counts how many topologies have been submitted.
- storm-cluster-state reads and writes state in ZK.
- submit-lock is the lock taken during submission.
- heartbeats-cache caches heartbeats.
- downloaders and uploaders are the caches used while clients download or upload topology jars.
- uptime is a function reporting how long Nimbus has been running.
- validator validates topologies; the default validator does nothing and exists as an extension point.
- timer is created with mk-timer; the timer is implemented in timer.clj.
- scheduler is created by mk-scheduler from conf and inimbus (consulting inimbus's getForcedScheduler) — a short usage sketch of these fields follows this list.
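To make these fields concrete, here is a minimal sketch of how daemon code typically consumes them (conf and inimbus are assumed to come from the launch path shown below):
(let [nimbus (nimbus-data conf inimbus)]
  (swap! (:submitted-count nimbus) inc)      ;; atoms are updated with swap!
  (locking (:submit-lock nimbus)             ;; the bare Object serves as a monitor
    (log-message "submissions: " @(:submitted-count nimbus)))
  ((:uptime nimbus)))                        ;; :uptime is a zero-arg fn returning seconds since start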
Distributed Startup
The main function is as follows:
(defn -main []
(-launch (standalone-nimbus)))
-launch is shown below; it takes the nimbus argument and calls launch-server! with it and the storm configuration:
(defn -launch [nimbus]
(launch-server! (read-storm-config) nimbus))
read-storm-config is implemented in config.clj. It calls the static Java method Utils.readStormConfig to read the configuration file, converts the result into Clojure data structures (lists, maps) with clojurify-structure, validates it with validate-configs-with-schemas, and returns conf.
(defn read-storm-config
[]
(let [conf (clojurify-structure (Utils/readStormConfig))]
(validate-configs-with-schemas conf)
conf))
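Note the (conf SOME-KEY) lookup style used throughout nimbus.clj: a Clojure map can itself be called as a function of its keys. A tiny illustration:
(let [conf {"nimbus.thrift.port" 6627}]
  (conf "nimbus.thrift.port"))   ;; => 6627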
The standalone-nimbus function:
(defn standalone-nimbus []
(reify INimbus
(prepare [this conf local-dir]
)
(allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
(->> supervisors
(mapcat (fn [^SupervisorDetails s]
(for [p (.getMeta s)]
(WorkerSlot. (.getId s) p))))
set ))
(assignSlots [this topology slots]
)
(getForcedScheduler [this]
nil )
(getHostName [this supervisors node-id]
(if-let [^SupervisorDetails supervisor (get supervisors node-id)]
(.getHost supervisor)))
))
This defines standalone-nimbus, a reify implementing the Java interface INimbus; prepare and assignSlots are left empty, and the remaining three methods are:
- allSlotsAvailableForScheduling takes the supervisors collection of SupervisorDetails and, for each supervisor, builds a WorkerSlot from its id and each port in its meta, merging everything into a set — see the sketch after this list.
- getForcedScheduler returns nil, meaning no scheduler is forced.
- getHostName uses if-let to look node-id up in the supervisors map; if found, the SupervisorDetails is bound to supervisor and its host is returned, otherwise nothing happens.
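A small illustration of allSlotsAvailableForScheduling, assuming each SupervisorDetails carries its configured ports in meta (the ids and ports here are made up):
(->> [(SupervisorDetails. "s1" [6700 6701])
      (SupervisorDetails. "s2" [6700])]
     (mapcat (fn [^SupervisorDetails s]
               (for [p (.getMeta s)]
                 (WorkerSlot. (.getId s) p))))
     set)
;; => a set of three WorkerSlots: s1:6700, s1:6701, s2:6700 — one per (supervisor, port) pair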
Launch
With an INimbus implementation in hand, we can finally make the last preparations before startup. Here is the launch-server! function:
(defn launch-server! [conf nimbus]
(validate-distributed-mode! conf)
(let [service-handler (service-handler conf nimbus)
options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
(THsHaServer$Args.)
(.workerThreads 64)
(.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
(.processor (Nimbus$Processor. service-handler))
)
server (THsHaServer. (do (set! (. options maxReadBufferBytes)(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
(log-message "Starting Nimbus server...")
(.serve server)))
This function performs the following steps in order:
- Validates that conf specifies distributed mode; if local mode is configured, an exception is thrown immediately.
- Creates the handler via the service-handler function.
- Builds the Thrift server options through the threading macro, installing the handler as the processor — see the expansion sketch after this list.
- Resets maxReadBufferBytes in options from the conf and creates the THsHaServer from the options.
- Registers a shutdown hook that shuts down the handler and stops the server.
- Logs a line.
- Starts the server.
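The threading macro -> pipes each result in as the first argument of the next form, so the options pipeline above is equivalent to these nested interop calls (the builder methods on THsHaServer$Args return the Args object itself):
(.processor
  (.protocolFactory
    (.workerThreads
      (THsHaServer$Args. (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))))
      64)
    (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
  (Nimbus$Processor. service-handler))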
Now let's see how the handler is initialized. First, a macro defined in common.clj:
(defmacro defserverfn [name & body]
`(let [exec-fn# (fn ~@body)]
(defn ~name [& args#]
(try-cause
(apply exec-fn# args#)
(catch InterruptedException e#
(throw e#))
(catch Throwable t#
(log-error t# "Error on initialization of server " ~(str name))
(halt-process! 13 "Error on initialization")
)))))
This macro wraps the function body in a try-catch: InterruptedException is rethrown as-is, while any other Throwable is logged before the process is halted.
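As a concrete illustration, a hypothetical (defserverfn echo-server [x] x) expands roughly to:
(let [exec-fn (fn [x] x)]
  (defn echo-server [& args]
    (try-cause
      (apply exec-fn args)
      (catch InterruptedException e
        (throw e))                       ;; interruption propagates
      (catch Throwable t
        (log-error t "Error on initialization of server echo-server")
        (halt-process! 13 "Error on initialization")))))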
The handler is then created with this macro:
(defserverfn service-handler [conf inimbus]
(.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf)
(let [nimbus (nimbus-data conf inimbus)]
(.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
(cleanup-corrupt-topologies! nimbus)
(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
(transition! nimbus storm-id :startup))
(schedule-recurring (:timer nimbus)
0
(conf NIMBUS-MONITOR-FREQ-SECS)
(fn []
(when (conf NIMBUS-REASSIGN)
(locking (:submit-lock nimbus)
(mk-assignments nimbus)))
(do-cleanup nimbus)
))
;; Schedule Nimbus inbox cleaner
(schedule-recurring (:timer nimbus)
0
(conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
(fn []
(clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
))
(reify Nimbus$Iface
...
)))
Here is what this does:
- First calls inimbus's prepare method; since that method is currently empty, it has no effect.
- Logs a line, then enters the let bindings.
- Wraps inimbus and conf into the shared Nimbus data structure via nimbus-data and binds it to nimbus.
- Calls the ITopologyValidator's prepare method; for the DefaultTopologyValidator this is a no-op.
- cleanup-corrupt-topologies! removes topologies that still exist in ZK but are missing from the local directory, calling storm-cluster-state's remove-storm! on each.
- Calls transition! on every active topology to move its state to :startup.
- Uses the nimbus timer to periodically run mk-assignments, do-cleanup and clean-inbox; all three are analyzed in detail below, and the timer API is sketched after this list.
- Implements the Nimbus$Iface interface.
- Implements the Shutdownable interface and the DaemonCommon protocol.
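For reference, a minimal sketch of the timer API from timer.clj as used above (the delay and period values here are made up):
(let [timer (mk-timer :kill-fn (fn [t] (log-error t "timer died")))]
  (schedule-recurring timer
                      0    ;; initial delay, seconds
                      10   ;; period, seconds
                      (fn [] (log-message "tick"))))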
Let's walk through each method of the Nimbus$Iface implementation in order.
submitTopologyWithOpts
The intent of each line is annotated directly in the source as comments:
(^void submitTopologyWithOpts
  [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology ^SubmitOptions submitOptions]
  (try
    (assert (not-nil? submitOptions)) ;; assert that submitOptions is not nil
    (validate-topology-name! storm-name) ;; the topology name must not contain special characters
    (check-storm-active! nimbus storm-name false) ;; make sure no topology with the same name is already running
    (let [topo-conf (from-json serializedConf)] ;; convert the JSON conf into Clojure collections
      (try
        (validate-configs-with-schemas topo-conf) ;; validate the configuration
        (catch IllegalArgumentException ex
          (throw (InvalidTopologyException. (.getMessage ex))))) ;; rethrow as InvalidTopologyException on error
      (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                 storm-name
                 topo-conf
                 topology)) ;; validate the topology with ITopologyValidator; the default does nothing
    (swap! (:submitted-count nimbus) inc) ;; atomically increment submitted-count
    (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs)) ;; build the topology id from its name, the submission index and the current time
          storm-conf (normalize-conf
                       conf
                       (-> serializedConf
                           from-json
                           (assoc STORM-ID storm-id)
                           (assoc TOPOLOGY-NAME storm-name))
                       topology) ;; enrich the user's serializedConf with the topology id and name, plus the TOPOLOGY-KRYO-DECORATORS, TOPOLOGY-KRYO-REGISTER, TOPOLOGY-ACKER-EXECUTORS and TOPOLOGY-MAX-TASK-PARALLELISM properties
          total-storm-conf (merge conf storm-conf) ;; merge conf into storm-conf, mainly to pick up the defaults from conf
          topology (normalize-topology total-storm-conf topology) ;; analyzed in detail below
          storm-cluster-state (:storm-cluster-state nimbus)]
      (system-topology! total-storm-conf topology) ;; validate the topology with system-topology!, analyzed in detail below
      (log-message "Received topology submission for " storm-name " with conf " storm-conf)
      ;; lock protects against multiple topologies being submitted at once and
      ;; cleanup thread killing topology in b/w assignment and starting the topology
      (locking (:submit-lock nimbus) ;; the following logic runs under the submit lock
        (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology) ;; create the topology's local directory, copy the jar, and write the serialized storm conf and topology; inside it, setup-jar branches on whether conf specifies :distributed or :local mode — :distributed copies the jar, :local does nothing
        (.setup-heartbeats! storm-cluster-state storm-id) ;; create the topology's heartbeat path in ZK
        (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive TopologyInitialStatus/ACTIVE :active}] ;; map the Java constants to Clojure keywords
          (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions)))) ;; start the storm
        (mk-assignments nimbus))) ;; assign resources to the submitted topology
    (catch Throwable e
      (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
      (throw e))))
submitTopologyWithOpts relies on several important functions:
mk-assignments runs a new round of scheduling for every topology in the cluster.
(defnk mk-assignments [nimbus :scratch-topology-id nil]
  (let [conf (:conf nimbus) ;; configuration
        storm-cluster-state (:storm-cluster-state nimbus) ;; the service used to talk to ZK
        ^INimbus inimbus (:inimbus nimbus) ;; the INimbus implementation
        ;; read all the topologies
        topology-ids (.active-storms storm-cluster-state) ;; ids of all active topologies, read from ZK
        topologies (into {} (for [tid topology-ids]
                              {tid (read-topology-details nimbus tid)})) ;; map of topology id -> TopologyDetails
        topologies (Topologies. topologies) ;; wrapped in a Topologies Java object
        ;; read all the assignments
        assigned-topology-ids (.assignments storm-cluster-state nil) ;; ids of all topologies that already have an assignment
        existing-assignments (into {} (for [tid assigned-topology-ids]
                                        ;; for the topology which wants rebalance (specified by the scratch-topology-id)
                                        ;; we exclude its assignment, meaning that all the slots occupied by its assignment
                                        ;; will be treated as free slot in the scheduler code.
                                        (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
                                          {tid (.assignment-info storm-cluster-state tid nil)}))) ;; assignment info of every assigned topology; a topology passed in via scratch-topology-id for rebalance is excluded
        ;; make the new assignments for topologies
        topology->executor->node+port (compute-new-topology->executor->node+port
                                        nimbus
                                        existing-assignments
                                        topologies
                                        scratch-topology-id) ;; computes and returns a new schedule for every topology; analyzed below
        now-secs (current-time-secs) ;; the current time
        basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state) ;; supervisor info from ZK, as id -> SupervisorDetails
        ;; construct the final Assignments by adding start-times etc into it
        new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
                                       :let [existing-assignment (get existing-assignments topology-id) ;; the topology's previous assignment
                                             all-nodes (->> executor->node+port vals (map first) set) ;; the set of all nodes
                                             node->host (->> all-nodes
                                                             (mapcat (fn [node]
                                                                       (if-let [host (.getHostName inimbus basic-supervisor-details-map node)]
                                                                         [[node host]])))
                                                             (into {})) ;; the node -> host mapping
                                             all-node->host (merge (:node->host existing-assignment) node->host) ;; merged node -> host mapping; on conflicts the value from node->host wins
                                             reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port) ;; executors whose placement changed, found by comparison
                                             start-times (merge (:executor->start-time-secs existing-assignment)
                                                                (into {} (for [id reassign-executors]
                                                                           [id now-secs])))]] ;; reset the start time of every reassigned executor
                                   {topology-id (Assignment.
                                                  (master-stormdist-root conf topology-id)
                                                  (select-keys all-node->host all-nodes)
                                                  executor->node+port
                                                  start-times)}))] ;; build the topology id -> Assignment mapping
    ;; tasks figure out what tasks to talk to by looking at topology at runtime
    ;; only log/set when there's been a change to the assignment
    (doseq [[topology-id assignment] new-assignments ;; iterate over the freshly assigned topologies
            :let [existing-assignment (get existing-assignments topology-id)
                  topology-details (.getById topologies topology-id)]]
      (if (= existing-assignment assignment) ;; did the assignment change at all?
        (log-debug "Assignment for " topology-id " hasn't changed") ;; no: just log at debug level
        (do
          (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
          (.set-assignment! storm-cluster-state topology-id assignment)))) ;; yes: log it and write it to ZK
    (->> new-assignments
         (map (fn [[topology-id assignment]]
                (let [existing-assignment (get existing-assignments topology-id)]
                  [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))])))
         (into {}) ;; the newly added slots, as topology-id -> WorkerSlots
         (.assignSlots inimbus topologies)))) ;; hand the new slots to inimbus's assignSlots (empty in standalone mode)
compute-new-topology->executor->node+port computes the new assignment from the previous round's result:
(defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]
(let [conf (:conf nimbus)
storm-cluster-state (:storm-cluster-state nimbus)
topology->executors (compute-topology->executors nimbus (keys existing-assignments));;topology-id -> executor set
;; update the executors heartbeats first.
_ (update-all-heartbeats! nimbus existing-assignments topology->executors);;update all executors' heartbeats
topology->alive-executors (compute-topology->alive-executors nimbus
existing-assignments
topologies
topology->executors
scratch-topology-id) ;;topology -> alive executors
supervisor->dead-ports (compute-supervisor->dead-ports nimbus
existing-assignments
topology->executors
topology->alive-executors);;supervisor -> dead ports
topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus
existing-assignments
topology->alive-executors);;topology -> SchedulerAssignmentImpl
missing-assignment-topologies (->> topologies
.getTopologies
(map (memfn getId))
(filter (fn [t]
(let [alle (get topology->executors t)
alivee (get topology->alive-executors t)]
(or (empty? alle);;1. it has no executors at all
(not= alle alivee);;2. some of its executors are not alive
(< (-> topology->scheduler-assignment
(get t)
num-used-workers )
(-> topologies (.getById t) .getNumWorkers)
));;3. it uses fewer workers than configured
))));;select the topologies with missing assignments using the three conditions above
all-scheduling-slots (->> (all-scheduling-slots nimbus topologies missing-assignment-topologies)
(map (fn [[node-id port]] {node-id #{port}}))
(apply merge-with set/union));;all schedulable WorkerSlot info (node-id -> set of ports) for the missing topologies
supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports);;node-id -> SupervisorDetails
cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment);;build the Cluster object
;; call scheduler.schedule to schedule all the topologies
;; the new assignments for all the topologies are in the cluster object.
_ (.schedule (:scheduler nimbus) topologies cluster);;call void schedule(Topologies topologies, Cluster cluster), which schedules whichever topologies need it
new-scheduler-assignments (.getAssignments cluster);;the scheduling result: topology-id -> SchedulerAssignment
;; add more information to convert SchedulerAssignment to Assignment
new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)];;convert topology-id -> SchedulerAssignment into topology-id -> {executor [node port]}
;; print some useful information.
(doseq [[topology-id executor->node+port] new-topology->executor->node+port ;;iterate the new assignments
:let [old-executor->node+port (-> topology-id
existing-assignments
:executor->node+port);;the existing executor -> node+port mapping
reassignment (filter (fn [[executor node+port]]
(and (contains? old-executor->node+port executor)
(not (= node+port (old-executor->node+port executor)))))
executor->node+port)]];;compare old and new to find the reassigned executors
(when-not (empty? reassignment) ;;if anything was reassigned
(let [new-slots-cnt (count (set (vals executor->node+port)))
reassign-executors (keys reassignment)]
(log-message "Reassigning " topology-id " to " new-slots-cnt " slots")
(log-message "Reassign executors: " (vec reassign-executors)))));;log how many slots are in use and which executors were reassigned
new-topology->executor->node+port));;finally return the new topology -> {executor [node port]} mapping
clean-inbox is executed periodically to delete expired jars:
(defn clean-inbox [dir-location seconds]
  "Deletes jar files in dir older than seconds."
  (let [now (current-time-secs)                                   ;; current time
        pred #(and (.isFile %) (file-older-than? now seconds %))  ;; predicate matching expired files
        files (filter pred (file-seq (File. dir-location)))]      ;; collect the expired files
    (doseq [f files]
      (if (.delete f)                                             ;; delete each file
        (log-message "Cleaning inbox ... deleted: " (.getName f))
        ;; This should never happen
        (log-error "Cleaning inbox ... error deleting: " (.getName f))))))
do-cleanup runs periodically to clean up dead topologies:
(defn do-cleanup [nimbus]
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        conf (:conf nimbus)
        submit-lock (:submit-lock nimbus)]
    (let [to-cleanup-ids (locking submit-lock                     ;; take the submit lock first
                           (cleanup-storm-ids conf storm-cluster-state))] ;; topology ids that need cleaning
      (when-not (empty? to-cleanup-ids)                           ;; if there is anything to clean
        (doseq [id to-cleanup-ids]
          (log-message "Cleaning up " id)
          (.teardown-heartbeats! storm-cluster-state id)          ;; remove the heartbeats
          (.teardown-topology-errors! storm-cluster-state id)     ;; remove the topology's error info
          (rmr (master-stormdist-root conf id))                   ;; delete the topology's local directory
          (swap! (:heartbeats-cache nimbus) dissoc id))))))       ;; evict the topology from the heartbeats cache
normalize-topology computes each component's parallelism and stores its configuration as JSON inside the topology:
(defn normalize-topology [storm-conf ^StormTopology topology]
  (let [ret (.deepCopy topology)]                                 ;; deep-copy the topology
    (doseq [[_ component] (all-components ret)]                   ;; visit every component of the topology
      (.set_json_conf
        (.get_common component)
        (->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)} ;; compute the component's parallelism (task count)
             (merge (component-conf component))                   ;; merge with the component's own conf
             to-json)))                                           ;; serialize to JSON and set on the component's common field
    ret))                                                         ;; return the deep copy
system-topology! validates the topology and adds the system components and streams:
(defn system-topology! [storm-conf ^StormTopology topology]
  (validate-basic! topology)                                      ;; basic validation
  (let [ret (.deepCopy topology)]                                 ;; deep copy
    (add-acker! storm-conf ret)                                   ;; add the acker bolt
    (add-metric-components! storm-conf ret)                       ;; add the metric bolts
    (add-system-components! storm-conf ret)                       ;; add the system bolt
    (add-metric-streams! ret)                                     ;; add the metric streams
    (add-system-streams! ret)                                     ;; add the system streams
    (validate-structure! ret)                                     ;; after adding the bolts, validate the topology's structure
    ret))                                                         ;; return
start-storm activates the topology:
(defn- start-storm [nimbus storm-name storm-id topology-initial-status]
  {:pre [(#{:active :inactive} topology-initial-status)]}         ;; precondition validating the topology-initial-status argument
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        conf (:conf nimbus)
        storm-conf (read-storm-conf conf storm-id)
        topology (system-topology! storm-conf (read-storm-topology conf storm-id)) ;; deep-copy the topology and attach the system-level bolts
        num-executors (->> (all-components topology) (map-val num-start-executors))] ;; component -> executor count
    (log-message "Activating " storm-name ": " storm-id)
    (.activate-storm! storm-cluster-state                         ;; save the StormBase info to ZK
                      storm-id
                      (StormBase. storm-name                      ;; build the StormBase
                                  (current-time-secs)
                                  {:type topology-initial-status}
                                  (storm-conf TOPOLOGY-WORKERS)
                                  num-executors))))
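The {:pre [...]} map is a standard Clojure precondition: each form must evaluate to truthy or the call throws an AssertionError. A hypothetical mini-example:
(defn set-status [status]
  {:pre [(#{:active :inactive} status)]}  ;; a set acts as a membership predicate
  status)

(set-status :active)  ;; => :active
(set-status :killed)  ;; => AssertionError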
submitTopology
This is just another way of calling submitTopologyWithOpts: submitTopology defaults SubmitOptions to ACTIVE.
(^void submitTopology
[this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
(.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology (SubmitOptions. TopologyInitialStatus/ACTIVE)))
killTopology & killTopologyWithOpts
The methods for killing a topology:
(^void killTopology [this ^String name];;kill the topology with the default KillOptions
(.killTopologyWithOpts this name (KillOptions.)))
(^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
(check-storm-active! nimbus storm-name true);;check via ZK that the topology is alive
(let [wait-amt (if (.is_set_wait_secs options);;read the kill wait time, if set
(.get_wait_secs options)
)]
(transition-name! nimbus storm-name [:kill wait-amt] true);;move the state to :kill via transition-name!
))
rebalance
Rebalances a topology, redistributing its resources:
(^void rebalance [this ^String storm-name ^RebalanceOptions options]
(check-storm-active! nimbus storm-name true);;check that the topology is alive
(let [wait-amt (if (.is_set_wait_secs options);;read the transition wait time, if set
(.get_wait_secs options))
num-workers (if (.is_set_num_workers options);;read the requested worker count from the options, if set
(.get_num_workers options))
executor-overrides (if (.is_set_num_executors options);;read the per-component executor counts from the options, if set
(.get_num_executors options)
{})]
(doseq [[c num-executors] executor-overrides]
(when (<= num-executors 0)
(throw (InvalidTopologyException. "Number of executors must be greater than 0"))
));;check every num-executors override
(transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true);;move the state to :rebalance
))
activate & deactivate
Move the state to :activate or :inactivate:
(activate [this storm-name]
(transition-name! nimbus storm-name :activate true)
)
(deactivate [this storm-name]
(transition-name! nimbus storm-name :inactivate true))
beginFileUpload
Begins a file upload:
(beginFileUpload [this]
(let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")];;build the upload file path
(.put (:uploaders nimbus);;register the upload channel in uploaders
fileloc
(Channels/newChannel (FileOutputStream. fileloc)))
(log-message "Uploading file from client to " fileloc)
fileloc
))
uploadChunk
Uploads a chunk of bytes:
(^void uploadChunk [this ^String location ^ByteBuffer chunk]
(let [uploaders (:uploaders nimbus)
^WritableByteChannel channel (.get uploaders location)]
(when-not channel
(throw (RuntimeException.
"File for that location does not exist (or timed out)")))
(.write channel chunk)
(.put uploaders location channel)
))
finishFileUpload
Finishes the upload: removes the location from uploaders and closes the channel:
(^void finishFileUpload [this ^String location]
(let [uploaders (:uploaders nimbus)
^WritableByteChannel channel (.get uploaders location)]
(when-not channel
(throw (RuntimeException.
"File for that location does not exist (or timed out)")))
(.close channel)
(log-message "Finished uploading file from client: " location)
(.remove uploaders location)
))
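Putting the three methods together, a client drives the upload as beginFileUpload -> uploadChunk* -> finishFileUpload. A minimal sketch, assuming a Thrift Nimbus$Client and a hypothetical upload-jar helper:
(defn upload-jar [^Nimbus$Client client ^String local-jar]
  (let [location (.beginFileUpload client)]            ;; ask Nimbus where to upload
    (with-open [in (java.io.FileInputStream. local-jar)]
      (let [buf (byte-array (* 1024 1024))]
        (loop []
          (let [n (.read in buf)]
            (when (pos? n)
              ;; send the next chunk for that location
              (.uploadChunk client location (java.nio.ByteBuffer/wrap buf 0 n))
              (recur))))))
    (.finishFileUpload client location)                ;; release the channel on the Nimbus side
    location))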
beginFileDownload & downloadChunk
Downloads mirror uploads: beginFileDownload registers a stream in downloaders and returns an id. downloadChunk does the actual reading of bytes for that id, and the stream is removed from downloaders once reading completes.
(^String beginFileDownload [this ^String file]
(let [is (BufferFileInputStream. file)
id (uuid)]
(.put (:downloaders nimbus) id is)
id
))
(^ByteBuffer downloadChunk [this ^String id]
(let [downloaders (:downloaders nimbus)
^BufferFileInputStream is (.get downloaders id)]
(when-not is
(throw (RuntimeException.
"Could not find input stream for that id")))
(let [ret (.read is)]
(.put downloaders id is)
(when (empty? ret)
(.remove downloaders id))
(ByteBuffer/wrap ret)
)))
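Symmetrically, a client downloads with beginFileDownload followed by repeated downloadChunk calls until an empty chunk comes back. A minimal sketch (download-file is a hypothetical helper):
(defn download-file [^Nimbus$Client client ^String remote-file ^String local-file]
  (let [id (.beginFileDownload client remote-file)]    ;; register a stream, get an id back
    (with-open [out (java.io.FileOutputStream. local-file)]
      (loop []
        (let [^java.nio.ByteBuffer chunk (.downloadChunk client id)]
          (when (pos? (.remaining chunk))              ;; an empty chunk ends the stream
            (.write (.getChannel out) chunk)
            (recur)))))))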
getNimbusConf & getTopologyConf
Fetch the Nimbus configuration and a topology's configuration:
(^String getNimbusConf [this]
(to-json (:conf nimbus)))
(^String getTopologyConf [this ^String id]
(to-json (try-read-storm-conf conf id)))
getTopology & getUserTopology
Fetch the topology with system bolts attached, and the user-defined topology as submitted:
(^StormTopology getTopology [this ^String id]
(system-topology! (try-read-storm-conf conf id) (try-read-storm-topology conf id)))
(^StormTopology getUserTopology [this ^String id]
(try-read-storm-topology conf id))
getClusterInfo
Fetches information about the whole cluster:
(^ClusterSummary getClusterInfo [this]
(let [storm-cluster-state (:storm-cluster-state nimbus);;first grab storm-cluster-state
supervisor-infos (all-supervisor-info storm-cluster-state);;collect all supervisor info
;; TODO: need to get the port info about supervisors...
;; in standalone just look at metadata, otherwise just say N/A?
supervisor-summaries (dofor [[id info] supervisor-infos]
(let [ports (set (:meta info)) ;;TODO: this is only true for standalone
]
(SupervisorSummary. (:hostname info)
(:uptime-secs info)
(count ports)
(count (:used-ports info))
id );;assembled into a SupervisorSummary
))
nimbus-uptime ((:uptime nimbus))
bases (topology-bases storm-cluster-state);;fetch the StormBase of every live topology
topology-summaries (dofor [[id base] bases]
(let [assignment (.assignment-info storm-cluster-state id nil)];;fetch each topology's assignment info
(TopologySummary. id
(:storm-name base)
(->> (:executor->node+port assignment)
keys
(mapcat executor-id->tasks)
count)
(->> (:executor->node+port assignment)
keys
count)
(->> (:executor->node+port assignment)
vals
set
count)
(time-delta (:launch-time-secs base))
(extract-status-str base))
))];;assembled into TopologySummaries
(ClusterSummary. supervisor-summaries
nimbus-uptime
topology-summaries);;everything assembled into a ClusterSummary
))
getTopologyInfo
Fetches information about a single topology:
(^TopologyInfo getTopologyInfo [this ^String storm-id]
(let [storm-cluster-state (:storm-cluster-state nimbus);;first grab storm-cluster-state
task->component (storm-task-info (try-read-storm-topology conf storm-id) (try-read-storm-conf conf storm-id));;the task -> component mapping
base (.storm-base storm-cluster-state storm-id nil);;fetch the StormBase
assignment (.assignment-info storm-cluster-state storm-id nil);;fetch the assignment info
beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment));;fetch the heartbeats
all-components (-> task->component reverse-map keys);;all components
errors (->> all-components
(map (fn [c] [c (get-errors storm-cluster-state storm-id c)]));;collect the error info
(into {}))
executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
(let [host (-> assignment :node->host (get node))
heartbeat (get beats executor)
stats (:stats heartbeat)
stats (if stats
(stats/thriftify-executor-stats stats))]
(doto
(ExecutorSummary. (thriftify-executor-id executor)
(-> executor first task->component)
host
port
(nil-to-zero (:uptime heartbeat)))
(.set_stats stats))
))
]
(TopologyInfo. storm-id
(:storm-name base)
(time-delta (:launch-time-secs base))
executor-summaries
(extract-status-str base)
errors
)
))
storm-cluster-state and ZK
Essentially all state information, and the cluster's overall picture, lives in ZK, and Nimbus interacts with ZK mainly through storm-cluster-state. Next let's examine how storm-cluster-state actually maintains this state.
storm-cluster-state is created while nimbus-data is being constructed:
(defn mk-storm-cluster-state
[cluster-state-spec]
(let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
[false cluster-state-spec]
[true (mk-distributed-cluster-state cluster-state-spec)]);;if cluster-state-spec already satisfies ClusterState it is used directly, otherwise a distributed cluster-state is built; the distributed cluster-state talks to ZK
assignment-info-callback (atom {})
supervisors-callback (atom nil)
assignments-callback (atom nil)
storm-base-callback (atom {})
state-id (register;;cluster-state's register method adds a callback to its callbacks map; whether a registered callback is invoked depends on the event's type and path
cluster-state
(fn [type path]
(let [[subtree & args] (tokenize-path path)]
(condp = subtree
ASSIGNMENTS-ROOT (if (empty? args)
(issue-callback! assignments-callback)
(issue-map-callback! assignment-info-callback (first args)))
SUPERVISORS-ROOT (issue-callback! supervisors-callback)
STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
;; this should never happen
(halt-process! 30 "Unknown callback for subtree " subtree args)))))]
(doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
(mkdirs cluster-state p));;create each of the directories
(reify
StormClusterState
...
)))
Inside mk-storm-cluster-state the StormClusterState protocol is implemented:
assignments
If callback is non-nil it resets assignments-callback, then fetches the children under ASSIGNMENTS-SUBTREE from ZK, arming a watch when a callback was supplied:
(assignments
[this callback]
(when callback
(reset! assignments-callback callback))
(get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
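The callback argument implements a ZK-watch style notification: passing a fn arms a watch, and the fn must re-register itself if it wants further notifications. A hypothetical usage sketch:
(.assignments storm-cluster-state
              (fn watcher []                            ;; no-arg callback, re-arms itself
                (log-message "assignments changed: "
                             (.assignments storm-cluster-state watcher))))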
assignment-info
Stores the storm-id -> callback mapping in assignment-info-callback, then fetches the node data at (assignment-path storm-id) from ZK, deserializing it if present:
(assignment-info
[this storm-id callback]
(when callback
(swap! assignment-info-callback assoc storm-id callback))
(maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
active-storms
Fetches all active topologies:
(active-storms
[this]
(get-children cluster-state STORMS-SUBTREE false))
heartbeat-storms
Fetches the topologies that have heartbeat data:
(heartbeat-storms
[this]
(get-children cluster-state WORKERBEATS-SUBTREE false))
error-topologies
Fetches the topologies that have errors:
(error-topologies
[this]
(get-children cluster-state ERRORS-SUBTREE false))
get-worker-heartbeat
Fetches one worker's heartbeat for a topology:
(get-worker-heartbeat
[this storm-id node port]
(-> cluster-state
(get-data (workerbeat-path storm-id node port) false)
maybe-deserialize))
executor-beats
Fetches a topology's heartbeat data and converts it into per-executor heartbeats:
(executor-beats
[this storm-id executor->node+port]
;; need to take executor->node+port in explicitly so that we don't run into a situation where a
;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
;; we avoid situations like that
(let [node+port->executors (reverse-map executor->node+port)
all-heartbeats (for [[[node port] executors] node+port->executors]
(->> (get-worker-heartbeat this storm-id node port)
(convert-executor-beats executors)
))]
(apply merge all-heartbeats)))
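reverse-map (from util.clj) inverts a map, grouping keys that share a value; this is how node+port->executors is derived above. With made-up executor ids:
(reverse-map {[1 1] ["node1" 6700]
              [2 2] ["node1" 6700]
              [3 3] ["node2" 6701]})
;; => {["node1" 6700] [[1 1] [2 2]], ["node2" 6701] [[3 3]]}  (key order may vary)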
supervisors
Fetches all supervisors and re-arms the callback:
(supervisors
[this callback]
(when callback
(reset! supervisors-callback callback))
(get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
supervisor-info
Fetches one supervisor's info:
(supervisor-info
[this supervisor-id]
(maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
worker-heartbeat!
Writes one worker's heartbeat:
(worker-heartbeat!
[this storm-id node port info]
(set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
remove-worker-heartbeat!
Deletes one worker's heartbeat:
(remove-worker-heartbeat!
[this storm-id node port]
(delete-node cluster-state (workerbeat-path storm-id node port)))
setup-heartbeats! & teardown-heartbeats!
Create and tear down a topology's heartbeat directory:
(setup-heartbeats!
[this storm-id]
(mkdirs cluster-state (workerbeat-storm-root storm-id)))
(teardown-heartbeats!
[this storm-id]
(try-cause
(delete-node cluster-state (workerbeat-storm-root storm-id))
(catch KeeperException e
(log-warn-error e "Could not teardown heartbeats for " storm-id))))
teardown-topology-errors!
Deletes a topology's error info:
(teardown-topology-errors!
[this storm-id]
(try-cause
(delete-node cluster-state (error-storm-root storm-id))
(catch KeeperException e
(log-warn-error e "Could not teardown errors for " storm-id))))
supervisor-heartbeat!
Writes the supervisor's heartbeat to an ephemeral node:
(supervisor-heartbeat!
[this supervisor-id info]
(set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
activate-storm!
Writes the StormBase to the topology's node:
(activate-storm!
[this storm-id storm-base]
(set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
update-storm!
Updates a topology's StormBase, merging the new info with the old:
(update-storm!
[this storm-id new-elems]
(let [base (storm-base this storm-id nil)
executors (:component->executors base)
new-elems (update new-elems :component->executors (partial merge executors))]
(set-data cluster-state (storm-path storm-id)
(-> base
(merge new-elems)
Utils/serialize))))
storm-base
Fetches a topology's StormBase:
(storm-base
[this storm-id callback]
(when callback
(swap! storm-base-callback assoc storm-id callback))
(maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
remove-storm-base!
Removes a topology's StormBase:
(remove-storm-base!
[this storm-id]
(delete-node cluster-state (storm-path storm-id)))
set-assignment!
Writes a topology's assignment:
(set-assignment!
[this storm-id info]
(set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
remove-storm!
Deletes a topology's assignment and then its StormBase:
(remove-storm!
[this storm-id]
(delete-node cluster-state (assignment-path storm-id))
(remove-storm-base! this storm-id))
report-error
Reports an error and prunes old error entries, keeping only the 10 newest:
(report-error
[this storm-id component-id error]
(let [path (error-path storm-id component-id)
data {:time-secs (current-time-secs) :error (stringify-error error)}
_ (mkdirs cluster-state path)
_ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
to-kill (->> (get-children cluster-state path false)
(sort-by parse-error-path)
reverse
(drop 10))]
(doseq [k to-kill]
(delete-node cluster-state (str path "/" k)))))
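The pruning rule in isolation: order the error nodes oldest to newest, reverse, and drop everything past the first 10. With illustrative numbers:
(->> (range 15)   ;; 15 error nodes, oldest -> newest
     sort
     reverse
     (drop 10))
;; => (4 3 2 1 0)  the five oldest are deleted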
errors
Fetches the error info of one component of one topology:
(errors
[this storm-id component-id]
(let [path (error-path storm-id component-id)
_ (mkdirs cluster-state path)
children (get-children cluster-state path false)
errors (dofor [c children]
(let [data (-> (get-data cluster-state (str path "/" c) false)
maybe-deserialize)]
(when data
(struct TaskError (:error data) (:time-secs data))
)))
]
(->> (filter not-nil? errors)
(sort-by (comp - :time-secs)))))
disconnect
Unregisters this instance's callback; in solo mode, also closes the cluster-state:
(disconnect
[this]
(unregister cluster-state state-id)
(when solo?
(close cluster-state))))))
State Transitions
Local Startup
When testing a topology on the local machine we write:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
So I wanted to understand what submitTopology does. Digging into the source, I found that the LocalCluster class is implemented in Clojure.
LocalCluster defines the gen-class shown below in its ns form; it implements the Java interface ILocalCluster:
LocalCluster.clj...
(ns backtype.storm.LocalCluster
(:use [backtype.storm testing config])
(:import [java.util Map])
(:gen-class
:init init
:implements [backtype.storm.ILocalCluster]
:constructors {[] [] [java.util.Map] []}
:state state))
The -init function provides two constructors:
LocalCluster.clj...
(defn -init
([]
(let [ret (mk-local-storm-cluster
:daemon-conf
{TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
[[] ret]))
([^Map stateMap]
[[] stateMap]))
The no-arg constructor calls mk-local-storm-cluster, which returns a map carrying the configuration; there is also a constructor taking a java.util.Map, whose map is stored directly in the state field.
With that groundwork laid, we can finally look at the implementation of submitTopology.
LocalCluster.clj...
(defn -submitTopology
[this name conf topology]
(submit-local-topology
(:nimbus (. this state)) name conf topology))
submitTopology takes three arguments: name and conf need no explanation, and topology is the StormTopology we built with TopologyBuilder. It then calls submit-local-topology, passing the nimbus from this.state together with name, conf and topology.
Because submit-local-topology and the other local helpers really exist for local testing, they live in testing.clj:
testing.clj...
(defn submit-local-topology
[nimbus storm-name conf topology]
(when-not (Utils/isValidConf conf)
(throw (IllegalArgumentException. "Topology conf is not json-serializable")))
(.submitTopology nimbus storm-name nil (to-json conf) topology))
This function validates conf, throwing IllegalArgumentException if validation fails; otherwise it converts conf to JSON and submits the topology via the submitTopology Thrift service on the Java Nimbus class.