Storm Topology的并发度

Understanding the parallelism of a Storm topology

https://github.com/nathanmarz/storm/wiki/Understanding-the-parallelism-of-a-Storm-topology

 

概念

一个Topology可以包含一个或多个worker(并行的跑在不同的machine上), 所以worker process就是执行一个topology的子集, 并且worker只能对应于一个topology

一个worker可用包含一个或多个executor, 每个component (spout或bolt)至少对应于一个executor, 所以可以说executor执行一个compenent的子集, 同时一个executor只能对应于一个component

Task就是具体的处理逻辑对象, 一个executor线程可以执行一个或多个tasks
但一般默认每个executor只执行一个task, 所以我们往往认为task就是执行线程, 其实不然
task代表最大并发度, 一个component的task数是不会改变的, 但是一个componet的executer数目是会发生变化的
当task数大于executor数时, executor数代表实际并发数

A worker process executes a subset of a topology.
A worker process belongs to a specific topology and may run one or more executors for one or more components (spouts or bolts) of this topology.
A running topology consists of many such processes running on many machines within a Storm cluster.

An executor is a thread that is spawned by a worker process. It may run one or more tasks for the same component (spout or bolt).

A task performs the actual data processing — each spout or bolt that you implement in your code executes as many tasks across the cluster.
The number of tasks for a component is always the same throughout the lifetime of a topology, but the number of executors (threads) for a component can change over time. This means that the following condition holds true: #threads ≤ #tasks.
By default, the number of tasks is set to be the same as the number of executors, i.e. Storm will run one task per thread.

image

 

Configuring the parallelism of a topology, 并发度的配置

The following sections give an overview of the various configuration options and how to set them in your code. There is more than one way of setting these options though, and the table lists only some of them.

Storm currently has the following order of precedence for configuration settings:

defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration

 

对于并发度的配置, 在storm里面可以在多个地方进行配置, 优先级如上面所示...
具体包含,

worker processes的数目, 可以通过配置文件和代码中配置, worker就是执行进程, 所以考虑并发的效果, 数目至少应该大于machines的数目

executor的数目, component的并发线程数,只能在代码中配置(通过setBolt和setSpout的参数), 例如, setBolt("green-bolt", new GreenBolt(), 2)

tasks的数目, 可以不配置, 默认和executor1:1, 也可以通过setNumTasks()配置

Number of worker processes

  • Description: How many worker processes to create for the topology across machines in the cluster.
  • Configuration option: TOPOLOGY_WORKERS
  • How to set in your code (examples):

Number of executors (threads)

  • Description: How many executors to spawn per component.
  • Configuration option: ?
  • How to set in your code (examples):

Number of tasks

Here is an example code snippet to show these settings in practice:

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout);

In the above code we configured Storm to run the bolt GreenBolt with an initial number of two executors and four associated tasks. Storm will run two tasks per executor (thread). If you do not explicitly configure the number of tasks, Storm will run by default one task per executor.

 

Example of a running topology

The following illustration shows how a simple topology would look like in operation.
The topology consists of three components: one spout called BlueSpout and two bolts called GreenBolt and YellowBolt.
The components are linked such that BlueSpout sends its output to GreenBolt, which in turns sends its own output to YellowBolt.

image

 

Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) 
               .setNumTasks(4)                   //set tasks number to 4
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology()
    );

图和代码, 很清晰, 通过setBolt和setSpout一共定义2+2+6=10个executor threads
并且同setNumWorkers设置2个workers, 所以storm会平均在每个worker上run 5个executors
而对于green-bolt, 定义了4个tasks, 所以每个executor中有2个tasks

 

How to change the parallelism of a running topology, 动态的改变并发度

Storm支持在不restart topology的情况下, 动态的改变(增减)worker processes的数目和executors的数目, 称为rebalancing.
通过Storm web UI, 或者通过storm rebalance命令, 见下面的例子

A nifty feature of Storm is that you can increase or decrease the number of worker processes and/or executors without being required to restart the cluster or the topology. The act of doing so is called rebalancing.

You have two options to rebalance a topology:

  1. Use the Storm web UI to rebalance the topology.
  2. Use the CLI tool storm rebalance as described below.

