Storm Source Analysis - Topology Submit - Supervisor

mk-supervisor

(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
  (log-message "Starting Supervisor with conf " conf)
  (.prepare isupervisor conf (supervisor-isupervisor-dir conf)) ;; initialize supervisor-id and store it in LocalState (see the ISupervisor implementation)
  (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf))) ;; clean this machine's supervisor tmp directory
  (let [supervisor (supervisor-data conf shared-context isupervisor)
        ;; create two event-managers for running functions in the background
        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
        sync-processes (partial sync-processes supervisor) ;; partial application of sync-processes
        ;; mk-synchronize-supervisor does the main work of mk-supervisor, see below
        synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
        ;; define the function that generates the supervisor heartbeat
        heartbeat-fn (fn [] (.supervisor-heartbeat!
                              (:storm-cluster-state supervisor)
                              (:supervisor-id supervisor)
                              (SupervisorInfo. (current-time-secs)
                                               (:my-hostname supervisor)
                                               (:assignment-id supervisor)
                                               (keys @(:curr-assignment supervisor)) ;; used ports
                                               (.getMetadata isupervisor)
                                               (conf SUPERVISOR-SCHEDULER-META)
                                               ((:uptime supervisor)))))]
    (heartbeat-fn) ;; send the supervisor heartbeat once right away,
    ;; then use schedule-recurring to call heartbeat-fn periodically
    ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
    (schedule-recurring (:timer supervisor)
                        0
                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
                        heartbeat-fn)))
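
schedule-recurring arms the supervisor's timer with an initial delay of 0 and a period of SUPERVISOR-HEARTBEAT-FREQUENCY-SECS. A rough sketch of that contract, built on plain JDK timers (an assumption for illustration, not Storm's timer implementation):

;; Rough sketch of the schedule-recurring contract (assumed, not Storm's timer):
;; run f after delay-secs, then every recur-secs, on a daemon timer thread.
(defn schedule-recurring-sketch [delay-secs recur-secs f]
  (doto (java.util.Timer. true)
    (.scheduleAtFixedRate (proxy [java.util.TimerTask] []
                            (run [] (f)))
                          (long (* 1000 delay-secs))
                          (long (* 1000 recur-secs)))))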
 

mk-synchronize-supervisor

The supervisor is fairly simple; it mainly takes care of two things:
when the assignments change, sync the topology code from nimbus to the local machine
when the assignments change, check the workers' states and make sure every assigned worker is in a valid state

Two requirements:
1. Trigger whenever the assignments change
    For how this repeated-trigger mechanism is built on ZooKeeper watchers, see
Storm-源码分析- Storm中Zookeeper的使用 (Storm source analysis: ZooKeeper usage in Storm)

2. Run in the background, because the work is time-consuming
    Two event-managers are created, to run mk-synchronize-supervisor and sync-processes in the background respectively; a minimal sketch of such an event-manager follows below.
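
To make the event-manager's role concrete, here is a minimal sketch of the assumed contract (a queue drained by one background thread); this is an illustration, not Storm's event/event-manager implementation:

;; Minimal event-manager sketch (assumed contract, not Storm's implementation):
;; functions added via :add run one at a time on a background daemon thread.
(defn simple-event-manager []
  (let [queue (java.util.concurrent.LinkedBlockingQueue.)]
    (doto (Thread. (fn [] (while true ((.take queue)))))
      (.setDaemon true)
      (.start))
    {:add (fn [f] (.put queue f))}))

;; usage: ((:add (simple-event-manager)) #(println "synchronizing..."))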

A peculiar detail of mk-synchronize-supervisor is that its body is wrapped in a named anonymous function, this.
This looks odd at first sight, but the point is to let sync-callback add this very function back onto the event-manager:
every invocation must re-register sync-callback with ZooKeeper, so that the next assignment change can trigger it again.

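The pattern in isolation, as a sketch with hypothetical helpers (enqueue! and register-watch! stand in for the event-manager and the ZooKeeper watch registration):

;; Sketch of the named anonymous fn pattern (hypothetical helper names):
;; the fn names itself `this`, so the watch callback can re-enqueue it.
(defn mk-recurring-sync [enqueue! register-watch!]
  (fn this []
    ;; re-register the watch on every run; the callback hands `this`
    ;; back to the background queue, closing the trigger loop
    (register-watch! (fn [& _] (enqueue! this)))
    (println "synchronizing...")))
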
 

(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
  (fn this []
    (let [conf (:conf supervisor)
          storm-cluster-state (:storm-cluster-state supervisor)
          ^ISupervisor isupervisor (:isupervisor supervisor)
          ^LocalState local-state (:local-state supervisor) ;; local cache database
          sync-callback (fn [& ignored] (.add event-manager this)) ;; callback that re-runs this fn on the event-manager (in the background)
          assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback) ;; read the assignments and register the callback, triggered when assignments change in zk
          storm-code-map (read-storm-code-locations assignments-snapshot) ;; where to download each topology's code from
          downloaded-storm-ids (set (read-downloaded-storm-ids conf)) ;; which topologies have already been downloaded
          all-assignment (read-assignments  ;; which executors are assigned to this supervisor's ports
                           assignments-snapshot
                           (:assignment-id supervisor)) ;; supervisor-id
          new-assignment (->> all-assignment ;; new = all, since confirmAssigned has no real implementation and always returns true
                              (filter-key #(.confirmAssigned isupervisor %)))
          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) ;; set of topology ids assigned to this supervisor
          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)] ;; read the currently stored local assignments from the local-state database
      
      ;; download the code of newly assigned topologies
      (doseq [[storm-id master-code-dir] storm-code-map]
        (when (and (not (downloaded-storm-ids storm-id))
                   (assigned-storm-ids storm-id))
          (download-storm-code conf storm-id master-code-dir)))
      (.put local-state     ;; store new-assignment in the local-state database
            LS-LOCAL-ASSIGNMENTS
            new-assignment)
      (reset! (:curr-assignment supervisor) new-assignment) ;; cache new-assignment in the supervisor data
      
      ;; remove any downloaded code that's no longer assigned or active
      (doseq [storm-id downloaded-storm-ids]
        (when-not (assigned-storm-ids storm-id)
          (log-message "Removing code for storm id " storm-id)
          (rmr (supervisor-stormdist-root conf storm-id))
          ))
      ;; run sync-processes in the background
      (.add processes-event-manager sync-processes)
      )))
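
For reference, assignments-snapshot passes sync-callback into every read of cluster state, which is what re-arms the ZooKeeper watch each round. A sketch of its likely shape (treat the details as approximate):

;; assignments-snapshot sketch: the callback is handed to every zk read,
;; re-arming the watch each time
(defn- assignments-snapshot [storm-cluster-state callback]
  (let [storm-ids (.assignments storm-cluster-state callback)]
    (->> (dofor [sid storm-ids]
                {sid (.assignment-info storm-cluster-state sid callback)})
         (apply merge)
         (filter-val not-nil?))))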

sync-processes

sync-processes manages the workers: it deals with abnormal or dead workers and creates new ones.
It first reads the workers' heartbeats from local state to judge each worker's condition, and shuts down every worker whose state is not valid.
For every slot that has an assignment but whose worker is not in a valid state, it creates a new worker.

(defn sync-processes [supervisor]
  (let [conf (:conf supervisor)
        ^LocalState local-state (:local-state supervisor)
        assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
        now (current-time-secs)
        allocated (read-allocated-workers supervisor assigned-executors now) ;; 1. read the current state of each worker
        keepers (filter-val     ;; keep the workers whose state is :valid
                 (fn [[state _]] (= state :valid))
                 allocated)
        keep-ports (set (for [[id [_ hb]] keepers] (:port hb))) ;; the set of ports held by keepers
        ;; select-keys-pred (pred map) filters the map's keys with pred:
        ;; find the ports in assigned-executors that are not in keep-ports,
        ;; i.e. newly assigned workers, or workers that are assigned but not valid (dead or not started);
        ;; these executors need to be reassigned to new workers
        reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
        new-worker-ids (into {}
                             (for [port (keys reassign-executors)] ;; generate a new worker-id for each port in reassign-executors
                               [port (uuid)]))
        ]
    ;; 1. to kill are those in allocated that are dead or disallowed
    ;; 2. kill the ones that should be dead
    ;;    - read pids, kill -9 and individually remove file
    ;;    - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
    ;; 3. of the rest, figure out what assignments aren't yet satisfied
    ;; 4. generate new worker ids, write new "approved workers" to LS
    ;; 5. create local dir for worker id
    ;; 5. launch new workers (give worker-id, port, and supervisor-id)
    ;; 6. wait for workers launch
    (doseq [[id [state heartbeat]] allocated]
      (when (not= :valid state) ;; shut down every worker whose state is not :valid
        (shutdown-worker supervisor id)))
    (doseq [id (vals new-worker-ids)]
      (local-mkdirs (worker-pids-root conf id))) ;; create a dir for each new worker, and add them to LS-APPROVED-WORKERS in local-state
    (.put local-state LS-APPROVED-WORKERS ;; updated approved workers: the still-valid ones + the new workers
          (merge
           (select-keys (.get local-state LS-APPROVED-WORKERS) ;; existing approved workers whose state is still valid
                        (keys keepers))
           (zipmap (vals new-worker-ids) (keys new-worker-ids)) ;; new workers
           ))
    (wait-for-workers-launch ;; 2. wait-for-workers-launch
     conf
     (dofor [[port assignment] reassign-executors]
            (let [id (new-worker-ids port)]
              (launch-worker supervisor
                             (:storm-id assignment)
                             port
                             id)
              id)))
    ))
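
The two map helpers used above come from Storm's util namespace; sketches of their assumed semantics, matching how they are used here:

;; Assumed semantics of the util helpers (sketches, not the originals):
(defn filter-val [pred m]       ;; keep entries whose value satisfies pred
  (into {} (filter (fn [[_ v]] (pred v)) m)))

(defn select-keys-pred [pred m] ;; keep entries whose key satisfies pred
  (into {} (filter (fn [[k _]] (pred k)) m)))

;; e.g. (select-keys-pred (complement #{6700}) {6700 [:e1] 6701 [:e2]})
;; => {6701 [:e2]}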

1. read-allocated-workers

(defn read-allocated-workers
  "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
  [supervisor assigned-executors now]
  (let [conf (:conf supervisor)
        ^LocalState local-state (:local-state supervisor)
        ;; read each worker's heartbeat from local state;
        ;; each worker process keeps updating its own local heartbeat
        id->heartbeat (read-worker-heartbeats conf)
        approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))] ;; read the approved workers from local-state
    (into
     {}
     (dofor [[id hb] id->heartbeat] ;; determine each worker's current state from its heartbeat
            (let [state (cond
                         (or (not (contains? approved-ids id))
                             (not (matches-an-assignment? hb assigned-executors)))
                           :disallowed  ;; not allowed
                         (not hb)
                           :not-started ;; no heartbeat, never started
                         (> (- now (:time-secs hb))
                            (conf SUPERVISOR-WORKER-TIMEOUT-SECS))
                           :timed-out  ;; timed out, dead
                         true
                           :valid)]
              (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
              [id [state hb]] ;; return each worker's current state and heartbeat
              ))
     )))
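
The shape of the result, with hypothetical worker ids and heartbeats:

;; Hypothetical example of read-allocated-workers output: worker-id -> [state hb]
{"worker-id-1" [:valid       {:time-secs 1400000100 :port 6700}]
 "worker-id-2" [:timed-out   {:time-secs 1399999000 :port 6701}]
 "worker-id-3" [:not-started nil]}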

 

2. wait-for-workers-launch

launch-worker is called for each new worker-id in reassign-executors.

Finally wait-for-workers-launch is called, to wait for the workers to be launched successfully.

The logic is simple: check the heartbeat; if there is none yet, keep sleeping until the timeout, then log that the worker failed to start.

(defn- wait-for-worker-launch [conf id start-time]
  (let [state (worker-state conf id)]    
    (loop []
      (let [hb (.get state LS-WORKER-HEARTBEAT)]
        (when (and
               (not hb)
               (<
                (- (current-time-secs) start-time)
                (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)
                ))
          (log-message id " still hasn't started")
          (Time/sleep 500)
          (recur)
          )))
    (when-not (.get state LS-WORKER-HEARTBEAT)
      (log-message "Worker " id " failed to start")
      )))

(defn- wait-for-workers-launch [conf ids]
  (let [start-time (current-time-secs)]
    (doseq [id ids]
      (wait-for-worker-launch conf id start-time))
    ))

Reposted from: http://www.cnblogs.com/fxjwind/p/3161241