程序员开发实例大全宝库

网站首页 > 编程文章 正文

0254-如何使用StreamSets实时采集Kafka并入库Kudu

zazugpt 2024-09-04 22:19:56 编程文章 23 ℃ 0 评论

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github:https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


Fayson在前面的文章《

如何使用StreamSets实现MySQL中变化数据实时写入Kudu

》,本篇文章主要介绍如何使用StreamSets实时采集Kafka的数据并将采集的数据写入Kudu。

  • 内容概述

1.测试环境准备

2.准备生产Kafka数据脚本

3.配置StreamSets

4.流程测试及数据验证

  • 测试环境

1.RedHat7.4

2.CM和CDH版本为cdh5.13.3

3.kafka3.0.0(0.11.0)

4.Kudu 1.5.0

  • 前置条件

1.集群已安装Kafka并正常运行

2.集群未启用Kerberos

2.测试环境准备


1.通过如下命令创建测试topic

kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic kafka2kudu_topic

(可左右滑动)

2.通过Hue使用Impala创建一个Kudu表,创建脚本如下:

CREATE TABLE ods_deal_daily_kudu (
 id STRING COMPRESSION snappy,
 name STRING COMPRESSION snappy,
 sex STRING COMPRESSION snappy,
 city STRING COMPRESSION snappy,
 occupation STRING COMPRESSION snappy,
 mobile_phone_num STRING COMPRESSION snappy,
 fix_phone_num STRING COMPRESSION snappy,
 bank_name STRING COMPRESSION snappy,
 address STRING COMPRESSION snappy,
 marriage STRING COMPRESSION snappy,
 child_num INT COMPRESSION snappy,
 PRIMARY KEY (id)
)
 PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
 TBLPROPERTIES ('kudu.master_addresses'='master.gzyh.com'
);

(可左右滑动)

这里在创建Kudu表的时候增加了kudu.master的配置,如果在Impala中未启用集成kudu的配置则需要增加该参数,在Impala中配置向如下:

3..准备测试数据文件

共600条测试数据,数据的id是唯一的。

3.生产Kafka消息


在这里Fayson读取的是本地的数据文件,将每行文件解析并封装为json数据,实时的发送给Kafka。

1.创建Maven工程,工程的pom.xml文件内容如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <parent>
 <artifactId>cdh-project</artifactId>
 <groupId>com.cloudera</groupId>
 <version>1.0-SNAPSHOT</version>
 </parent>
 <modelVersion>4.0.0</modelVersion>
 <artifactId>kafka-demo</artifactId>
 <packaging>jar</packaging>
 <name>kafka-demo</name>
 <url>http://maven.apache.org</url>
 <properties>
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>
 <dependencies>
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.10.2.0</version>
 </dependency>
 <dependency>
 <groupId>net.sf.json-lib</groupId>
 <artifactId>json-lib</artifactId>
 <version>2.4</version>
 <classifier>jdk15</classifier>
 </dependency>
 </dependencies>
</project>

(可左右滑动)

2.编写ReadFileToKafka.java文件内容如下:

package com.cloudera.nokerberos;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * package: com.cloudera.nokerberos
 * describe: 通过读取本地text文件将文件内容解析并组装为JSON发送到Kafka
 * creat_user: Fayson
 * email: htechinfo@163.com
 * creat_date: 2018/4/27
 * creat_time: 下午4:42
 * 公众号:Hadoop实操
 */
