博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm-kafka(storm spout作为kafka的消费端)
阅读量:6511 次
发布时间:2019-06-24

本文共 3462 字,大约阅读时间需要 11 分钟。

storm是grovvy写的

kafka是scala写的

storm-kafka  storm连接kafka consumer的插件

下载地址:

除了需要storm和kafka相关jar包还需要google-collections-1.0.jar

以及zookeeper相关包 curator-framework-1.3.3.jar和curator-client-1.3.3.jar

以前由com.netflix.curator组织开发现在归到org.apache.curator下面

1.Kafka Consumer即Storm Spout代码

package demo;import java.util.ArrayList;import java.util.List;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.generated.AlreadyAliveException;import backtype.storm.generated.InvalidTopologyException;import backtype.storm.spout.SchemeAsMultiScheme;import backtype.storm.topology.TopologyBuilder;import storm.kafka.KafkaSpout;import storm.kafka.SpoutConfig;import storm.kafka.StringScheme;import storm.kafka.ZkHosts;public class MyKafkaSpout {public static void main(String[] args) {        String topic ="track";    ZkHosts zkhosts  = new ZkHosts("192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181");        SpoutConfig spoutConfig = new SpoutConfig(zkhosts, topic,            "/MyKafka", //偏移量offset的根目录            "MyTrack");//子目录对应一个应用        List
 zkServers=new ArrayList
();    //zkServers.add("192.168.1.107");    //zkServers.add("192.168.1.108");    for(String host:zkhosts.brokerZkStr.split(","))    {        zkServers.add(host.split(":")[0]);    }        spoutConfig.zkServers=zkServers;    spoutConfig.zkPort=2181;    spoutConfig.forceFromStart=true;//从头开始消费,实际上是要改成false的    spoutConfig.socketTimeoutMs=60;    spoutConfig.scheme=new SchemeAsMultiScheme(new StringScheme());//定义输出为string类型        TopologyBuilder builder=new TopologyBuilder();    builder.setSpout("spout", new KafkaSpout(spoutConfig),1);//引用spout,并发度设为1    builder.setBolt("bolt1", new MyKafkaBolt(),1).shuffleGrouping("spout");        Config config =new Config();    config.setDebug(true);//上线之前都要改成false否则日志会非常多    if(args.length>0){                try {            StormSubmitter.submitTopology(args[0], config, builder.createTopology());        } catch (AlreadyAliveException e) {            // TODO Auto-generated catch block            e.printStackTrace();        } catch (InvalidTopologyException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }            }else{                LocalCluster localCluster=new LocalCluster();        localCluster.submitTopology("mytopology", config,  builder.createTopology());        //本地模式在一个进程里面模拟一个storm集群的所有功能    }            }}

2.Bolt代码只是简单打印输出,覆写execute方法即可

package demo;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class MyKafkaBolt implements IBasicBolt {    @Override    public void declareOutputFields(OutputFieldsDeclarer arg0) {        // TODO Auto-generated method stub    }    @Override    public Map
 getComponentConfiguration() {        // TODO Auto-generated method stub        return null;    }    @Override    public void cleanup() {        // TODO Auto-generated method stub    }    @Override    public void execute(Tuple input, BasicOutputCollector arg1) {    String kafkaMsg =input.getString(0);    System.err.println("bolt"+kafkaMsg);    }    @Override    public void prepare(Map arg0, TopologyContext arg1) {        // TODO Auto-generated method stub    }}

本文出自 “” 博客,请务必保留此出处

转载地址:http://wgdfo.baihongyu.com/

你可能感兴趣的文章
CAS服务器端集群
查看>>
Android内存泄漏的常见场景及解决方案
查看>>
设计模式 之 访问者模式
查看>>
JAVA Collections框架
查看>>
更改Windwos server 2003 域用户密码策略默认配置
查看>>
进制转换
查看>>
反转字符串中的单词
查看>>
html与html5的一些区别
查看>>
ASCII码
查看>>
java常用四种排序源代码
查看>>
win7 下硬盘安装Redhat7
查看>>
Redis 分布式锁的正确实现方式
查看>>
mysqldump 备份命令使用中的一些经验总结
查看>>
Linux下MySql安装配置方法总结
查看>>
本IT博客用于域名投资、互联网、资源下载等相关干货收藏和学习
查看>>
ArrayList底层实现
查看>>
【转载】Java程序设计入门 (二)
查看>>
which、whereis、location和fand的区别
查看>>
单词最近距离
查看>>
程序猿知道英语词汇
查看>>