Here is an example of using the CLI tool:

# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.

$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

转自:http://www.cnblogs.com/fxjwind/archive/2013/05/04/3059514
2019-03-02 23:51

知识点

相关教程

更多

顶 Storm 【技术文档】- 拓扑并发度

有关Executor,worker,Tasks 的关系,请首先参考另外的一篇博文 1 Storm是按照下面三种主要的部分来区分Storm集群之中一个世纪运行的拓扑的: 1 : worker 2:Executor (线程) 3:tasks 下图简单的说明了他们的关系~ ,好的,这是一张重复发表的结构图: 对于上文而言,一个Task,也就是 : 一个Executor 是一个worker 进程生成的一个

storm

引用:http://baike.baidu.com/view/1012011.htm#4 Twitter将Storm正式开源了,这是一个分布式的、容错的实时计算系统,它被托管在GitHub上,遵循 Eclipse Public License 1.0。Storm是由BackType开发的实时处理系统,BackType现在已在Twitter麾下。GitHub上的最新版本是Storm 0.5.2,基本

storm

在http://www.cnblogs.com/panfeng412/archive/2012/11/30/how-to-install-and-deploy-storm-cluster.html Storm集群安装部署步骤【详细版】  作者:大圆那些事| 文章可以转载,请以超链接形式标明文章原始出处和作者信息 网址:http://www.cnblogs.com/panfeng412/archiv

Storm源码浅析之topology的提交

最近一直在读twitter开源的这个分布式流计算框架——storm的源码,还是有必要记录下一些比较有意思的地方。我按照storm的主要概念进行组织,并且只分析我关注的东西,因此称之为浅析。   一、介绍  Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:      180  textfiles.   177  uniquefil

Storm入门准备

storm官网:http://storm-project.net/ 1. 入门的最佳途径是阅读GitHub上的官方《Storm Tutorial》。其中讨论了多种Storm概念和抽象,提供了范例代码以便你可以运行一个Storm Topology。开发过程中,可以用本地模式来运行Storm,这样就能在本地开发,在进程中测试Topology。一切就绪后,以远程模式运行Storm,提交用于在集群中运行的

[zz]Storm源码浅析之topology的提交

原文:http://www.blogjava.net/killme2008/archive/2011/11/17/364112.html 作者:dennis (killme2008@gmail.com) 转载请注明出处。 最近一直在读twitter开源的这个分布式流计算框架——storm的源码,还是有必要记录下一些比较有意思的地方。我按照storm的主要概念进行组织,并且只分析我关注的东西,因此称

Storm 简介

https://github.com/nathanmarz/storm/wiki/Documentation 安装和配置  Storm的安装比较简单, 下载storm的release版本, 解压, 并且把bin/目录加到环境变量PATH里面去, 就ok了. 参考配置storm开发环境  当然为了运行Storm, 需要装一些其他的依赖的包, 可以参考Twitter Storm 安装实战 Storm支

Storm入门

Storm框架入门 1 Topology构成  和同样是计算框架的Mapreduce相比,Mapreduce集群上运行的是Job,而Storm集群上运行的是Topology。但是Job在运行结束之后会自行结束,Topology却只能被手动的kill掉,否则会一直运行下去。 Storm集群中有两种节点,一种是控制节点(Nimbus节点),另一种是工作节点(Supervisor节点)。所有Topolo

(二) storm的基本使用

SimpleTopology.java       View Code       import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.topology.TopologyBuilder;import bac

Storm-源码分析-Topology Submit-Client

1 Storm Client   最开始使用storm命令来启动topology, 如下    storm jar storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.WordCountTopology   这个storm命令是用python实现的, 看看其中的jar函数, 很简单, 调用exec_storm_class, 其中jvm

Akka 对比 Storm

转载请注明出处:http://blog.csdn.net/jmppok/article/details/17267585 原文地址:http://www.warski.org/blog/2013/06/akka-vs-storm/ Akka 对比 Storm 最近在工作中用到Twitter的Storm框架,于是将他与另外一个高性能,数据并行处理框架Akka进行了对比. 1.什么是Akka和Stor

storm Tutorial

