Storm Source Code Analysis: How Storm Uses ZooKeeper
Published: 2019-06-27


backtype.storm.cluster.clj defines how Storm uses ZooKeeper.

 

ClusterState

First, it defines the interface (a Clojure protocol) for operating on the ZooKeeper cluster:

(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  (set-data [this path data])  ;; if node does not exist, create persistent with this data
  (get-data [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id]))
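
To make the protocol's contract concrete, here is a minimal in-memory stand-in for part of ClusterState. It is a sketch, not Storm code: `mk-memory-cluster-state` and `path-prefixes` are hypothetical names, the protocol is redeclared with only five of its methods, and `watch?` is accepted but ignored because there is no server to send events. It does mirror two behaviors described in this post: set-data creates a missing node together with its parents, and delete-node is recursive.

```clojure
(require '[clojure.string :as str])

;; A subset of ClusterState, redeclared here so the sketch stands alone.
(defprotocol ClusterState
  (set-data [this path data])
  (get-data [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (delete-node [this path]))

(defn path-prefixes
  "\"/a/b/c\" -> (\"/a\" \"/a/b\" \"/a/b/c\")"
  [path]
  (let [parts (remove str/blank? (str/split path #"/"))]
    (for [n (range 1 (inc (count parts)))]
      (str "/" (str/join "/" (take n parts))))))

(defn mk-memory-cluster-state
  "Hypothetical in-memory stand-in: an atom mapping node path -> data.
  watch? is accepted but ignored, since there is no server to send events."
  []
  (let [nodes (atom {})]
    (reify ClusterState
      (mkdirs [_ path]
        ;; create every missing ancestor, leaving existing data untouched
        (doseq [p (path-prefixes path)]
          (swap! nodes #(if (contains? % p) % (assoc % p nil)))))
      (set-data [this path data]
        ;; as in the original: a missing node is created (with its parents)
        (mkdirs this path)
        (swap! nodes assoc path data))
      (get-data [_ path _watch?]
        (get @nodes path))
      (get-children [_ path _watch?]
        ;; direct children only, returned as sorted names
        (let [prefix (str path "/")]
          (sort (for [k (keys @nodes)
                      :when (str/starts-with? k prefix)
                      :let [child (subs k (count prefix))]
                      :when (not (str/includes? child "/"))]
                  child))))
      (delete-node [_ path]
        ;; recursive, like zk/delete-recursive
        (let [prefix (str path "/")]
          (swap! nodes (fn [m]
                         (into {} (remove (fn [[k _]]
                                            (or (= k path) (str/starts-with? k prefix)))
                                          m)))))))))
```

For example, after `(set-data cs "/storm/assignments/topo-1" "a1")`, calling `(get-children cs "/storm" false)` returns `("assignments")`.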

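Next, mk-distributed-cluster-state implements the protocol and creates the object (via reify) used to operate on the ZooKeeper cluster.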

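First, it creates a ZK client, uses it to create the STORM-ZOOKEEPER-ROOT directory on ZooKeeper, and closes it.
Then it defines:
    callbacks, the collection of callbacks (an atom holding a map from id to callback)
    active, a flag recording whether this cluster state is still open (close sets it to false)
    zk, the ZK client, created with STORM-ZOOKEEPER-ROOT as its root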

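When the ZK client is created, a watcher is installed: whenever state on the ZK server changes, the server sends an event to the client, and this watcher hands the event to the registered callbacks. It does so only while active is true; a connection state other than :connected is logged as a warning, and events whose type is :none are not passed on.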

Storm accesses ZooKeeper through Curator's CuratorFramework.

Finally, it implements the ClusterState protocol: register and unregister add and remove callbacks, and the remaining methods are ordinary ZK operations.

(defn mk-distributed-cluster-state [conf]
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
    (.close zk))
  (let [callbacks (atom {})
        active (atom true)
        zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         :watcher (fn [state type path]
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]
    (reify
      ClusterState
      (register [this callback]
        (let [id (uuid)]
          (swap! callbacks assoc id callback)
          id))
      (unregister [this id]
        (swap! callbacks dissoc id))
      (set-ephemeral-node [this path data]
        (zk/mkdirs zk (parent-path path))
        (if (zk/exists zk path false)
          (try-cause
            (zk/set-data zk path data) ; should verify that it's ephemeral
            (catch KeeperException$NoNodeException e
              (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
              (zk/create-node zk path data :ephemeral)))
          (zk/create-node zk path data :ephemeral)))
      (create-sequential [this path data]
        (zk/create-node zk path data :sequential))
      (set-data [this path data]
        ;; note: this does not turn off any existing watches
        (if (zk/exists zk path false)
          (zk/set-data zk path data)
          (do
            (zk/mkdirs zk (parent-path path))
            (zk/create-node zk path data :persistent))))
      (delete-node [this path]
        (zk/delete-recursive zk path))
      (get-data [this path watch?]
        (zk/get-data zk path watch?))
      (get-children [this path watch?]
        (zk/get-children zk path watch?))
      (mkdirs [this path]
        (zk/mkdirs zk path))
      (close [this]
        (reset! active false)
        (.close zk)))))
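
The callback plumbing in mk-distributed-cluster-state can be exercised without a ZooKeeper server. The sketch below is hypothetical: `mk-callback-registry` and its `:register`/`:unregister`/`:close`/`:watcher` keys are made-up names, a plain map of functions replaces reify, and event-type keywords such as `:node-children-changed` are assumed for illustration. The watcher applies the same filtering as the original: nothing runs after close, a state other than :connected is logged but still dispatched, and events of type :none never reach the callbacks.

```clojure
(defn mk-callback-registry
  "Hypothetical stand-in for the callbacks/active/watcher part of
  mk-distributed-cluster-state; no ZooKeeper involved."
  []
  (let [callbacks (atom {})   ; id -> (fn [type path])
        active (atom true)]
    {:register   (fn [callback]
                   (let [id (str (java.util.UUID/randomUUID))]
                     (swap! callbacks assoc id callback)
                     id))
     :unregister (fn [id] (swap! callbacks dissoc id))
     :close      (fn [] (reset! active false))
     ;; plays the role of the :watcher passed to zk/mk-client
     :watcher    (fn [state type path]
                   (when @active
                     (when-not (= :connected state)
                       (println "Received event" state ":" type ":" path "with disconnected Zookeeper."))
                     (when-not (= :none type)
                       (doseq [callback (vals @callbacks)]
                         (callback type path)))))}))
```

Because register returns an id rather than the callback itself, the caller keeps that id (state-id in mk-storm-cluster-state) and can later unregister exactly the function it added.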

 

StormClusterState

Next comes a protocol of ZK operations tailored to Storm, covering the reads and writes of all the Storm information kept in ZooKeeper:

(defprotocol StormClusterState
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id])  ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id task-id error])
  (errors [this storm-id task-id])
  (disconnect [this]))

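First, it checks whether cluster-state-spec already implements the ClusterState protocol. If it does, it is used as is (solo? is false); otherwise cluster-state-spec is treated as a config and a new cluster state is created with mk-distributed-cluster-state (solo? is true).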

Next, it defines a set of callback holders (atoms) and calls cluster-state's register to add a single dispatcher function to the callbacks list; state-id is the UUID that register returns for it.
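Then it creates Storm's subdirectories on ZooKeeper (the assignments, storms, supervisors, workerbeats and errors subtrees).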
Finally, it implements the StormClusterState protocol, i.e. the reads and writes of the various kinds of ZK data.

(defn mk-storm-cluster-state [cluster-state-spec]
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
        assignment-info-callback (atom {})
        supervisors-callback (atom nil)
        assignments-callback (atom nil) ; set in StormClusterState.assignments
        storm-base-callback (atom {})
        state-id (register
                  cluster-state
                  (fn [type path]
                    (let [[subtree & args] (tokenize-path path)] ; split path on '/'
                      (condp = subtree ; switch/case on the subtree part of path
                        ASSIGNMENTS-ROOT (if (empty? args)
                                           ;; issue-callback! runs the callback and clears it, so it fires only once
                                           (issue-callback! assignments-callback)
                                           (issue-map-callback! assignment-info-callback (first args)))
                        SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                        STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                        ;; this should never happen
                        (halt-process! 30 "Unknown callback for subtree " subtree args)))))]
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))
    (reify
      StormClusterState
      ;; ... implementations of the StormClusterState methods (omitted in this excerpt) ...
      )))
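
The helpers tokenize-path, issue-callback! and issue-map-callback! are not shown in the excerpt. The sketch below implements them the way the comments describe (split the path on '/', run a callback once and clear it) and wires them into the same condp dispatch. Assumptions: the subtree names get the string values "assignments", "supervisors" and "storms"; the helpers clear the atom with swap-vals! (Clojure 1.9+) so that taking and clearing the callback is atomic; `mk-dispatcher` is a hypothetical name; and a thrown exception replaces halt-process!.

```clojure
(require '[clojure.string :as str])

;; Assumed values; the excerpt does not show them.
(def ASSIGNMENTS-ROOT "assignments")
(def SUPERVISORS-ROOT "supervisors")
(def STORMS-ROOT "storms")

(defn tokenize-path
  "Split a ZooKeeper path on '/' and drop empty segments."
  [path]
  (vec (remove str/blank? (str/split path #"/"))))

(defn issue-callback!
  "Take the callback out of cb-atom, leaving nil behind, then run it (at most once)."
  [cb-atom]
  (let [[old _] (swap-vals! cb-atom (constantly nil))]
    (when old (old))))

(defn issue-map-callback!
  "Same idea for an atom holding {id -> callback}; the callback receives the id."
  [cb-atom id]
  (let [[old _] (swap-vals! cb-atom dissoc id)]
    (when-let [cb (get old id)]
      (cb id))))

(defn mk-dispatcher
  "Hypothetical: route an event to the matching one-shot callback holder by path."
  [{:keys [assignments-callback assignment-info-callback
           supervisors-callback storm-base-callback]}]
  (fn [_type path]
    (let [[subtree & args] (tokenize-path path)]
      (condp = subtree
        ASSIGNMENTS-ROOT (if (empty? args)
                           (issue-callback! assignments-callback)
                           (issue-map-callback! assignment-info-callback (first args)))
        SUPERVISORS-ROOT (issue-callback! supervisors-callback)
        STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
        ;; the original calls halt-process! here; the sketch throws instead
        (throw (ex-info "Unknown callback for subtree" {:subtree subtree :args args}))))))
```

Routing a second event on /assignments does nothing until someone stores a new callback; that one-shot behavior is what the supervisor scenario relies on.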

 

Example

The following scenario illustrates how Storm uses ZooKeeper.

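In the supervisor, mk-synchronize-supervisor is mainly responsible for downloading the code of newly assigned topologies and deleting the code of topologies that are no longer used.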

So running this logic once is not enough; it has to run again every time the assignments change.

Storm solves this with ZooKeeper watchers:

1. When mk-distributed-cluster-state creates the ZK client, it configures a watcher. When an event arrives from the ZK server, the watcher calls the callbacks in the callbacks list to handle it.

2. mk-storm-cluster-state adds a callback to cluster-state's callback list.

    That callback in turn uses the path in the event (which indicates which part of the data changed) to issue one of the callbacks maintained in storm-cluster-state.

    For example, when ASSIGNMENTS-ROOT changes, assignments-callback is called.

3. So all that is needed is to store, in assignments-callback, a callback that triggers mk-synchronize-supervisor; then whenever ASSIGNMENTS-ROOT changes, mk-synchronize-supervisor runs and synchronizes the topology code.

   When is it set? The first time mk-synchronize-supervisor is called:

sync-callback (fn [& ignored] (.add event-manager this))
assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)

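   Synchronizing topology code is time-consuming, so it runs in the background: sync-callback only adds this (the synchronization function itself) to the event-manager's queue, and a background thread executes it.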

   Also, when assignments is called to obtain assignments-snapshot, sync-callback is stored in assignments-callback:

(assignments [this callback]
  (when callback
    (reset! assignments-callback callback))
  (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))

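By the way, get-children takes one of two code paths depending on whether there is a callback, i.e. whether the node should be watched. The children returned are the same in both cases; with watch? true, Curator's watched() additionally leaves a one-shot watch on the node, so the next change to its children arrives as an event at the client's watcher (the :watcher passed to zk/mk-client in mk-distributed-cluster-state).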

(defn get-children [^CuratorFramework zk ^String path watch?]
  (if watch?
    (.. zk (getChildren) (watched) (forPath (normalize-path path)))
    (.. zk (getChildren) (forPath (normalize-path path)))))

4. As mentioned above, issue-callback! clears assignments-callback before running it, so if the callback must fire again and again, assignments-callback has to be set again every time.

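    So, acting as the callback, mk-synchronize-supervisor first re-registers assignments-callback through assignments-snapshot.
    Why was this mechanism chosen? It is not yet clear to me. A plausible explanation is that ZooKeeper watches are themselves one-shot: a watch is removed once it fires and is re-armed only by reading the node again with watch enabled. Clearing the local callback on every firing, and setting it again in the same assignments call that re-arms the ZooKeeper watch, keeps the two in step.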
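
Steps 1 to 4 form a loop: the callback is consumed when it fires, and the synchronization run it triggers stores a fresh one. The toy simulation below runs that loop in a single process. It is only a sketch under stated assumptions: a boolean atom stands in for a one-shot ZooKeeper watch, `change-assignments!` stands in for Nimbus writing new assignments, and a LinkedBlockingQueue drained by `run-pending!` stands in for the event manager's background thread. Apart from assignments, sync-callback and issue-callback!, all names are hypothetical.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; --- toy ZooKeeper: the assignments node and its one-shot children watch ---
(def children (atom #{}))            ; children of the assignments node
(def zk-watch-armed? (atom false))   ; a ZK watch fires once, then must be re-armed
(def assignments-callback (atom nil))

(defn issue-callback! [cb-atom]
  ;; take the callback and clear the atom, then run it
  (let [[old _] (swap-vals! cb-atom (constantly nil))]
    (when old (old))))

(defn get-children [watch?]
  ;; reading with watch? true (re-)arms the one-shot watch
  (when watch? (reset! zk-watch-armed? true))
  @children)

(defn assignments
  "Mirrors (assignments [this callback]): remember the callback, read with a watch."
  [callback]
  (when callback (reset! assignments-callback callback))
  (get-children (some? callback)))

(defn change-assignments!
  "Hypothetical write by Nimbus: the watch fires only if armed, and is then disarmed."
  [f]
  (swap! children f)
  (let [[armed? _] (swap-vals! zk-watch-armed? (constantly false))]
    (when armed? (issue-callback! assignments-callback))))

;; --- toy event manager: sync work is queued, not run inside the watcher ---
(def event-queue (LinkedBlockingQueue.))
(def synced (atom []))   ; the snapshot seen by each synchronization run

(defn synchronize-supervisor []
  (let [sync-callback (fn [& _ignored] (.add event-queue synchronize-supervisor))
        snapshot (assignments sync-callback)]   ; re-arms both callback and watch
    (swap! synced conj snapshot)))

(defn run-pending!
  "Stands in for the event manager's background thread."
  []
  (loop []
    (when-let [f (.poll event-queue)]
      (f)
      (recur))))
```

If synchronize-supervisor stopped passing sync-callback to assignments, the second change would leave the queue empty: the first firing consumes both the watch and the callback, and nothing re-arms them.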

This article is taken from cnblogs (博客园); original publication date: 2013-06-26.

Reposted from: http://mjyil.baihongyu.com/
