1818IP-服务器技术教程,云服务器评测推荐,服务器系统排错处理,环境搭建,攻击防护等

当前位置:首页 - 运维 - 正文

君子好学,自强不息!

Twitter Storm进阶初步设置

2022-11-06 | 运维 | gtxyzz | 550°c
A+ A-

本篇Blog是一个简单的Storm入门例子,目的让读者明白Storm是怎样的运行机制。以及后续会放出的几篇Storm高级特性以及最终将Storm融入Hadoop 2.x的YARN中。目的读者是已经进阶大数据的Hadoop,Spark用户,或者了解Storm想深入理解Storm的读者用户。

项目Pom(Storm jar没有提交到Maven中央仓库,需要在项目中加入下面的仓库地址):

<repositories>
<repository>
<id>central</id>
<name>MavenRepositorySwitchboard</name>
<layout>default</layout>
<url>http://maven.oschina.net/content/groups/public/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>clojars</id>
<url>https://clojars.org/repo/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0.1</version>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>libthrift7</artifactId>
<version>0.7.0</version>
</dependency>
</dependencies>

下面是一个Storm的HelloWord的例子,代码有删减,熟悉Storm的读者自然能把代码组织成一个完整的例子。

publicstaticvoidmain(String[]args){
Configconf=newConfig();
conf.put(Config.STORM_LOCAL_DIR,"/Volumes/Study/data/storm");
conf.put(Config.STORM_CLUSTER_MODE,"local");
//conf.put("storm.local.mode.zmq","false");
conf.put("storm.zookeeper.root","/storm");
conf.put("storm.zookeeper.session.timeout",50000);
conf.put("storm.zookeeper.servers","nowledgedata-n15");
conf.put("storm.zookeeper.port",2181);
//conf.setDebug(true);
//conf.setNumWorkers(2);
TopologyBuilderbuilder=newTopologyBuilder();
builder.setSpout("words",newTestWordSpout(),2);
builder.setBolt("exclaim2",newDefaultStringBolt(),5)
.shuffleGrouping("words");
LocalClustercluster=newLocalCluster();
cluster.submitTopology("test",conf,builder.createTopology());
}

Config.STORM_LOCAL_DIR是配置一个本地路径,Storm会在这个路径写入一些配置信息和临时数据。

Config.STORM_CLUSTER_MODE是运行模式,local和distributed两个选项,即本地模式和分布式模式。本地模式在运行时时多线程模拟的,开发测试用;分布式模式在分布式集群下是多进程的,真正的分布式。

Storm的Spout和Blot高可用是通过ZooKeeper协调的,storm.zookeeper.root是一个ZooKeeper地址,并且有对应的端口号

Debug是测试模式,有更详细的日志信息。

TestWordSpout是一个Storm自带的例子,用来随机的产生new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”};列表中的字符串,用来提供数据源。

其中DefaultStringBolt的源码:

OutputCollectorcollector;
publicvoidprepare(Mapconf,TopologyContextcontext,OutputCollectorcollector){
this.collector=collector;
}
publicvoidexecute(Tupletuple){
log.info("revamessage:"+tuple.getString(0));
collector.emit(tuple,newValues(tuple.getString(0)+"!!!"));
collector.ack(tuple);
}

运行日志:

10658[Thread-29-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:jackson
10658[Thread-31-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:jackson
10758[Thread-26-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:mike
10758[Thread-33-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:nathan
10859[Thread-26-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:nathan
10859[Thread-29-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:bertels
10961[Thread-31-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:jackson
10961[Thread-33-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:jackson
11061[Thread-35-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:nathan
11062[Thread-35-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:nathan
11162[Thread-26-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:bertels
11163[Thread-26-exclaim2]INFOcn.pointways.dstorm.bolt.DefaultStringBolt-revamessage:jackson

数据由一个Storm叫做喷嘴(Spout,也相当一个水龙头,能产生数据的来源端)产生,然后传递给后端一连串的的Blot,最终被转换和消费。而Spout和Blot都是并行的,并行度都可以自己设置(本地运行是靠多线程模拟的)。如:

builder.setSpout("words",newTestWordSpout(),2);
builder.setBolt("exclaim2",newDefaultStringBolt(),5)

喷嘴TestWordSpout的并行度是2,DefaultStringBolt的并行度是5.

从日志可以看出,数据经过喷嘴到达预先定于的一个Blot,打印了日志。我测试代码设置的并行度是5,日志中统计,确实是5个线程:

Thread-29-exclaim2
Thread-31-exclaim2
Thread-26-exclaim2
Thread-33-exclaim2
Thread-35-exclaim2

关于Storm是是什么?这里有详细的介绍。

借用OSC网友的话说,Hadoop就是商场里自动升降式的电梯,用户需要排队等待,选按楼层,然后到达;而Storm就像是自动扶梯,扶梯预先设置好运行后,来人就立即运走,目的地是明确的。

Storm按我的理解,Storm和Hadoop是完全不同的,设计上也没有半点拟合的部分。Storm更像是我之前介绍过的Spring Integration,是一个数据流系统。它能把数据按照预设定的流程,把数据做各种转换,传递,分解,合并,***数据到达后端存储。只不过Storm是可以分布式,而且分布式的能力也是可以自己设置。

Storm的这种特性很适合大数据类的ETL系统开发。

本文来源:1818IP

本文地址:https://www.1818ip.com/post/7348.html

免责声明:本文由用户上传,如有侵权请联系删除!

发表评论

必填

选填

选填

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。