文章原始地址http://chenlx.blog.51cto.com/4096635/739531    通过这个入门指南,你将学会如何创建storm拓扑(topology)和部署拓扑到storm集群。主要使用Java语言,少许例子使用Python说明Storm的多语言特性。         准备工作    入门指南使用了  storm-start项目中的例子。建议您克隆此项目并跟随这些例子。阅读

Storm重要概念

This page lists the main concepts of Storm and links to resources where you can find more information. The concepts discussed are:       Topologies   Streams   Spouts   Bolts   Stream groupings   Reli

Storm-源码分析-Topology Submit-Nimbus

Nimbus Server   Nimbus server, 首先从启动命令开始, 同样是使用storm命令"storm nimbus”来启动  看下源码, 此处和上面client不同, jvmtype="-server", 最终调用"backtype.storm.daemon.nimbus"的main  nimbus是用clojure实现的, 但

storm-0.8.2源码分析之topology启动

topology启动  一个topology的启动包括了三个步骤 1)创建TopologyBuilder,设置输入源,输出源 2)获取config 3)提交topology(这里不考虑LocalCluster本地模式) 以storm.starter的ExclamationTopology为例: public static void main(String[] args)throws Excepti

最新教程

更多

java线程状态详解(6种)

java线程类为:java.lang.Thread,其实现java.lang.Runnable接口。 线程在运行过程中有6种状态,分别如下: NEW:初始状态,线程被构建,但是还没有调用start()方法 RUNNABLE:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行状态” BLOCK:阻塞状态,表示线程阻塞

redis从库只读设置-redis集群管理

默认情况下redis数据库充当slave角色时是只读的不能进行写操作,如果写入,会提示以下错误:READONLY You can't write against a read only slave.  127.0.0.1:6382> set k3 111  (error) READONLY You can't write against a read only slave. 如果你要开启从库

Netty环境配置

netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。

Netty基于流的传输处理

​在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据

Netty入门实例-使用POJO代替ByteBuf

使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。

Netty入门实例-时间服务器

Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现

Netty入门实例-编写服务器端程序

channelRead()处理程序方法实现如下

Netty开发环境配置

最新版本的Netty 4.x和JDK 1.6及更高版本

电商平台数据库设计

电商平台数据库表设计:商品分类表、商品信息表、品牌表、商品属性表、商品属性扩展表、规格表、规格扩展表

HttpClient 上传文件

我们使用MultipartEntityBuilder创建一个HttpEntity。 当创建构建器时,添加一个二进制体 - 包含将要上传的文件以及一个文本正文。 接下来,使用RequestBuilder创建一个HTTP请求,并分配先前创建的HttpEntity。

MongoDB常用命令

查看当前使用的数据库    > db    test  切换数据库   > use foobar    switched to db foobar  插入文档    > post={"title":"领悟书生","content":"这是一个分享教程的网站","date":new

快速了解MongoDB【基本概念与体系结构】

什么是MongoDB MongoDB is a general purpose, document-based, distributed database built for modern application developers and for the cloud era. MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。

windows系统安装MongoDB

安装 下载MongoDB的安装包:mongodb-win32-x86_64-2008plus-ssl-3.2.10-signed.msi,按照提示步骤安装即可。 安装完成后,软件会安装在C:\Program Files\MongoDB 目录中 我们要启动的服务程序就是C:\Program Files\MongoDB\Server\3.2\bin目录下的mongod.exe,为了方便我们每次启动,我

Spring boot整合MyBatis-Plus 之二:增删改查

基于上一篇springboot整合MyBatis-Plus之后,实现简单的增删改查 创建实体类 添加表注解TableName和主键注解TableId import com.baomidou.mybatisplus.annotations.TableId;
import com.baomidou.mybatisplus.annotations.TableName;
import com.baom

分布式ID生成器【snowflake雪花算法】

基于snowflake雪花算法分布式ID生成器 snowflake雪花算法分布式ID生成器几大特点: 41bit的时间戳可以支持该算法使用到2082年 10bit的工作机器id可以支持1024台机器 序列号支持1毫秒产生4096个自增序列id 整体上按照时间自增排序 整个分布式系统内不会产生ID碰撞 每秒能够产生26万ID左右 Twitter的 Snowflake分布式ID生成器的JAVA实现方案