博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于Hadoop生态SparkStreaming的大数据实时流处理平台的搭建
阅读量:6452 次
发布时间:2019-06-23

本文共 11664 字,大约阅读时间需要 38 分钟。

随着公司业务发展,对的获取和实时处理的要求就会越来越高,日志处理、用户行为分析、场景业务分析等等,传统的写日志方式根本满足不了业务的实时处理需求,所以本人准备开始着手改造原系统中的数据处理方式,重新搭建一个实时流处理平台,主要是基于生态,利用Kafka作为中转,SparkStreaming框架实时获取数据并清洗,将结果多维度的存储进HBase数据库。

整个平台大致的框架如下:

操作系统:Centos7

用到的框架:

1. Flume1.8.0

2. Hadoop2.9.0

3. kafka2.11-1.0.0

4.Spark2.2.1

5.HBase1.2.6

6. ZooKeeper3.4.11

7. maven3.5.2

整体的开发环境是基于JDK1.8以上以及Scala,所以得提前把java和Scala的环境给准备好,接下来就开始着手搭建基础平台:

一、配置开发环境

下载并解压JDK1.8,、下载并解压Scala,配置profile文件:

vim /etc/profile复制代码
export JAVA_HOME=/usr/java/jdk1.8.0_144export PATH=$JAVA_HOME/bin:$PATHexport CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jarexport SCALA_HOME=/usr/local/scala-2.11.12export PATH=$PATH:$SCALA_HOME/bin复制代码
source /etc/profile复制代码

二、配置zookeeper、maven环境

下载并解压zookeeper以及maven并配置profile文件

wget http://mirrors.hust.edu.cn/apache/maven/maven-3/3.5.2/binaries/apache-maven-3.5.2-bin.tar.gztar -zxvf apache-maven-3.5.2-bin.tar.gz -C /usr/localwget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gztar -zxvf zookeeper-3.4.11.tar.gz -C /usr/localvim /etc/profile复制代码
export MAVEN_HOME=/usr/local/apache-maven-3.5.2export PATH=$PATH:$MAVEN_HOME/bin复制代码
source /etc/profile复制代码

zookeeper的配置文件配置一下:

cp /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.11/conf/zoo.cfg复制代码

然后配置一下zoo.cfg里面的相关配置,指定一下dataDir目录等等

启动zookeeper:

/usr/local/zookeeper-3.4.11/bin/zkServer.sh start复制代码

如果不报错,jps看一下是否启动成功

三、安装配置Hadoop

Hadoop的安装配置在之前文章中有说过(传送门),为了下面的步骤方便理解,这里只做一个单机版的简单配置说明:

下载hadoop解压并配置环境:

wget http://mirrors.hust.edu.cn/apache/hadoop/common/hadoop-2.9.0/hadoop-2.9.0.tar.gztar -zxvf hadoop-2.9.0.tar.gz -C /usr/localvim /etc/profile复制代码
export HADOOP_HOME=/usr/local/hadoop-2.9.0export PATH=$PATH:$HADOOP_HOME/bin复制代码
source /etc/profile复制代码

配置hadoop 进入/usr/local/hadoop-2.9.0/etc/hadoop目录

cd /usr/local/hadoop-2.9.0/etc/hadoop复制代码

首先配置hadoop-env.sh、yarn-env.sh,修改JAVA_HOME到指定的JDK安装目录/usr/local/java/jdk1.8.0_144

创建hadoop的工作目录

mkdir /opt/data/hadoop复制代码

编辑core-site.xml、hdfs-site.xml、yarn-site.xml等相关配置文件,具体配置不再阐述请看前面的文章,配置完成之后记得执行hadoop namenode -format,否则hdfs启动会报错,启动完成后不出问题浏览器访问50070端口会看到hadoop的页面。

想学习大数据或者想学习大数据的朋友,我整理了一套大数据的学习视频免费分享给大家,从入门到实战都有,大家可以加微信:Lxiao_28获取,还可以入微信群交流!(备注领取资料,真实有效)。

四、安装配置kafka

还是一样,先下载kafka,然后配置:

wget http://mirrors.hust.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgztar -zxvf kafka_2.11-1.0.0.tgz -C /usr/localvim /etc/profile复制代码
export KAFKA_HOME=/usr/local/kafka_2.11-1.0.0export PATH=$KAFKA_HOME/bin:$PATH复制代码
source /etc/profile复制代码

进入kafka的config目录,配置server.properties,指定log.dirs和zookeeper.connect参数;配置zookeeper.properties文件中zookeeper的dataDir,配置完成后启动kafka

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties复制代码

可以用jps查看有没有kafka进程,然后测试一下kafka是否能够正常收发消息,开两个终端,一个用来做producer发消息一个用来做consumer收消息,首先,先创建一个topic

kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic testTopickafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic复制代码

如果不出一下会看到如下输出:

Topic:testTopic	PartitionCount:1	ReplicationFactor:1	Configs:Topic: testTopic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0复制代码

然后在第一个终端中输入命令:

kafka-console-producer.sh –broker-list localhost:9092 –topic testTopic

在第二个终端中输入命令:

kafka-console-consumer.sh –zookeeper 127.0.0.1:2181 –topic testTopic

如果启动都正常,那么这两个终端将进入阻塞监听状态,在第一个终端中输入任何消息第二个终端都将会接收到。

五、安装配置HBase

下载并解压HBase:

wget http://mirrors.hust.edu.cn/apache/hbase/1.2.6/hbase-1.2.6-bin.tar.gztar -zxvf hbase-1.2.6-bin.tar.gz -C /usr/local/vim /etc/profile复制代码
export HBASE_HOME=/usr/local/hbase-1.2.6export PATH=$PATH:$HBASE_HOME/bin复制代码
source /etc/profile复制代码

修改hbase下的配置文件,首先修改hbase-env.sh,主要修改JAVA_HOME以及相关参数,这里要说明一下HBASE_MANAGES_ZK这个参数,因为采用了自己的zookeeper,所以这里设置为false,否则hbase会自己启动一个zookeeper

cd /usr/local/hbase-1.2.6/confvim hbase-env.sh复制代码
export JAVA_HOME=/usr/local/java/jdk1.8.0_144/HBASE_CLASSPATH=/usr/local/hbase-1.2.6/confexport HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m"export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m"export HBASE_PID_DIR=/opt/data/hbaseexport HBASE_MANAGES_ZK=false复制代码

然后修改hbase-site.xml,我们设置hbase的文件放在hdfs中,所以要设置hdfs地址,其中tsk1是我安装hadoop的机器的hostname,hbase.zookeeper.quorum参数是安装zookeeper的地址,这里的各种地址最好用机器名

vim hbase-site.xml复制代码
hbase.rootdir
hdfs://tsk1:9000/hbase
hbase.master
tsk1:60000
hbase.master.port
60000
hbase.cluster.distributed
true
hbase.zookeeper.quorum
192.168.70.135
zookeeper.znode.parent
/hbase
hbase.zookeeper.property.dataDir
/opt/data/zookeeper
hbase.master.info.bindAddress
tsk1
复制代码

配置完成后启动hbase,输入命令:

start-hbase.sh

完成后查看日志没有报错的话测试一下hbase,用hbase shell进行测试:

hbase shellhbase(main):001:0>create 'myTestTable','info'0 row(s) in 2.2460 seconds=> Hbase::Table - myTestTablehbase(main):003:0>listTABLE                                                                                                                    testTable                                                                                                                1 row(s) in 0.1530 seconds=> ["myTestTable"]复制代码

至此,hbase搭建成功,访问以下hadoop的页面,查看file system(菜单栏Utilities->Browse the file system),这时可以看见base的相关文件已经载hadoop的文件系统中。

六、安装spark

下载spark并解压

wget http://mirrors.hust.edu.cn/apache/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgztar -zxvf spark-2.2.1-bin-hadoop2.7.tgz -C /usr/localvim /etc/profile复制代码
export SPARK_HOME=/usr/local/spark-2.2.1-bin-hadoop2.7export PATH=$PATH:$SPARK_HOME/bin复制代码
source /etc/profile复制代码

七、测试

至此,环境基本搭建完成,以上搭建的环境仅是服务器生产环境的一部分,涉及服务器信息、具体调优信息以及集群的搭建就不写在这里了,下面我们写一段代码整体测试一下从kafka生产消息到spark streaming接收到,然后处理消息并写入HBase。先写一个HBase的连接类HBaseHelper:

public class HBaseHelper {    private static HBaseHelper ME;    private static Configuration config;    private static Connection conn;    private static HBaseAdmin admin;    public static HBaseHelper getInstances() {        if (null == ME) {            ME = new HBaseHelper();            config = HBaseConfiguration.create();            config.set("hbase.rootdir", "hdfs://tsk1:9000/hbase");            config.set("hbase.zookeeper.quorum", "tsk1");            config.set("hbase.zookeeper.property.clientPort", "2181");            config.set("hbase.defaults.for.version.skip", "true");        }        if (null == conn) {            try {                conn = ConnectionFactory.createConnection(config);                admin = new HBaseAdmin(config);            } catch (IOException e) {                e.printStackTrace();            }        }        return ME;    }    public Table getTable(String tableName) {        Table table = null;        try {            table = conn.getTable(TableName.valueOf(tableName));        } catch (Exception ex) {            ex.printStackTrace();        }        return table;    }    public void putAdd(String tableName, String rowKey, String cf, String column, Long value) {        Table table = this.getTable(tableName);        try {            table.incrementColumnValue(rowKey.getBytes(), cf.getBytes(), column.getBytes(), value);            System.out.println("OK!");        } catch (IOException e) {            e.printStackTrace();        }    } //......以下省略}复制代码

再写一个测试类KafkaRecHbase用来做spark-submit提交

package com.test.spark.spark_test;import java.util.HashMap;import java.util.Map;import java.util.regex.Pattern;import org.apache.log4j.Level;import org.apache.log4j.Logger;import org.apache.spark.SparkConf;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.kafka.KafkaUtils;import scala.Tuple2;public class KafkaRecHbase {    private static final Pattern SPACE = Pattern.compile(" ");    public static void main(String[] args) throws Exception {        Logger.getLogger("org").setLevel(Level.ERROR);        SparkConf sparkConf = new SparkConf();        sparkConf.setAppName("kafkaRecHbase");        sparkConf.setMaster("local[2]");        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));        int numThreads = Integer.parseInt(args[3]);        Map
topicMap = new HashMap<>(); String[] topics = args[2].split(","); for (String topic : topics) { topicMap.put(topic, numThreads); } JavaPairReceiverInputDStream
kafkaStream = KafkaUtils.createStream(ssc, args[0], args[1], topicMap); JavaDStream
lines = kafkaStream.map(Tuple2::_2); JavaDStream
lineStr = lines.map(line -> { if (null == line || line.equals("")) { return ""; } String[] strs = SPACE.split(line); if (strs.length < 1) { return ""; } try { for (String str : strs) { HBaseHelper.getInstances().putAdd("myTestTable", str, "info", "wordCunts", 1l); } return "strs:" + line; } catch (Exception ex) { System.out.println(line); return "报错了:" + ex.getMessage(); } }); lineStr.print(); ssc.start(); System.out.println("spark 启动!!!"); ssc.awaitTermination(); }}复制代码