public class ReadFileToKafka {
 public static String confPath = System.getProperty("user.dir") + File.separator + "conf";
 public static void main(String[] args) {
 if(args.length < 1) {
 System.out.print("缺少输入参数,请指定要处理的text文件");
 System.exit(1);
 }
 String filePath = args[0];
 BufferedReader reader = null;
 try {
 Properties appProperties = new Properties();
 appProperties.load(new FileInputStream(new File(confPath + File.separator + "app.properties")));
 String brokerlist = String.valueOf(appProperties.get("bootstrap.servers"));
 String topic_name = String.valueOf(appProperties.get("topic.name"));
 Properties props = getKafkaProps();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerlist);
 Producer<String, String> producer = new KafkaProducer<String, String>(props);
 reader = new BufferedReader(new FileReader(filePath));
 String tempString = null;
 int line = 1;
 // 一次读入一行,直到读入null为文件结束
 while ((tempString = reader.readLine()) != null) {
 String detailJson = parseJSON(tempString);
 ProducerRecord record = new ProducerRecord<String, String>(topic_name, detailJson);
 producer.send(record);
 line++;
 }
 reader.close();
 producer.flush();
 producer.close();
 } catch (Exception e) {
 e.printStackTrace();
 } finally {
 if (reader != null) {
 try {
 reader.close();
 } catch (IOException e1) {
 }
 }
 }
 }
 /**
 * 将txt文件中的每行数据解析并组装为json字符串
 * @param tempString
 * @return
 */
 private static String parseJSON(String tempString) {
 if(tempString != null && tempString.length() > 0) {
 Map<String, String> resultMap = null;
 String[] detail = tempString.split("\001");
 resultMap = new HashMap<>();
 resultMap.put("id", detail[0]);
 resultMap.put("name", detail[1]);
 resultMap.put("sex", detail[2]);
 resultMap.put("city", detail[3]);
 resultMap.put("occupation", detail[4]);
 resultMap.put("mobile_phone_num", detail[5]);
 resultMap.put("fix_phone_num", detail[6]);
 resultMap.put("bank_name", detail[7]);
 resultMap.put("address", detail[8]);
 resultMap.put("marriage", detail[9]);
 resultMap.put("child_num", detail[10]);
 return JSONObject.fromObject(resultMap).toString();
 }
 return null;
 }
 /**
 * 初始化Kafka配置
 * @return
 */
 private static Properties getKafkaProps() {
 try{
 Properties props = new Properties();
 props.put(ProducerConfig.ACKS_CONFIG, "all");
 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000); //批量发送消息
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
 return props;
 } catch (Exception e) {
 e.printStackTrace();
 }
 return null;
 }
}

(可左右滑动)

3.将编写好的代码使用mvn命令打包

在工程目录使用mvn cleanpackage命令进行编译打包

4.编写脚本run.sh脚本运行jar包

运行脚本目录结构如下

run.sh脚本内容如下

[root@master kafka-run]# vim run.sh 
#!/bin/bash
#########################################
# 创建Topic
# kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic ods_deal_daily_topic
########################################
JAVA_HOME=/usr/java/jdk1.8.0_131
#要读取的文件
read_file=$1
for file in `ls lib/*jar`
do
 CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadFileToKafka $read_file

(可左右滑动)

conf目录下的配置文件app.properties内容如下

[root@master kafka-run]# vim conf/app.properties 
bootstrap.servers=cdh01.gzyh.com:9092,cdh02.gzyh.com:9092,cdh03.gzyh.com:9092
topic.name=ods_deal_daily_topic

(可左右滑动)

lib目录的依赖包

依赖包可以在命令行使用mvn命令导出:

mvn dependency:copy-dependencies -DoutputDirectory=/tmp/lib

(可左右滑动)

数据文件内容:

4.在StreamSets上创建Pipline


1.登录StreamSets,创建一个kafka2kudu的Pipline

2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka基础信息

3.配置Kafka相关信息,如Broker、ZK及Topic

4.配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON

5.添加Kudu模块及配置基本信息

6.配置Kudu的Master、Table、Operation等

Kudu Masters:可以配置多个,多个地址以“,”分割

Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀

Field to ColumnMapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。

DefaultOpertation:设置操作类型如:insert、upsert、delete

Kudu模块高级配置使用默认配置

5.流程测试验证


1.启动kafka2kudu的Pipline,启动成功如下图显示

2.在命令行运行run.sh脚本向Kafka发送消息

[root@master kafka-run]# sh run.sh ods_user_600.txt 

(可左右滑动)

上面执行了两次脚本。

3.在命令行运行run.sh脚本向Kafka发送消息

点击Kudu模块,查看监控信息

4.查看Kudu的ods_deal_daily_kudu表内容

入库的数据总条数

可以看到ods_deal_daily_kudu表的总条数与准备的测试数据量一致。

GitHub地址:

https://github.com/fayson/cdhproject/tree/master/kafkademo/readlocalfile-kafka-shell

https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/nokerberos/ReadFileToKafka.java

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表