程序员开发实例大全宝库

网站首页 > 编程文章 正文

kettle-循环抽取,插入kafka(kettle循环遍历)

zazugpt 2024-10-20 16:05:41 编程文章 295 ℃ 0 评论

最近,在docker环境下,使用kettle 8循环查询数据库数据后,插入kafka集群中。特此记录。

docker配置kettle

  1. 准备安装介质 ,需要自行下载。
  • pdi-ce-8.2.0.0-342.zip
  • jdk-8u161-linux-x64.tar.gz
  1. 自定义dockerfile

由于kettle需要用图形化界面进行配置,所以需要对centos镜像进行初步配置。dockerfile文件内容为:

FROM centos:7
RUN rm -rf /etc/localtime && ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
&& yum -y install webkitgtk4 redhat-lsb bzip2 wget kde-l10n-Chinese glibc-common && yum groupinstall "fonts" -y \
&&localedef -c -f UTF-8 -i zh_CN zh_CN.utf8 && yum clean all \
&& wget http://ftp.mozilla.org/pub/mozilla.org/xulrunner/nightly/2012/03/2012-03-02-03-32-11-mozilla-1.9.2/xulrunner-1.9.2.28pre.en-US.linux-x86_64.tar.bz2 \
&& tar -jxf xulrunner-1.9.2.28pre.en-US.linux-x86_64.tar.bz2 && rm -f  xulrunner-1.9.2.28pre.en-US.linux-x86_64.tar.bz2 \
&& cd xulrunner/ && ./xulrunner -register-global
ENV LC_ALL zh_CN.UTF-8
ENV JAVA_HOME /opt/jdk1.8.0_161
ENV PATH $JAVA_HOME/bin:$PATH
  1. 编写docker-comose.yml文件

将安装介质解压后,将安装介质解压后目录映射到docker容器中。

映射jdk到容器: ./resource/jdk1.8.0_161:/opt/jdk1.8.0_161

映射kettle到容器:./resource/data-integration:/opt/kettle

映射kettle配置后生成的文件:- ./resource/kettle_files:/opt/kettle_files

version: '3.7'
networks:
  net_docker:
    external: true
services:
  kettle8.2:
    image: kettle8:0.1
    container_name: kettle8.2
    networks:
      - net_docker
 # 安装介质目录映射
    volumes:
      - ./resource/jdk1.8.0_161:/opt/jdk1.8.0_161
      - ./resource/data-integration:/opt/kettle
      - ./resource/kettle_files:/opt/kettle_files
    command: [/bin/bash]
    tty: true
    stdin_open: true
  1. 启动kettle
  • 首先运行构建dockerfile(注意目录路径)
[root@jk-s-monitor dockerFile]# pwd
/opt/business/kettle8.2/dockerFile
[root@jk-s-monitor dockerFile]# docker build . -t kettle8:0.1
  • 将安装介质拷贝到指定目录后解压
[root@jk-s-monitor resource]# pwd
/opt/business/kettle8.2/resource
[root@jk-s-monitor resource]# ls
data-integration  jdk1.8.0_161  jdk-8u161-linux-x64.tar.gz  kettle_files  pdi-ce-8.2.0.0-342.zip
  • 启动docker容器
[root@jk-s-monitor kettle8.2]# ls
docker-compose.yml  dockerFile  resource
[root@jk-s-monitor kettle8.2]# docker-compose up
  • 进入容器后启动kettle
[root@jk-s-monitor kettle8.2]# docker exec -it kettle8.2 /bin/bash
[root@62e156d9cf1c /]# export DISPLAY=192.168.157.126:0.0
[root@62e156d9cf1c /]# /opt/kettle/spoon.sh
....

在终端打开xmanager-passive


打开kettle

循环实现

  • 需求描述

需要采集2个数据库中数据。2个数据库的表结构相同。

分析:

有2个数据库配置,使用变量灵活代替配置。

需要执行2次数据采集,使用循环解决重复劳动。

  • 需求实现
  1. 创建配置表

因为数据还有其他的计算需求,这里,笔者使用数据库表的方式,进行数据库配置信息存储,(简单一点,也可以使用配置文件,如json 、yml等)。

在配置数据库(oracle)中创建配置信息表

create table kettle_db_info
(
  db_name VARCHAR2(10),
  ip      VARCHAR2(16),
  username VARCHAR2(44),
  passwd  VARCHAR2(530),
  port    VARCHAR2(5)
);
INSERT INTO kettle_db_info VALUES('orcl','10.0.0.0','username0','passwd0','1521');
INSERT INTO kettle_db_info VALUES('orcl','10.0.0.1','username1','passwd1','1521');
COMMIT;
  1. kettl中创建输入转换

在左边菜单中选择 输入》表输入,配置表输入转换如下图:


在左边菜单中选择 作业》复制记录到结果,配置完成后如下图:


  1. 配置数据插入kafka 转换

选择表输入、需要新建数据库连接,配置的时候需要用变量代替。

根据业务逻辑编写sql,这里笔者将数据进行json格式包装:

分别选择 输出》JSON output 和 Streaming》Kafka producer,选择后如下图:


  • 配置 输出》JSON output


配置字段

  • 配置 Streaming》Kafka producer
  1. 创建循环执行的job

新建job如下图:


其中 转换(in_data) 的配置为:

其中 转换(in_kafka)配置为:

选项 执行每一个输入行 必须要选择才能循环执行。


需要将变量名称进行映射,才能使用变量。

执行job 并 验证kafka数据

执行job:


在kafka中验证数据

# docker exec -it 8829606db8f8 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic kettle_kafka --from-beginning
{"tb_name":"XXARTMENTDX","tbs_name":"xxx"}
{"tb_name":"XXMPANYDEFIX","tbs_name":"xxx"}

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表