How to compute WordCount with Kafka + Storm + HBase

This article shows how to compute WordCount with Kafka + Storm + HBase. It walks through the Maven setup, the Storm bolts, the topology, and the problems encountered along the way; hopefully it serves as a useful reference.


The goal is to compute WordCount with Kafka + Storm + HBase and store the results in an HBase table with the following layout (a one-line HBase shell command to create the table is sketched after the list):

(1) Table name: wc

(2) Column family: result

(3) RowKey: word

(4) Field: count
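
A minimal way to create this table up front, assuming a working HBase shell, is:

hbase shell
create 'wc', 'result'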

1. Solution:

(1) Step 1: prepare the Kafka, Storm, and HBase jars. The Maven dependencies are as follows:


"http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  4.0.0

  com

  kafkaSpout

  0.0.1-SNAPSHOT

   

    

        

            org.apache.storm

            storm-core

            0.9.3

        

        

            org.apache.storm

            storm-kafka

            0.9.3

        

        

            org.apache.kafka

            kafka_2.10

            0.8.1.1

            

                

                    org.apache.zookeeper

                    zookeeper

                

                

                    log4j

                    log4j

                

            

        

        

            org.apache.hbase

            hbase-client

            0.99.2

            

                

                    org.slf4j

                    slf4j-log4j12

                

                

                    org.apache.zookeeper

                    zookeeper

                

            

        

         

       

 

         com.google.protobuf

 

         protobuf-java

 

         2.5.0

 

        

 

        

            org.apache.curator

            curator-framework

            2.5.0

            

                

                    log4j

                    log4j

                

                

                    org.slf4j

                    slf4j-log4j12

                

            

        

                                                                              

           

            jdk.tools

            jdk.tools

            1.7

            system

            C:\Program Files\Java\jdk1.7.0_51\lib\tools.jar

            

         

    

  

    

        

            central

            http://repo1.maven.org/maven2/

            

                false

            

            

                true

            

        

        

            clojars

            https://clojars.org/repo/

            

                true

            

            

                true

            

        

        

            scala-tools

            http://scala-tools.org/repo-releases

            

                true

            

            

                true

            

        

        

            conjars

            http://conjars.org/repo/

            

                true

            

            

                true

            

        

    

 

    

        

            

                org.apache.maven.plugins

                maven-compiler-plugin

                3.1

                

                    1.6

                    1.6

                    UTF-8

                    true

                    true

                

            

            

                maven-assembly-plugin

                

                    

                        jar-with-dependencies

                    

                    

                        

                            

                        

                    

                

                

                    

                        make-assembly

                        package

                        

                            single

                        

                    

                

            

        

    

 
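
With the assembly plugin configured above, the deployable jar can be built with Maven; the artifact name below simply follows the artifactId/version declared in this POM:

mvn clean package
# produces target/kafkaSpout-0.0.1-SNAPSHOT-jar-with-dependencies.jar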

(2) The messages arriving from Kafka are split into words by the LevelSplit bolt, and each word is emitted to the next bolt. The code is as follows:


package com.kafka.spout;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LevelSplit extends BaseBasicBolt {

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // the incoming tuple holds one line of text, e.g. "the cow jumped over the moon"
        String words = tuple.getString(0);
        String[] va = words.split(" ");
        for (String word : va) {
            // emit one tuple per word
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

(3) The words emitted by the LevelSplit bolt are counted in the LevelCount bolt, and each (word, count) pair is then sent on to the HBase bolt. The code is as follows:


package com.kafka.spout;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LevelCount extends BaseBasicBolt {
    // running count per word, kept in memory by this bolt instance
    Map<String, Integer> counts = new HashMap<String, Integer>();

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);

        // print the current counts for debugging
        for (Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "----------->" + e.getValue());
        }
        collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

(4) Set up the Kafka and HBase connection details, wire the whole topology together, and submit it. The code is as follows:


package com.kafka.spout;

import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Maps;

//import org.apache.storm.guava.collect.Maps;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
// HBaseBolt and SimpleHBaseMapper come from the storm-hbase integration module;
// the original listing omits these imports, so the package names below are assumed.
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;

public class StormKafkaTopo {
    public static void main(String[] args) {

        // Kafka spout: ZooKeeper hosts, topic "yjd", ZK root "/storm", spout id "kafkaspout"
        BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd", "/storm", "kafkaspout");
        Config conf = new Config();
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        // map the "word" field to the RowKey and the "count" field to the "result" column family
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withColumnFamily("result");
        mapper.withColumnFields(new Fields("count"));
        mapper.withRowKeyField("word");

        Map<String, Object> map = Maps.newTreeMap();
        map.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
        map.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");

        // hbase-bolt writing into table "wc"
        HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");

        conf.setDebug(true);
        conf.put("hbase.conf", map);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count");

        if (args != null && args.length > 0) {
            // submit to the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // run in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topotest1121", conf, builder.createTopology());
            Utils.sleep(1000000);
            cluster.killTopology("Topotest1121");
            cluster.shutdown();
        }
    }
}
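
The topology above references a MessageScheme class that the original article never lists. Below is a minimal sketch of what it might look like, assuming it simply decodes each raw Kafka message as a UTF-8 string and emits it as a single-field tuple (the field name "msg" is a placeholder; LevelSplit reads the tuple by position, so any name works):

package com.kafka.spout;

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Hypothetical sketch of the MessageScheme used by StormKafkaTopo:
// it turns each raw Kafka message (byte[]) into a one-field tuple.
public class MessageScheme implements Scheme {

    public List<Object> deserialize(byte[] ser) {
        try {
            // decode the Kafka payload as UTF-8 text, e.g. "the cow jumped over the moon"
            String msg = new String(ser, "UTF-8");
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public Fields getOutputFields() {
        return new Fields("msg");
    }
}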

(5) Produce data on the Kafka side with the console producer.
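
For example, something along these lines (the broker host and port are assumptions; the topic must match the "yjd" topic configured in the spout):

bin/kafka-console-producer.sh --broker-list zeb:9092 --topic yjd
the cow jumped over the moon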


2. Results of the run:
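
The per-word counts end up in the wc table and can be read back from the HBase shell (table and column family as defined at the top):

hbase shell
scan 'wc'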


3. Problems encountered:

(1) With everything in place, the topology was submitted and the code run, at which point a first error occurred.


Fix: the dependency versions needed to be consistent; once the versions were aligned, the error was resolved.
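
One way to track down such conflicts is Maven's dependency report, which prints the resolved dependency tree and flags the versions that were omitted because of conflicts:

mvn dependency:tree -Dverbose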

(2) A second error then occurred.


Fix: HBase's HMaster and HRegionServer had not been started. Starting them resolved the problem.
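
On a standard installation they can be started with the scripts under $HBASE_HOME/bin (paths assume the usual HBase layout):

$HBASE_HOME/bin/start-hbase.sh
jps   # should now list HMaster and HRegionServer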

Thanks for reading; hopefully this walkthrough of computing WordCount with Kafka + Storm + HBase has been helpful.

