Storm-源码分析-Topology Submit-Nimbus

Nimbus Server

Nimbus server, 首先从启动命令开始, 同样是使用storm命令"storm nimbus”来启动
看下源码, 此处和上面client不同, jvmtype="-server", 最终调用"backtype.storm.daemon.nimbus"的main
nimbus是用clojure实现的, 但是clojure是基于JVM的, 所以在最终发布的时候会产生nimbus.class, 所以在用户使用的时候完全可以不知道clojure, 看上去所有都是Java
clojure只是用于提高开发效率而已.

def nimbus():
    """Syntax: [storm nimbus]

Launches the nimbus daemon. This command should be run under
supervision with a tool like daemontools or monit.

See Setting up a Storm cluster for more information.
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
"""
    cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    childopts = confvalue("nimbus.childopts", cppaths) + " -Dlogfile.name=nimbus.log -Dlog4j.configuration=storm.log.properties"
    exec_storm_class(
        "backtype.storm.daemon.nimbus",
        jvmtype="-server",
        extrajars=cppaths,
        childopts=childopts)

launch-server!

来看看nimbus的main, 最终会调到launch-server!, conf参数是调用read-storm-config读出的配置参数, 
而nimbus是INimbus接口(backtype.storm.scheduler.INimbus)的实现, 可以参考standalone-nimbus
(defn -main []
  (-launch (standalone-nimbus)))
(defn -launch [nimbus]
  (launch-server! (read-storm-config) nimbus))

(defn launch-server! [conf nimbus]
  (validate-distributed-mode! conf)
  (let [service-handler (service-handler conf nimbus)
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory.))
                    (.processor (Nimbus$Processor. service-handler))
                    )
       server (THsHaServer. options)]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server)))

1. service-handler

