getting start with storm 翻译 第二章 part-1
转载请注明出处:http://blog.csdn.net/lonelytrooper/article/details/9960723
第二章 开始
在本章中,我们会建一个storm工程和我们的第一个storm topology。
下述假设你安装了至少1.6版本的Java运行时环境(JRE)。我们推荐使用oracle提供的JRE,可以在这里找到http://www.java.com/downloads/。
操作模式
在我们开始之前,理解storm的操作模式很重要。有两种方式运行storm。
本地模式
在本地模式中,storm topologies运行在本地机器一个单独的JVM中。由于是最简单的查看所有的topology组件一起工作的模式,这种方式被用来开发,测试和调试。在这种模式下,我们可以调整参数,这使得我们可以看到我们的topology在不同的storm配置环境下是怎么运行的。为了以本地模式运行topologies,我们需要下载storm的开发依赖包,其中包含了我们开发和测试topology所需的所有东西。
当我们建立自己的第一个storm工程的时候我们很快就可以看到是怎么回事了。
在本地模式运行topology与在storm集群中运行它是类似的。然而,确保所有的组件线程安全是重要的,因为当它们被部署到远程模式中时,它们可能运行在不同的JVM中或者在不同的物理机器上,这样的话,它们之间没有直接的交流或者内存共享。
本章的所有示例,我们都以本地模式运行。
远程模式
在远程模式中,我们提交topology到storm集群,该集群由许多进程组成,通常运行在不同的机器上。远程模式不显示调试信息,这也是它被认为是生产模式的原因。然而,在一台单独的开发机器上建立storm集群是可能的,并且它被认为是在部署至生产前的一个好方法,这可以确保在生产环境中运行topology时没有任何问题。
你在第六章中可以了解到更多关于远程模式的内容,我会在附录B里展示怎样安装一个集群。
Hello World Storm
在这个工程中,我们会建立一个简单的topology来为单词计数。我们可以把这个工程
认为是storm topologies的“hello world”。然而,它是一个非常强大的topology,因为
它只需要做一些小的改动便可以扩展到几乎无限规模并且,我们甚至可以用它来做一个统计系统。例如,我们可以修改这个项目来找出Twitter上的话题趋势。
为了建立这个topology,我们将使用一个spout来负责读取单词,第一个bolt来标准化单词,
第二个bolt来为单词计数,正如我们在图2-1中看到的那样。
图2-1 开始topology
你可以在https://github.com/storm-book/examples-ch02-getting_started/zipball/master
下载示例源代码的ZIP文件。
如果你使用git(一个分布式的校正控制及源代码管理工具),你可以在你想
要下载的源代码的目录中运行git clone git@github.com:storm-book/examplesch02-
getting_started.git。
验证Java安装
安装环境的第一步是验证你正在运行的java的版本。打开一个终端窗口,运行命令java –version。我们可以看到如下类似的信息:
java -version
java version "1.6.0_26"
Java(TM) SE Runtime Environment(build 1.6.0_26-b03)
Java HotSpot(TM) Server VM (build 20.1-b02,mixed mode)
如果没有,验证下你的java安装。(见http://www.java.com/download/.)
创建工程
为开始这个工程,先建立一个用来存放应用的文件夹(就像对任何的java应用一样)。该文件夹包含工程的源代码。
接着我们需要下载storm的依赖包:一个我们将添加到应用类路径的jar包的集合。你可以用两种方式中的一种做这件事:
﹒下载依赖包,解压,添加到类路径。
﹒使用ApacheMaven
maven是一套软件工程管理的工具。它可以被用来管理软件开发周期中的多个方面,从依赖到发布构建过程。在本书中我们会广泛的使用它。为验证是否已安装了maven,运行命令mvn。如果没有,可以从http://maven.apache.org/download.html下载。尽管使用storm没有必要成为一个maven专家,但是知道maven是怎样工作的基础知识是有帮助的。你可以找到更多信息在Apache Maven的网站(http://maven.apache.org/)。
为了定义工程的结构,我们需要建立一个pom.xml(工程对象模型)文件,该文件描述依赖,包,源码等。我们将使用依赖包及nathanmarz建立的maven库(https://github.com/nathan
marz/)。这些依赖可以在这里找到https://github.com/nathanmarz/storm/wiki/Maven。
storm的maven依赖包引用了在本地模式运行storm所需的所有库函数。
使用这些依赖包,我们可以写一个包含运行topology基本的必要组件的pom.xml文件:
<projectxmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>storm.book</groupId>
<artifactId>Getting-Started</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<compilerVersion>1.6</compilerVersion>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<!-- Repository where we can found the storm dependencies -->
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
<dependencies>
<!-- Storm Dependency -->
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.6.0</version>
</dependency>
</dependencies>
</project>
前几行指定了工程的名字和版本。然后我们添加了一个编译器插件,该插件告诉maven我们的代码应该用Java1.6编译。接下来我们定义库(maven支持同一工程的多个库)。Clojars是storm依赖包所在的库。Maven会自动下载本地模式运行storm所需的所有子依赖包。
应用有如下的结构,典型的maven java工程:
Java下的文件夹包含我们的源代码并且我们会将我们的单词文件放到resources文件夹中来处理。
mkdir –p 建立所有所需的父目录。
建立我们第一个Topology
为建立我们第一个topology,我们要创建运行单词计数的所有的类。或许示例的一些部分在目前阶段不是很清晰,我们将在后边的章节中解释它们。
Spout
WordReader spout是实现了IRichSpout接口的类。我们在第四章会看到更多的细节。WordReader负责读文件并且将每行提供给一个bolt。
一个spout发射一个定义的域的列表。这个架构允许你有多种bolt读取相同的spout流,然后这些bolt定义域供其他的bolt消费等等。
示例2-1包含了这个类的完整代码(我们在示例后分析代码的每个部分)。
Example 2-1.src/main/java/spouts/WordReader.java
package spouts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
importjava.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReaderimplementsIRichSpout{
private SpoutOutputCollector collector;
private FileReader fileReader;
private booleancompleted=false;
private TopologyContext context;
public booleanisDistributed(){returnfalse;}
public voidack(Object msgId) {
System.out.println("OK:"+msgId);
}
public voidclose(){}
public voidfail(Object msgId) {
System.out.println("FAIL:"+msgId);
}
/**
* The only thing that the methods will do It is emit each
* file line
*/
public voidnextTuple() {
/**
* The nextuple it is called forever, so if we have beenreaded the file
* we will wait and then return
*/
if(completed){
try {
Thread.sleep(1000);
} catch(InterruptedExceptione) {
//Do nothing
}
return;
}
String str;
//Open the reader
BufferedReader reader =newBufferedReader(fileReader);
try{
//Read all lines
while((str=reader.readLine())!=null){
/**
* By each line emmit a new value with the line as a their
*/
this.collector.emit(newValues(str),str);
}
}catch(Exception e){
throw new RuntimeException("Errorreading tuple",e);
}finally{
completed = true;
}
}
/**
* We will create the file and get the collector object
*/
public voidopen(Map conf,TopologyContextcontext,
SpoutOutputCollector collector) {
try {
this.context=context;
this.fileReader=newFileReader(conf.get("wordsFile").toString());
} catch(FileNotFoundExceptione) {
throw new RuntimeException("Errorreading file
["+conf.get("wordFile")+"]");
}
this.collector=collector;
}
/**
* Declare the output field "word"
*/
public voiddeclareOutputFields(OutputFieldsDeclarerdeclarer) {
declarer.declare(newFields("line"));
}
}
在任何spout中都调用的第一个方法是void open(Map conf, TopologyContext
context, SpoutOutputCollector collector)。方法的参数是TopologyContext,它包含了所有的topology数据;conf对象,它在topology定义的时候被创建;SpoutOutputCollector,它使得我们可以发射将被bolt处理的数据。下面的代码是open方法的实现:
public voidopen(Map conf,TopologyContext context,
SpoutOutputCollector collector) {
try {
this.context=context;
this.fileReader=newFileReader(conf.get("wordsFile").toString());
} catch(FileNotFoundException e) {
throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
}
this.collector=collector;
}
在这个方法中,我们也创建了reader,它负责读文件。接着我们需要实现public void nextTuple(),在这个方法里我们发射将被bolt处理的值。在我们的例子中,这个方法读文件并且每行发射一个值。
public voidnextTuple(){
if(completed){
try {
Thread.sleep(1);
} catch(InterruptedException e) {
//Do nothing
}
return;
}
String str;
BufferedReader reader =newBufferedReader(fileReader);
try{
while((str=reader.readLine())!=null){
this.collector.emit(newValues(str));
}
}catch(Exception e){
throw new RuntimeException("Errorreading tuple",e);
}finally{
completed =true;
}
}
Values是ArrayList的一个实现,其中把list的元素传到了构造方法中。
nextTuple()方法在相同的循环中被周期性的调用,正如ack()和fail()方法。当没有工作要做时,必须释放对线程的控制这样其他的方法有机会被调用。所以nextTuple方法的第一行是检查处理是否完成了。如果已经完成,在返回前它会休眠至少一毫秒来降低处理器的负载。如果有工作要做,那么文件的每一行被读取为一个值并且发射。
元组(Tuple)是一个值的命名列表,它可以是任何类型的java对象(只要这个对象是可序列化的)。Storm在缺省的情况下可以序列化常用的类型例如strings,bytearrays,ArrayList,HashMap和HashSet。
转自:http://blog.csdn.net/u011689674/article/details/9960723
知识点
相关教程
更多getting start with storm 翻译 第八章 part-2
转载请注明出处:http://blog.csdn.net/lonelytrooper/article/details/12435641 The Bolts 首先我们看一下该topology中的标准bolts: public class UserSplitterBoltimplementsIBasicBolt{ private static final longserialVersionUID=1
很好的翻译教程
仅作保存 ◎ 好东西应该和众人共享,要谢谢本教程的提供者hyde2457,so_so正受益中! 翻译教程 (Download from: http://readfree.net/bbs/htm_data/27/0507/78870.html) (see from: http://bbs.translators.com.cn/mtsbbs/topic.asp?TOPIC_ID=33750, too
中口翻译
Etiquette to society is what apparel is to the individual. Without apparel men would go in shameful nudity which would surely lead to the corruption of morals; and without etiquette society would be i
Storm入门之第二章
1、准备开始 本章创建一个Storm工程和第一个Storm拓扑结构。 需要提供JER版本在1.6以上,下载地址http://www.java .com/downloads/。 2、操作模式 Storm的操作模式,有两种方式: 本地模式 在本地模式下,Storm拓扑结构运行在本地计算机的单一JVM进程上,这个模式用于开发、测试以及调试,因为这是观察所有组件如何协同工作的最简单方法。在这种模式下
第二章 Spring MVC入门 —— 跟开涛学SpringMVC
Spring Web MVC是一种基于Java的实现了Web MVC设计模式的请求驱动类型的轻量级Web框架,即使用了MVC架构模式的思想,将web层进行职责解耦,基于请求驱动指的就是使用请求-响应模型,框架的目的就是帮助我们简化开发,Spring Web MVC也是要简化我们日常Web开发的。
[翻译][Trident] Storm Trident 教程
英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个以realtime 计算为目标的高度抽象。 它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询和有状态流式处理的能力。 如果你对Pig和Cascading这种高级批量处理工具很了解的话,那么
《Spring Security 3》 【第二章】 Spring Security起步(2)
尽管Spring Security的配置可能会很难,但是它的作者是相当为我们着想的,因为他们为我们提供了一种简单的机制来使用它很多的功能并可以此作为起点。以这个为起点,额外的配置能够实现应用的分层次详细的安全控制。
[翻译]大数据处理的趋势-五种开源技术介绍
作者:杨鑫奇 本篇文章是一篇翻译文章,对未来大数据领域的技术进行一些前瞻性的介绍,个人感觉他写的文章还是很好的,推荐的技术也具有的一定的代表性,遂将本篇文章翻译出来,感兴趣的大家能够看看。 大数据领域的处理,我自己本身接触的时间也不长,正式的项目还在开发之中,深受大数据处理方面的吸引,所以也就有写文章的想法的了。 原文链接: http://techcrunch.com/2012/10/27/big
How to Start a Business in 10 Days
With an executive staffing venture about to open, a business loan from the in-laws gnawing at her conscience and a new baby to care for, Michelle Fish was already feeling the pressure. But what really
毛荣贵:英汉翻译的美学视角
英汉翻译的美学视角 毛荣贵 (2004-04-08) 谢海光 (党委宣传部长): 各位同学、各位老师,大家好。今天我们请来的是上海交通大学外国语学院英语系教授,博士生导师毛荣贵老师来进行主题演讲,并在网上进行交流。我们非常欢迎大家在网上收听、收看、点播这个节目。为了使今天的节目做得更好,我们外国语学院很多老师都支持这个节目,都做了精心的安排。其中我们特别高兴地向大家介绍今天特邀的嘉宾主持,“交大讲
Storm 【开发细节】 - geting Start with Storm
packagetest;importjava.io.IOException;importjava.util.Map;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importstorm.copyFromClass.TestWordSpout;importcom.esotericsoftware.minlog.Log;importbackt
storm
在http://www.cnblogs.com/panfeng412/archive/2012/11/30/how-to-install-and-deploy-storm-cluster.html Storm集群安装部署步骤【详细版】 作者:大圆那些事| 文章可以转载,请以超链接形式标明文章原始出处和作者信息 网址:http://www.cnblogs.com/panfeng412/archiv
[翻译]Twitter的实时海量数据 处理方案
首发于:我是买家博客 作者:杨鑫奇 对于实时的海量数据处理方案,最近在看hadoop和storm的比较,以及细看了下nathan marz大侠的storm介绍之后,决定深入,在他的博客中发现了一本他写的big data这本书,遂决定深入研究下big data下的各种的principles,就找资料在slideshare上发现了twitter的nk在2010.4.13的Qcon大会的分享。就决定把这
[翻译][Trident] Trident state原理
原文地址:https://github.com/nathanmarz/storm/wiki/Trident-state ----------------------------- Trident在读写有状态的数据源方面是有着一流的抽象封装的。状态即可以保留在topology的内部,比如说内存和HDFS,也可以放到外部存储当中,比如说Memcached或者Cassandra。这些都是使用同一套Tri
科技英语翻译480句 (五) 让步,目的
五、让步,目的(201――240) 201 Though energy can be neither vreated nor destroyed, it can be transformed from one form to another. 虽然能量不能被创造出来也不能被消灭,但它可以从一种形式转变成另一种形式。 202. In the subsequent years there
最新教程
更多java线程状态详解(6种)
java线程类为:java.lang.Thread,其实现java.lang.Runnable接口。 线程在运行过程中有6种状态,分别如下: NEW:初始状态,线程被构建,但是还没有调用start()方法 RUNNABLE:运行状态,Java线程将操作系统中的就绪和运行两种状态统称为“运行状态” BLOCK:阻塞状态,表示线程阻塞
redis从库只读设置-redis集群管理
默认情况下redis数据库充当slave角色时是只读的不能进行写操作,如果写入,会提示以下错误:READONLY You can't write against a read only slave. 127.0.0.1:6382> set k3 111 (error) READONLY You can't write against a read only slave. 如果你要开启从库
Netty环境配置
netty是一个java事件驱动的网络通信框架,也就是一个jar包,只要在项目里引用即可。
Netty基于流的传输处理
在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。 这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧)。 因此,不能保证读的是您在远程定入的行数据
Netty入门实例-使用POJO代替ByteBuf
使用TIME协议的客户端和服务器示例,让它们使用POJO来代替原来的ByteBuf。
Netty入门实例-时间服务器
Netty中服务器和客户端之间最大的和唯一的区别是使用了不同的Bootstrap和Channel实现
Netty入门实例-编写服务器端程序
channelRead()处理程序方法实现如下
Netty开发环境配置
最新版本的Netty 4.x和JDK 1.6及更高版本
电商平台数据库设计
电商平台数据库表设计:商品分类表、商品信息表、品牌表、商品属性表、商品属性扩展表、规格表、规格扩展表
HttpClient 上传文件
我们使用MultipartEntityBuilder创建一个HttpEntity。 当创建构建器时,添加一个二进制体 - 包含将要上传的文件以及一个文本正文。 接下来,使用RequestBuilder创建一个HTTP请求,并分配先前创建的HttpEntity。
MongoDB常用命令
查看当前使用的数据库 > db test 切换数据库 > use foobar switched to db foobar 插入文档 > post={"title":"领悟书生","content":"这是一个分享教程的网站","date":new
快速了解MongoDB【基本概念与体系结构】
什么是MongoDB MongoDB is a general purpose, document-based, distributed database built for modern application developers and for the cloud era. MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。
windows系统安装MongoDB
安装 下载MongoDB的安装包:mongodb-win32-x86_64-2008plus-ssl-3.2.10-signed.msi,按照提示步骤安装即可。 安装完成后,软件会安装在C:\Program Files\MongoDB 目录中 我们要启动的服务程序就是C:\Program Files\MongoDB\Server\3.2\bin目录下的mongod.exe,为了方便我们每次启动,我
Spring boot整合MyBatis-Plus 之二:增删改查
基于上一篇springboot整合MyBatis-Plus之后,实现简单的增删改查 创建实体类 添加表注解TableName和主键注解TableId import com.baomidou.mybatisplus.annotations.TableId; import com.baomidou.mybatisplus.annotations.TableName; import com.baom
分布式ID生成器【snowflake雪花算法】
基于snowflake雪花算法分布式ID生成器 snowflake雪花算法分布式ID生成器几大特点: 41bit的时间戳可以支持该算法使用到2082年 10bit的工作机器id可以支持1024台机器 序列号支持1毫秒产生4096个自增序列id 整体上按照时间自增排序 整个分布式系统内不会产生ID碰撞 每秒能够产生26万ID左右 Twitter的 Snowflake分布式ID生成器的JAVA实现方案