温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
Fayson的github:https://github.com/fayson/cdhproject
提示:代码块部分可以左右滑动查看噢
1.文档编写目的
Fayson在前面的文章《
如何使用StreamSets实现MySQL中变化数据实时写入Kudu
》,本篇文章主要介绍如何使用StreamSets实时采集Kafka的数据并将采集的数据写入Kudu。
- 内容概述
1.测试环境准备
2.准备生产Kafka数据脚本
3.配置StreamSets
4.流程测试及数据验证
- 测试环境
1.RedHat7.4
2.CM和CDH版本为cdh5.13.3
3.kafka3.0.0(0.11.0)
4.Kudu 1.5.0
- 前置条件
1.集群已安装Kafka并正常运行
2.集群未启用Kerberos
2.测试环境准备
1.通过如下命令创建测试topic
kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic kafka2kudu_topic
(可左右滑动)
2.通过Hue使用Impala创建一个Kudu表,创建脚本如下:
CREATE TABLE ods_deal_daily_kudu ( id STRING COMPRESSION snappy, name STRING COMPRESSION snappy, sex STRING COMPRESSION snappy, city STRING COMPRESSION snappy, occupation STRING COMPRESSION snappy, mobile_phone_num STRING COMPRESSION snappy, fix_phone_num STRING COMPRESSION snappy, bank_name STRING COMPRESSION snappy, address STRING COMPRESSION snappy, marriage STRING COMPRESSION snappy, child_num INT COMPRESSION snappy, PRIMARY KEY (id) ) PARTITION BY HASH PARTITIONS 16 STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='master.gzyh.com' );
(可左右滑动)
这里在创建Kudu表的时候增加了kudu.master的配置,如果在Impala中未启用集成kudu的配置则需要增加该参数,在Impala中配置向如下:
3..准备测试数据文件
共600条测试数据,数据的id是唯一的。
3.生产Kafka消息
在这里Fayson读取的是本地的数据文件,将每行文件解析并封装为json数据,实时的发送给Kafka。
1.创建Maven工程,工程的pom.xml文件内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cdh-project</artifactId> <groupId>com.cloudera</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>kafka-demo</artifactId> <packaging>jar</packaging> <name>kafka-demo</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency> <dependency> <groupId>net.sf.json-lib</groupId> <artifactId>json-lib</artifactId> <version>2.4</version> <classifier>jdk15</classifier> </dependency> </dependencies> </project>
(可左右滑动)
2.编写ReadFileToKafka.java文件内容如下:
package com.cloudera.nokerberos; import net.sf.json.JSONObject; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.io.*; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * package: com.cloudera.nokerberos * describe: 通过读取本地text文件将文件内容解析并组装为JSON发送到Kafka * creat_user: Fayson * email: htechinfo@163.com * creat_date: 2018/4/27 * creat_time: 下午4:42 * 公众号:Hadoop实操 */ public class ReadFileToKafka { public static String confPath = System.getProperty("user.dir") + File.separator + "conf"; public static void main(String[] args) { if(args.length < 1) { System.out.print("缺少输入参数,请指定要处理的text文件"); System.exit(1); } String filePath = args[0]; BufferedReader reader = null; try { Properties appProperties = new Properties(); appProperties.load(new FileInputStream(new File(confPath + File.separator + "app.properties"))); String brokerlist = String.valueOf(appProperties.get("bootstrap.servers")); String topic_name = String.valueOf(appProperties.get("topic.name")); Properties props = getKafkaProps(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerlist); Producer<String, String> producer = new KafkaProducer<String, String>(props); reader = new BufferedReader(new FileReader(filePath)); String tempString = null; int line = 1; // 一次读入一行,直到读入null为文件结束 while ((tempString = reader.readLine()) != null) { String detailJson = parseJSON(tempString); ProducerRecord record = new ProducerRecord<String, String>(topic_name, detailJson); producer.send(record); line++; } reader.close(); producer.flush(); producer.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (reader != null) { try { reader.close(); } catch (IOException e1) { } } } } /** * 将txt文件中的每行数据解析并组装为json字符串 * @param tempString * @return */ private static String parseJSON(String tempString) { if(tempString != null && tempString.length() > 0) { Map<String, String> resultMap = null; String[] detail = tempString.split("\001"); resultMap = new HashMap<>(); resultMap.put("id", detail[0]); resultMap.put("name", detail[1]); resultMap.put("sex", detail[2]); resultMap.put("city", detail[3]); resultMap.put("occupation", detail[4]); resultMap.put("mobile_phone_num", detail[5]); resultMap.put("fix_phone_num", detail[6]); resultMap.put("bank_name", detail[7]); resultMap.put("address", detail[8]); resultMap.put("marriage", detail[9]); resultMap.put("child_num", detail[10]); return JSONObject.fromObject(resultMap).toString(); } return null; } /** * 初始化Kafka配置 * @return */ private static Properties getKafkaProps() { try{ Properties props = new Properties(); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000); //批量发送消息 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); return props; } catch (Exception e) { e.printStackTrace(); } return null; } }
(可左右滑动)
3.将编写好的代码使用mvn命令打包
在工程目录使用mvn cleanpackage命令进行编译打包
4.编写脚本run.sh脚本运行jar包
运行脚本目录结构如下
run.sh脚本内容如下
[root@master kafka-run]# vim run.sh #!/bin/bash ######################################### # 创建Topic # kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic ods_deal_daily_topic ######################################## JAVA_HOME=/usr/java/jdk1.8.0_131 #要读取的文件 read_file=$1 for file in `ls lib/*jar` do CLASSPATH=$CLASSPATH:$file done export CLASSPATH ${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadFileToKafka $read_file
(可左右滑动)
conf目录下的配置文件app.properties内容如下
[root@master kafka-run]# vim conf/app.properties bootstrap.servers=cdh01.gzyh.com:9092,cdh02.gzyh.com:9092,cdh03.gzyh.com:9092 topic.name=ods_deal_daily_topic
(可左右滑动)
lib目录的依赖包
依赖包可以在命令行使用mvn命令导出:
mvn dependency:copy-dependencies -DoutputDirectory=/tmp/lib
(可左右滑动)
数据文件内容:
4.在StreamSets上创建Pipline
1.登录StreamSets,创建一个kafka2kudu的Pipline
2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka基础信息
3.配置Kafka相关信息,如Broker、ZK及Topic
4.配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON
5.添加Kudu模块及配置基本信息
6.配置Kudu的Master、Table、Operation等
Kudu Masters:可以配置多个,多个地址以“,”分割
Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀
Field to ColumnMapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。
DefaultOpertation:设置操作类型如:insert、upsert、delete
Kudu模块高级配置使用默认配置
5.流程测试验证
1.启动kafka2kudu的Pipline,启动成功如下图显示
2.在命令行运行run.sh脚本向Kafka发送消息
[root@master kafka-run]# sh run.sh ods_user_600.txt
(可左右滑动)
上面执行了两次脚本。
3.在命令行运行run.sh脚本向Kafka发送消息
点击Kudu模块,查看监控信息
4.查看Kudu的ods_deal_daily_kudu表内容
入库的数据总条数
可以看到ods_deal_daily_kudu表的总条数与准备的测试数据量一致。
GitHub地址:
https://github.com/fayson/cdhproject/tree/master/kafkademo/readlocalfile-kafka-shell
https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/nokerberos/ReadFileToKafka.java
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操
本文暂时没有评论,来添加一个吧(●'◡'●)