首先定义service-handler,  service-handler前面的定义如下
(defserverfn service-handler [conf inimbus]
    (reify Nimbus$Iface
      ...)
)
这边用到一个macro定义defserverfn, 如下
(defmacro defserverfn [name & body]
  `(let [exec-fn# (fn ~@body)]
    (defn ~name [& args#]0
      (try-cause
        (apply exec-fn# args#)
      (catch InterruptedException e#
        (throw e#))
      (catch Throwable t#
        (log-error t# "Error on initialization of server " ~(str name))
        (halt-process! 13 "Error on initialization")
        )))))
这个macro两个参数, 结合例子, name = service-handler, body就是后面所有的,包括参数和函数体
定义匿名函数 fn[conf inimbus] (……)
定义函数defn service-handler [& args], 里面只是简单的调用fn…使用这个macro和直接定义defn service-handler [conf inimbus]几乎没有啥区别
我有个疑问, 为什么要定义这个无聊的macro, 难道就是为了便于后面的exception处理
在service-handler函数里面最主要就是实现Nimbus$Iface接口(backtype.storm.generated.Nimbus$Iface, $在class文件里面就是这样写的, 应该是java的命名规则)

2. server

生成server option参数, 使用TNonblockingServerSocket, 定义的work thread数目, 使用的protocol和使用的processor
其中processor, 是server上主要的处理进程, 使用传入的service-handler进行数据处理
最终启动nimbus server.
 

Nimbus$Iface

Nimbus server已经启动, 剩下就是处理从client传来的RPC调用, 关键就是Nimbus$Iface的实现

在下面的实现中总是用到nimbus这个变量, nimbus-data, 用于存放nimbus相关配置和全局的参数

let [nimbus (nimbus-data conf inimbus)]
(defn nimbus-data [conf inimbus]
  (let [forced-scheduler (.getForcedScheduler inimbus)]
    {:conf conf
     :inimbus inimbus
     :submitted-count (atom 0) ;记录多少topology被submit
     :storm-cluster-state (cluster/mk-storm-cluster-state conf) ;抽象Zookeeper接口(Zookeeper用于存放cluster state)
     :submit-lock (Object.) ;创建锁对象,用于各个topology之间的互斥操作, 比如建目录
     :heartbeats-cache (atom {}) ;记录各个Topology的heartbeats的cache
     :downloaders (file-cache-map conf)
     :uploaders (file-cache-map conf)
     :uptime (uptime-computer)
     :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
     :timer (mk-timer :kill-fn (fn [t]
                                 (log-error t "Error when processing event")
                                 (halt-process! 20 "Error when processing an event")
                                 ))
     :scheduler (mk-scheduler conf inimbus)
     }))


接着重点看下submitTopology,
4个参数,
^String storm-name, storm名字
^String uploadedJarLocation, 上传Jar的目录 
^String serializedConf, 序列化过的Conf信息
^StormTopology topology, topology对象(thrift对象), 由topologyBuilder产生

(^void submitTopology
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
        (try
          (validate-topology-name! storm-name) ;;名字起的是否符合规范
          (check-storm-active! nimbus storm-name false) ;;check是否active
          (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) ;;调用用户定义的validator.validate
                     storm-name
                     (from-json serializedConf)
                     topology)
          (swap! (:submitted-count nimbus) inc) ;;submitted-count加1, 表示nimbus上submit的topology的数量
          (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs)) ;;生成storm-id
                storm-conf (normalize-conf  ;;转化成json,增加kv,最终生成storm-conf
                            conf
                            (-> serializedConf
                                from-json
                                (assoc STORM-ID storm-id)
                                (assoc TOPOLOGY-NAME storm-name))
                            topology)
                total-storm-conf (merge conf storm-conf)
                topology (normalize-topology total-storm-conf topology) ;;规范化的topology对象
                topology (if (total-storm-conf TOPOLOGY-OPTIMIZE)
                           (optimize-topology topology)
                           topology)
                storm-cluster-state (:storm-cluster-state nimbus)] ;;操作zk的interface
            (system-topology! total-storm-conf topology) ;; this validates the structure of the topology, 1. System-topology!
            (log-message "Received topology submission for " storm-name " with conf " storm-conf)
            ;; lock protects against multiple topologies being submitted at once and
            ;; cleanup thread killing topology in b/w assignment and starting the topology
            (locking (:submit-lock nimbus)
              (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology) ;;2. 建立topology的本地目录
              (.setup-heartbeats! storm-cluster-state storm-id) ;;3. 建立Zookeeper heartbeats
              (start-storm nimbus storm-name storm-id)  ;;4. start-storm
              (mk-assignments nimbus))) ;;5. mk-assignments

          (catch Throwable e
            (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
            (throw e))))

1. System-topology!

Validate Topology, 比如使用的comonentid, steamid是否合法
添加系统所需要的component, 比如acker等, 不过没有用到, 不知道为什么要调用System-topology!

(system-topology! total-storm-conf topology) ;; this validates the structure of the topology
(defn system-topology! [storm-conf ^StormTopology topology]
  (validate-basic! topology)
  (let [ret (.deepCopy topology)]
    (add-acker! storm-conf ret)
    (add-metric-components! storm-conf ret)    
    (add-system-components! storm-conf ret)
    (add-metric-streams! ret)
    (add-system-streams! ret)
    (validate-structure! ret)
    ret
    ))

2. 建立topology的本地目录 (这步开始需要lock互斥)

Jars and configs are kept on local filesystem because they're too big for Zookeeper. The jar and configs are copied into the path {nimbus local dir}/stormdist/{topology id}

(setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
借用这张图, 比较清晰, 先创建目录, 并将Jar move到当前目录
再将topology对象和conf对象都序列化保存到目录中

image 

3. 建立Zookeeper heartbeats

就是按照下面图示在Zookeeper建立topology的心跳目录

(.setup-heartbeats! storm-cluster-state storm-id)
 
(setup-heartbeats! [this storm-id]
  (mkdirs cluster-state (workerbeat-storm-root storm-id)))

(defn mkdirs [^CuratorFramework zk ^String path]
  (let [path (normalize-path path)]
    (when-not (or (= path "/") (exists-node? zk path false))
      (mkdirs zk (parent-path path))
      (try-cause
        (create-node zk path (barr 7) :persistent)
        (catch KeeperException$NodeExistsException e
          ;; this can happen when multiple clients doing mkdir at same time
          ))
      )))
image

4. start-storm, 产生StormBase

虽然叫做start-storm, 其实做的事情只是把StormBase结构序列化并放到zookeeper上
这个StormBase和topology对象有什么区别,
topology对象, topology的静态信息, 包含components的详细信息和之间的拓扑关系, 内容比较多所以存储在磁盘上stormcode.ser
而StormBase, topology的动态信息, 只记录了launch时间, status, worker数, component的executor数运行态数据, 比较mini, 所以放在zk上

(defn- start-storm [nimbus storm-name storm-id]
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        conf (:conf nimbus)
        storm-conf (read-storm-conf conf storm-id)
        topology (system-topology! storm-conf (read-storm-topology conf storm-id))
        num-executors (->> (all-components topology) (map-val num-start-executors))]
    (log-message "Activating " storm-name ": " storm-id)
    (.activate-storm! storm-cluster-state
                      storm-id
                      (StormBase. storm-name
                                  (current-time-secs)
                                  {:type :active}
                                  (storm-conf TOPOLOGY-WORKERS)
                                  num-executors))))

;; component->executors is a map from spout/bolt id to number of executors for that component
(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors])
 
struct ComponentCommon {
  1: required map<GlobalStreamId, Grouping> inputs;
  2: required map<string, StreamInfo> streams; //key is stream id
  3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component
  4: optional string json_conf;
}

重上面可以看出StormBase是定义的一个record, 包含storm-name, 当前时间戳, topology的初始状态(active或inactive), worker数目, 和executor的数目
其中计算num-executors, 使用->>, 其实等于(map-val num-start-executors (all-components topology)), map-value就是对(k,v)中的value执行num-start-executors, 而这个函数其实就是去读ComponentCommon里面的parallelism_hint, 所以num-executors, 描述每个component需要几个executors(线程)

(activate-storm! [this storm-id storm-base]
  (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base))
  )
(defn storm-path [id]
  (str STORMS-SUBTREE "/" id)) ;/storms/id
 
(defn set-data [^CuratorFramework zk ^String path ^bytes data]
  (.. zk (setData) (forPath (normalize-path path) data)))

最终调用activate-storm!将storm-base序列化后的数据存到Zookeeper的"/storms/id”目录下
image

 

5. mk-assignments

Storm-源码分析-Topology Submit-Nimbus-mk-assignments


转自:http://www.cnblogs.com/fxjwind/p/3144256
2019-03-02 23:51

知识点

相关教程

更多

Storm-源码分析-Topology Submit-Nimbus-mk-assignments

什么是"mk-assignment”, 主要就是产生executor->node+port关系, 将executor分配到哪个node的哪个slot上(port代表slot, 一个slot可以run一个worker进程, 一个worker包含多个executor线程) 先搞清什么是executor, 参考Storm-源码分析- Component ,Executor ,Task之间

Storm-源码分析-Topology Submit-Client

1 Storm Client   最开始使用storm命令来启动topology, 如下    storm jar storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.WordCountTopology   这个storm命令是用python实现的, 看看其中的jar函数, 很简单, 调用exec_storm_class, 其中jvm

Storm-源码分析-Topology Submit-Task

mk-task, 比较简单, 因为task只是概念上的结构, 不象其他worker, executor都需要创建进程或线程  所以其核心其实就是mk-task-data,  1. 创建TopologyContext对象, 其实就是把之前的topology对象和worker-data混合到一起, 便于task在执行时可以取到需要的topology信息.  2. 创建task-object, spou

Storm-源码分析-Topology Submit-Executor

在worker中通过executor/mk-executor worker e, 创建每个executor    (defn mk-executor [worker executor-id]  (let [executor-data (mk-executor-data worker executor-id) ;;1.mk-executor-data         _ (log-message &

Storm-源码分析-Topology Submit-Worker

1 mk-worker   和其他的daemon一样, 都是通过defserverfn macro来创建worker    (defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]  (log-message "Launching worker for " stor

Storm-源码分析-Topology Submit-Supervisor

mk-supervisor    (defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]  (log-message "Starting Supervisor with conf " conf)  (.prepare isupervisor conf (supervisor-isuper

Storm-源码分析-Topology Submit-Executor-mk-threads

对于executor thread是整个storm最为核心的代码, 因为在这个thread里面真正完成了大部分工作, 而其他的如supervisor,worker都是封装调用. 对于executor的mk-threads, 是通过mutilmethods对spout和bolt分别定义不同的逻辑 1. Spout Thread    (defmethod mk-threads :spout [exe

Storm-源码分析汇总

Storm Features  Storm 简介 Storm Topology的并发度 Storm - Guaranteeing message processing Storm - Transactional-topologies Twitter Storm – DRPC Storm 多语言支持 Storm Starter  Storm starter - Overview Storm star

Storm-源码分析- Thrift的使用

1 IDL  首先是storm.thrift, 作为IDL里面定义了用到的数据结构和service  然后backtype.storm.generated, 存放从IDL通过Thrift自动转化成的Java代码 比如对于nimbus service  在IDL的定义为,     service Nimbus {  void submitTopology(1: string name, 2: str

storm-0.8.2源码分析之topology启动

topology启动  一个topology的启动包括了三个步骤 1)创建TopologyBuilder,设置输入源,输出源 2)获取config 3)提交topology(这里不考虑LocalCluster本地模式) 以storm.starter的ExclamationTopology为例: public static void main(String[] args)throws Excepti

twitter storm源码走读之3--topology提交过程分析

概要  storm cluster可以想像成为一个工厂,nimbus主要负责从外部接收订单和任务分配。除了从外部接单,nimbus还要将这些外部订单转换成为内部工作分配,这个时候nimbus充当了调度室的角色。supervisor作为中层干部,职责就是生产车间的主任,他的日常工作就是时刻等待着调度到给他下达新的工作。作为车间主任,supervisor领到的活是不用自己亲力亲为去作的,他手下有着一班

Storm-源码分析- bolt (backtype.storm.task)

Bolt关键的接口为execute,  Tuple的真正处理逻辑, 通过OutputCollector.emit发出新的tuples, 调用ack或fail处理的tuple    /** * An IBolt represents a component that takes tuples as input and produces tuples * as output. An IBolt can

Storm源码浅析之topology的提交

最近一直在读twitter开源的这个分布式流计算框架——storm的源码,还是有必要记录下一些比较有意思的地方。我按照storm的主要概念进行组织,并且只分析我关注的东西,因此称之为浅析。   一、介绍  Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:      180  textfiles.   177  uniquefil

[zz]Storm源码浅析之topology的提交

原文:http://www.blogjava.net/killme2008/archive/2011/11/17/364112.html 作者:dennis (killme2008@gmail.com) 转载请注明出处。 最近一直在读twitter开源的这个分布式流计算框架——storm的源码,还是有必要记录下一些比较有意思的地方。我按照storm的主要概念进行组织,并且只分析我关注的东西,因此称

Storm-源码分析- metric

首先定义一系列metric相关的interface, IMetric, IReducer, ICombiner (backtype.storm.metric.api) 在task中, 创建一系列builtin-metrics, (backtype.storm.daemon.builtin-metrics), 并注册到topology context里面 task会不断的利用如spout-acked

最新教程

更多

java线程状态详解(6种)

java线程类为:java.lang.Thread,其实现java.lang.Runnable接口。 线程在运行过程中有6种状态,分别如下: NEW:初始状态,线程被构建,但是还没有调用start()方法 RUNNABLE:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行状态” BLOCK:阻塞状态,表示线程阻塞

redis从库只读设置-redis集群管理

默认情况下redis数据库充当slave角色时是只读的不能进行写操作,如果写入,会提示以下错误:READONLY You can't write against a read only slave.  127.0.0.1:6382> set k3 111  (error) READONLY You can't write against a read only slave. 如果你要开启从库

Netty环境配置

netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。

Netty基于流的传输处理

​在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据

Netty入门实例-使用POJO代替ByteBuf

使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。

Netty入门实例-时间服务器

Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现

Netty入门实例-编写服务器端程序

channelRead()处理程序方法实现如下

Netty开发环境配置

最新版本的Netty 4.x和JDK 1.6及更高版本

电商平台数据库设计

电商平台数据库表设计:商品分类表、商品信息表、品牌表、商品属性表、商品属性扩展表、规格表、规格扩展表

HttpClient 上传文件

我们使用MultipartEntityBuilder创建一个HttpEntity。 当创建构建器时,添加一个二进制体 - 包含将要上传的文件以及一个文本正文。 接下来,使用RequestBuilder创建一个HTTP请求,并分配先前创建的HttpEntity。

MongoDB常用命令

查看当前使用的数据库    > db    test  切换数据库   > use foobar    switched to db foobar  插入文档    > post={"title":"领悟书生","content":"这是一个分享教程的网站","date":new

快速了解MongoDB【基本概念与体系结构】

什么是MongoDB MongoDB is a general purpose, document-based, distributed database built for modern application developers and for the cloud era. MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。

windows系统安装MongoDB

安装 下载MongoDB的安装包:mongodb-win32-x86_64-2008plus-ssl-3.2.10-signed.msi,按照提示步骤安装即可。 安装完成后,软件会安装在C:\Program Files\MongoDB 目录中 我们要启动的服务程序就是C:\Program Files\MongoDB\Server\3.2\bin目录下的mongod.exe,为了方便我们每次启动,我

Spring boot整合MyBatis-Plus 之二:增删改查

基于上一篇springboot整合MyBatis-Plus之后,实现简单的增删改查 创建实体类 添加表注解TableName和主键注解TableId import com.baomidou.mybatisplus.annotations.TableId;
import com.baomidou.mybatisplus.annotations.TableName;
import com.baom

分布式ID生成器【snowflake雪花算法】

基于snowflake雪花算法分布式ID生成器 snowflake雪花算法分布式ID生成器几大特点: 41bit的时间戳可以支持该算法使用到2082年 10bit的工作机器id可以支持1024台机器 序列号支持1毫秒产生4096个自增序列id 整体上按照时间自增排序 整个分布式系统内不会产生ID碰撞 每秒能够产生26万ID左右 Twitter的 Snowflake分布式ID生成器的JAVA实现方案