编译提交到服务器,执行命令:

spark-submit --jars $(echo /usr/local/hbase-1.2.6/lib/*.jar | tr ' ' ',') --class com.test.spark.spark_test.KafkaRecHbase --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 /opt/FileTemp/streaming/spark-test-0.1.1.jar tsk1:2181 test testTopic 1复制代码

没报错的话执行kafka的producer,输入几行数据在HBase内就能看到结果了!

八、装一个Flume实时采集Nginx日志写入Kafka

Flume是一个用来日志采集的框架,安装和配置都比较简单,可以支持多个数据源和输出,具体可以参考Flume的文档,写的比较全 传送门

下载Flume并配置环境

wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gztar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/localvim /etc/profile复制代码
export FLUME_HOME=/usr/local/apache-flume-1.8.0-bin/export PATH=$FLUME_HOME/bin:$PATH复制代码
source /etc/profile复制代码

写一个Flume的配置文件在flume的conf目录下:

vim nginxStreamingKafka.conf复制代码
agent1.sources=r1agent1.channels=logger-channelagent1.sinks=kafka-sinkagent1.sources.r1.type=execagent1.sources.r1.deserializer.outputCharset= UTF-8agent1.sources.r1.command=tail -F /opt/data/nginxLog/nginxLog.logagent1.channels.logger-channel.type=memoryagent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSinkagent1.sinks.kafka-sink.topic = flumeKafkaagent1.sinks.kafka-sink.brokerList = tsk1:9092agent1.sinks.kafka-sink.requiredAcks = 1agent1.sinks.kafka-sink.batchSize = 20agent1.sources.r1.channels=logger-channelagent1.sinks.kafka-sink.channel=logger-channel复制代码

kafka创建一个名为flumeKafka的topic用来接收,然后启动flume:

flume-ng agent --name agent1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/nginxStreamingKafka.conf -Dflume.root.logger=INFO,console复制代码

如果没有报错,Flume将开始采集opt/data/nginxLog/nginxLog.log中产生的日志并实时推送给kafka,再按照上面方法写一个spark streaming的处理类进行相应的处理就好。

转载地址:http://yagwo.baihongyu.com/

你可能感兴趣的文章
linux pbs 用户时间,【Linux】单计算机安装PBS系统(Torque)与运维
查看>>
linux系统可用内存减少,在Linux中检查可用内存的5种方法
查看>>
linux 脚本map,Linux Shell Map的用法详解
查看>>
linux mariadb忘记密码,Linux上mariadb重置密码
查看>>
如何在linux系统下配置共享文件夹,如何在windows和Linux系统之间共享文件夹.doc
查看>>
thinkpad装linux无线网卡驱动,ThinkPad E530 Fedora 20 下无线网卡驱动的安装
查看>>
linux磁盘管理是什么东西,Linux磁盘管理详解
查看>>
linux卸载软件出现依赖,关于ubuntu循环依赖软件的删除
查看>>
linux操作系统加固软件,系统安全:教你Linux操作系统的安全加固
查看>>
linux中yum源安装dhcp,24.Linux系统下动态网络源部署方法(dhcpd)
查看>>
linux屏幕复制显示出来的,linux – stdout到gnu屏幕复制缓冲区
查看>>
c语言规定数据长度,C语言中各种数据类型长度
查看>>
android l 新功能,Android L SDK -- 一些有趣的新功能
查看>>
android中心打开式动画,android 围绕中心旋转动画
查看>>
android 键盘 光标位置不对,鼠标定位不准的解决方法大全
查看>>
android pokemon go 虚拟定位,《Pokemon GO》推出全新道具 每年可转一次队伍
查看>>
控件拉伸(转)
查看>>
Linux -进程-孤儿进程-僵尸进程-预防僵尸进程
查看>>
高性能性能测试开发方法
查看>>
薪资留人还是情感留人?
查看>>