ETL之kettle进行二次开发简单demo
- 格式:docx
- 大小:19.99 KB
- 文档页数:5
ETL:kettleSpoon转换+作业
Kettle能做什么?
前⾔:需将db2中数据导⼊到mysql中,利⽤etl⼯具进⾏多表转换。
以此为切⼊点,系统整理、学习kettle⼯具。
提醒:
kettle是纯java编写,机器需要有jre环境。
它允许管理来⾃不同数据库的数据,通过⼀个图形化的⽤户环境来描述你想要什么。
kettle中有两种脚本⼯作。
1)transformation(转换):完成针对数据的基础转换。
后缀名 .ktr
2)job(作业):完成整个⼯作流的控制。
后缀名 .kjb
Kettle是⼀款国外著名的开源etl⼯具,Kettle中⽂版需要在java环境下运⾏,Kettle免费版的中⽂名字叫做⽔壶,程序员希望将所有格式的数据集中在⼀起,然后以⼀种特定的格式流出。
Kettle免费版操作简单,任何⽤户都可以快速掌握。
sqlserver :如果选择下⾯报错,选择上⾯的。
如果Kettle 连接类型采⽤MS SQL Server(Native),会报“JDBC Driver class not found: com.microsoft.sqlserver.jdbc.SQLServerDriver”错误,此时只要将sqljdbc4.jar(oracle 是ojdbc5.jar)包放⼊data-integration\libswt对应操作系统⽂件夹下,重启Kettle即可连接上。
kettle中源和⽬标表结构不⼀致的情况处理:Kettle教程1(转换):
Kettle教程2(转换):。
kettle 8.2 ETL项目实战教程01项目简介01 Sakila简介业务简介Sakila是在线DVD租赁商店系统的数据库,Sakila示例数据库最初由MySQL AB文档团队的前成员Mike Hillyer开发,现在由MySQL文档团队维护和销售。
Sakila的主要目的是支撑DVD 租赁商店的业务流程,下面列举了一些业务流程活动中的关键点来理解数据库是如何支撑的。
●每个商店维护自己的租赁影片清单,当客户取走或归还DVD光盘时会有一个专门的店员对这个清单进行维护。
●影片描述的内容同样在维护信息范围之内,如分类(动作、冒险、喜剧等)、演员、等级、特殊分类(例如被删除的情节和预告片)。
这些信息可能被打印在DVD包装的标签上。
●必须在商店注册成为会员才可以租赁光盘。
●客户可以在任何一家商店租赁一张或多张光盘,同时,商店希望客户在每张光盘对应的租赁期内归还之前租赁的光盘。
●顾客可以在任意时间对任何租赁的光盘付费。
Sakila数据库的表13 payment 付款表14 rental 租赁表15 staff 工作人员表16 store 商店表总体设计规范●每张表都有自增主键列,列名采用“表名_id”的规则命名,如表film的自增列为film_id。
●外键约束引用主键,且名字与主键列的相同。
例如,表store的address_id列引用addess的address_id列。
●每张表有一个列叫做last_update,这是一个TIMESTAMP类型的字段,用来记录增加或更新数据时的时间。
sakila模型关系图02安装sakila数据库软件:MySQL 5.5Navicat for MySQL在sakila-db的文件夹下有两个SQL脚本文件。
sakila-schema.sql创建数据库和表sakila-data.sql 插入数据03维度建模维度建模基本概念维度建模(dimensional modeling)是专门用于分析型数据库、数据仓库、数据集市建模的方法。
kettle的基本介绍Kettle主要内容:⼀.ETL介绍⼆.Kettle介绍三.调⽤Kettle API⼀、ETL介绍1. ETL是什么?1).ETL分别是“Extract”、“ Transform” 、“Load”三个单词的⾸字母缩写也即数据抽取、转换、装载的过程,但我们⽇常往往简称其为数据抽取。
ETL包含了三⽅⾯:Extract(抽取):将数据从各种原始的业务系统中读取出来,这是所有⼯作的前提。
⼀般抽取过程需要连接到不同的数据源,以便为随后的步骤提供数据。
这⼀部分看上去简单⽽琐碎,实际上它是 ETL 解决⽅案的成功实施的⼀个主要障碍。
Transform(转换):按照预先设计好的规则将抽取得数据进⾏转换,使本来异构的数据格式能统⼀起来。
任何对数据的处理过程都是转换。
这些处理过程通常包括(但不限于)下⾯⼀些操作:移动数据根据规则验证数据数据内容和的修改将多个数据源的数据集成根据处理后的数据计算派⽣值和聚集值Load(装载):将转换完的数据按计划增量或全部导⼊到数据仓库中。
也就是说将数据加载到⽬标系统的所有操作。
2).ETL是(Extract)、清洗(Cleaning)、转换(Transform)、装载(Load)的过程。
是构建的重要⼀环,⽤户从抽取出所需的数据,经过,最终按照预先定义好的数据仓库模型,将数据加载到数据仓库中去。
3).ETL是BI/DW( Business Intelligence/Data Warehouse , 商务智能/数据仓库)的核⼼和灵魂,按照统⼀的规则集成并提⾼数据的价值,是负责完成数据从数据源向⽬标数据仓库转化的过程,是实施数据仓库的重要步骤。
DW(Data Warehouse)即数据仓库:这个概念是由被誉为“数据仓库之⽗”的WilliamH.Inmon博⼠提出的:数据仓库是⼀个⾯向主题的、集成的、随时间变化的、信息相对稳定的数据集合,它⽤于对企业管理和决策提供⽀持。
kettle etl设计方案1. 引言Kettle是一款强大的ETL(Extract, Transform, Load)工具,可以帮助用户实现数据的抽取、转换和加载。
本文档旨在介绍Kettle的设计方案,包括架构设计、任务调度、数据传输等重要内容。
2. 架构设计Kettle的架构由三个主要组件组成:Spoon、Pan和Kitchen。
•Spoon:是Kettle的图形用户界面工具,用于创建和编辑Kettle的转换和作业。
•Pan:是一个命令行工具,用于在命令行模式下执行Kettle的转换和作业。
•Kitchen:也是一个命令行工具,用于在命令行模式下执行Kettle的作业,和Pan相比,Kitchen可以更好地管理和监控作业的执行过程。
Kettle的架构还包括一个元数据数据库,用于存储转换和作业的定义、日志、错误信息等。
3. 任务调度Kettle提供了多种任务调度的方式,可以根据实际需求选择合适的方式。
3.1 定时任务调度Kettle可以通过定时任务调度器(例如Quartz)实现定时执行转换和作业。
用户可以配置任务的执行时间、频率和参数等信息,实现自动化的数据处理。
3.2 事件触发任务调度Kettle还支持通过事件触发的任务调度。
用户可以定义一个事件,当满足特定条件时触发任务的执行。
例如,可以设置一个文件变化的事件,当指定的文件发生变化时,触发转换或作业的执行。
4. 数据传输Kettle支持多种数据传输方式,可以方便地从源数据源中抽取数据、进行转换,并加载到目标数据源中。
4.1 数据抽取Kettle可以从各种关系型数据库、文件系统、Web服务等数据源中抽取数据。
用户可以通过Kettle提供的数据抽取组件,配置数据源的连接信息、查询语句等参数,实现数据的快速抽取。
4.2 数据转换Kettle的转换组件提供了丰富的数据转换功能,包括数据清洗、数据过滤、字段映射、数据计算等操作。
用户可以通过拖拽转换组件并连接它们,定义数据的转换逻辑和处理流程。
概览Kettle也叫PDI(全称是Pentaho Data Integeration),是一款开源的ETL工具,项目开始于2003年,2006年加入了开源的BI 组织Pentaho, 正式命名为PDI。
官方网站:/术语1.Transformation转换步骤,可以理解为将一个或者多个不同的数据源组装成一条数据流水线。
然后最终输出到某一个地方,文件或者数据库等。
2.Job作业,可以调度设计好的转换,也可以执行一些文件处理(比较,删除等),还可以ftp上传,下载文件,发送邮件,执行shell命令等,3.Hop连接转换步骤或者连接Job(实际上就是执行顺序)的连线Transformation hop:主要表示数据的流向。
从输入,过滤等转换操作,到输出。
Job hop:可设置执行条件:1,无条件执行2,当上一个Job执行结果为true时执行3,当上一个Job执行结果为false时执行Kettle,etl设计及运行1.Kettle整体结构图Kettle整体结构图2.转换设计样例图绿色线条为hop,流水线转换设计样例3.运行方式使用java web start 方式运行的配置方法命令行方式1)Windows下执行kitchen.bat,多个参数之间以“/”分隔,Key和value以”:”分隔例如:kitchen.bat /file:F:\samples\demo-table2table.ktr /level:Basic /log:test123.log/file:指定转换文件的路径/level:执行日志执行级别/log: 执行日志文件路径2)Linux下执行kitchen.sh,多个参数之间以“-”分隔,Key和value以”=”分隔kitchen.sh -file=/home/updateWarehouse.kjb -level=Minimal如果设计的转换,Job是保存在数据库中,则命令如下:Kitchen.bat /rep:资源库名称/user:admin /pass:admin /job:job名4.Xml保存转换,job流程设计用户定义的作业可以保存在(xml格式)中或某一个特定的数据库中转换的设计文件以.ktr结尾(xml文格式),保存所有配置好的数据库连接,文件相对路径,字段映射关系等信息。
Kettle命令行使用说明Kettle命令行使用说明1.KETTLE简介说到ETL开源项目,Kettle当属翘首,项目名称很有意思,水壶。
按项目负责人Matt 的说法:把各种数据放到一个壶里,然后呢,以一种你希望的格式流出。
呵呵,外国人都很有联想力。
看了提供的文档,然后对发布程序的简单试用后,可以很清楚得看到Kettle的四大块:1)Chef——工作(job)设计工具(GUI方式)2)Kitchen——工作(job)执行器(命令行方式)3)Spoon——转换(transform)设计工具(GUI方式)4)Span——转换(trasform)执行器(命令行方式)1.1.Chef——工作(job)设计器这是一个GUI工具,操作方式主要通过拖拖拉拉,勿庸多言,一看就会。
何谓工作?多个作业项,按特定的工作流串联起来,开成一项工作。
正如:我的工作是软件开发。
我的作业项是:设计、编码、测试!先设计,如果成功,则编码,否则继续设计,编码完成则开始设计,周而复始,作业完成。
1.1.1.Chef中的作业项包括:1)转换:指定更细的转换任务,通过Spoon生成。
通过Field来输入参数;2)SQL:sql语句执行;3)FTP:下载ftp文件;4)邮件:发送邮件;5)检查表是否存在;6)检查文件是否存在;7)执行shell脚本:如dos命令。
8)批处理:(注意:windows批处理不能有输出到控制台)。
9)Job包:作为嵌套作业使用。
10)JavaScript执行:这个比较有意思,我看了一下源码,如果你有自已的Script引擎,可以很方便的替换成自定义Script,来扩充其功能;11)SFTP:安全的Ftp协议传输;12)HTTP方式的上/下传。
1.1.2.工作流如上文所述,工作流是作业项的连接方式。
分为三种:无条件,成功,失败,为了方便工作流使用,KETTLE提供了几个辅助结点单元(也可将其作为简单的作业项):Start单元:任务必须由此开始。
eclipse jenkins二次开发例子-回复Eclipse Jenkins 二次开发例子Eclipse 是一个非常流行的集成开发环境(IDE),而Jenkins 则是一个自动化构建和持续集成工具。
可以说,Eclipse 和Jenkins 都是软件开发中的重要工具。
本文将以Eclipse Jenkins 二次开发为主题,为您介绍如何进行这样的开发,并提供一步一步的指导。
Eclipse Jenkins 二次开发是一项令人兴奋和有挑战性的任务。
通过二次开发,您可以根据自己的需求对Jenkins 进行定制和扩展,以更好地满足项目的要求。
这样的开发工作需要一定的编程能力和对Jenkins 插件开发的了解。
那么,让我们一起来看看如何进行Eclipse Jenkins 二次开发吧。
首先,我们需要安装Eclipse 和Jenkins,并保证二者能够正常工作。
安装和配置Eclipse 比较简单,您只需按照官方指南一步一步操作即可。
安装Jenkins 的方式有很多种,可以选择通过容器(如Docker)部署Jenkins,也可以直接下载并安装Jenkins。
在本文中,我们将假设您已经正确安装并配置了Eclipse 和Jenkins。
接下来,我们需要创建一个Java 项目作为Eclipse Jenkins 二次开发的工作空间。
在Eclipse 中,选择"File"-> "New"-> "Project",然后选择"Java Project"。
在项目配置界面上,您可以填写项目的名称和存储位置,然后点击"Finish" 完成创建。
此时,您应该能够在Eclipse 的项目资源管理器中看到新创建的Java 项目。
在Eclipse 中,我们可以通过插件进行Jenkins 二次开发。
幸运的是,Eclipse 提供了一些方便的插件来帮助我们创建和管理Jenkins 插件。
目录1. ETL知识 (3)1.1. ETL定义 (3)1.1.1. 定义 (3)1.1.2. 前提 (3)1.1.3. 原则 (3)1.2. 模式及比较 (4)1.3. ETL过程 (7)1.3.1. 总流程 (7)1.3.2. 数据抽取流程 (8)1.3.3. 数据清洗流程 (8)1.3.4. 数据转换流程 (10)1.3.5. 数据加载流程 (11)1.4. 问题分析 (12)1.4.1. 字符集问题 (12)1.4.2. 缓慢变化维处理 (14)1.4.3. 增量、实时同步的处理 (14)1.4.4. 断点续传 (15)1.5. ETL工具 (15)2. Kettle简介及使用 (16)2.1. 什么Kettle? (16)2.2. 下载及安装Kettle (17)2.3. Kettle简单例子 (19)2.3.1. 启动Kettle (19)2.3.2. 创建transformation过程 (20)2.3.3. 创建job过程 (41)2.3.4. 命令行运行ktr和kjb (45)1.ETL知识1.1.ETL定义1.1.1.定义●定义:数据的抽取(Extract)、转换(Transform)、装载(Load)的过程。
●目标:数据优化。
以最小代价(包括对日常操作的影响和对技能的要求) 将针对日常业务操作的数据转化为针对数据仓库而存储的决策支持型数据。
1.1.2.前提●确定ETL范围通过对目标表信息的收集,确定ETL的范围●选择ETL工具考虑资金运行的平台、对源和目标的支持程度、可编程的灵活性、对源数据变化的监测、数据处理时间的控制、管理和调度功能、对异常情况的处理●确定解决方案抽取分析、变化数据的捕获、目标表的刷新策略、数据的转换及数据验证1.1.3.原则●应尽量利用数据中转区对运营数据进行预处理。
保证数据的安全性、集成与加载的高效性。
●ETL的过程应是主动“拉取”,而不是从内部“推送”,其可控性将大为增强。
KETTLE SPOON使用手册融汇兴业 开发部高健2010年12月MSN: hhjtu524@1.到官方网站下载/2.配置JA V A环境,解压下载包到英文路径下。
如:D:\pdi-ce-4.0.1-stable\data-integration3.文档说明:本文只介绍基本的同步和定时JOB,其他高级操作请自行拓展。
4.工作区介绍:运行安装目录下Spoon.bat欢迎界面:欢迎界面后进入工作平台,一般应用中使用转换和JOB即可。
双击转换即新建转换作用。
通过主对象树可以配置所要用到的数据库连接。
配置好后点击核心对象。
下图是JOB用到的基本元素。
5.建立转换选中列表中的的“表输入”、“表输出”、“插入/更新”,拖拽到右侧工作区。
鼠标选中节点键盘按shift键,拖拽即可连线。
下图是一个转换,有两个动作,一是直接从数据源表同步数据到目标表,一是检查源表的插入和更新,同步到目标表。
黑色线为生效,灰色线为失效,节点和连接可双击进行编辑。
可预览数据是否正确。
双击编辑输出动作。
点击“Enter field mapping”进行字段选择。
选择要用的字段后点确定,注意字段的约束条件。
说明:现输入和输出表为不同库的两个同构数据表,ID为主键。
编辑好后点击上面的执行即可。
左侧参数暂不说明,执行完成后可以观看下方的日志。
双击编辑“插入/更新”动作。
然后点击运行。
执行前,在源库里加条新纪录。
确定作业执行正常后可以保存作业,如保存路径到本地磁盘地址C:\Documents and Settings\w_gaoj\My Documents\etl2.ktr。
6.建立定时执行的JOB建立JOB的前提是有可以完整执行的作业,上面步骤定制的作业进行保存。
从左侧列表拖拽“START”,“Success”,“Transformation”到工作区并连接,如下图。
然后双击“START”动作进行编辑。
然后双击编辑“Transformation”活动。
demo,利用kettle的api,将一个数据源中的信息导入到另外一个数据源中:[java]view plain copy1.package .saidi.job;2.3.import mons.io.FileUtils;4.import org.pentaho.di.core.KettleEnvironment;5.import org.pentaho.di.core.database.DatabaseMeta;6.import org.pentaho.di.core.exception.KettleDatabaseException;7.import org.pentaho.di.core.exception.KettleXMLException;8.import org.pentaho.di.core.plugins.PluginRegistry;9.import org.pentaho.di.core.plugins.StepPluginType;10.import org.pentaho.di.trans.TransHopMeta;11.import org.pentaho.di.trans.TransMeta;12.import org.pentaho.di.trans.step.StepMeta;13.import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;14.import org.pentaho.di.trans.steps.tableinput.TableInputMeta;15.16.import java.io.File;17.18./**19. * Created by 戴桥冰 on 2017/1/16.20. */21.public class TransDemo {22.23.public static TransDemo transDemo;24.25./**26. * 两个库中的表名27. */28.public static String bjdt_tablename = "test1";29.public static String kettle_tablename = "test2";30.31./**32. * 数据库连接信息,适用于DatabaseMeta其中一个构造器DatabaseMeta(String xml)33. */34.public static final String[] databasesXML = {35.36."<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +37."<connection>" +38."<name>bjdt</name>" +39."<server>192.168.1.122</server>" +40."<type>Mysql</type>" +41."<access>Native</access>" +42."<database>daiqiaobing</database>" +43."<port>3306</port>" +44."<username>root</username>" +45."<password>root</password>" +46."</connection>",47."<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +48."<connection>" +49."<name>kettle</name>" +50."<server>192.168.1.122</server>" +51."<type>Mysql</type>" +52."<access>Native</access>" +53."<database>daiqiaobing</database>" +54."<port>3306</port>" +55."<username>root</username>" +56."<password>root</password>" +57."</connection>"58.59. };60.61.public static void main(String[] args) {62.try {63. KettleEnvironment.init();64. transDemo = new TransDemo();65. TransMeta transMeta = transDemo.generateMyOwnTrans();66. String transXml = transMeta.getXML();67. String transName = "etl/update_insert_Trans.ktr";68. File file = new File(transName);69. FileUtils.writeStringToFile(file, transXml, "UTF-8");70. System.out.println(databasesXML.length+"\n"+databasesXML[0]+"\n"+databasesXML[1]);71. } catch (Exception e) {72. e.printStackTrace();73.return;74. }75. }76.77./**78. * 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是表输入,第二个是表插入与更新操作79. * @return80. * @throws KettleXMLException81. */82.public TransMeta generateMyOwnTrans() throws KettleXMLException, KettleDatabaseException {83. System.out.println("************start to generate my own transformation***********");84. TransMeta transMeta = new TransMeta();85.//设置转化的名称86. transMeta.setName("insert_update");87.//添加转换的数据库连接88.for (int i=0;i<databasesXML.length;i++){89. DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);90. transMeta.addDatabase(databaseMeta);91. }92.//registry是给每个步骤生成一个标识Id用93. PluginRegistry registry = PluginRegistry.getInstance();94.//第一个表输入步骤(TableInputMeta)95. TableInputMeta tableInput = new TableInputMeta();96. String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput);97.//给表输入添加一个DatabaseMeta连接数据库98. DatabaseMeta database_bjdt = transMeta.findDatabase("bjdt");99. tableInput.setDatabaseMeta(database_bjdt);100. String select_sql = "SELECT name FROM "+bjdt_tablename;101. tableInput.setSQL(select_sql);102.103.//添加TableInputMeta到转换中104. StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId,"tab le input",tableInput);105.//给步骤添加在spoon工具中的显示位置106. tableInputMetaStep.setDraw(true);107. tableInputMetaStep.setLocation(100, 100);108. transMeta.addStep(tableInputMetaStep);109.110.//第二个步骤插入与更新111. InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta(); 112. String insertUpdateMetaPluginId = registry.getPluginId(StepPluginTy pe.class,insertUpdateMeta);113.//添加数据库连接114. DatabaseMeta database_kettle = transMeta.findDatabase("kettle"); 115. insertUpdateMeta.setDatabaseMeta(database_kettle);116.//设置操作的表117. insertUpdateMeta.setTableName(kettle_tablename);118.//设置用来查询的关键字119. insertUpdateMeta.setKeyLookup(new String[]{"name"});120. insertUpdateMeta.setKeyStream(new String[]{"name"});121. insertUpdateMeta.setKeyStream2(new String[]{""});//一定要加上122. insertUpdateMeta.setKeyCondition(new String[]{"="});123.124.//设置要更新的字段125. String[] updatelookup = {"name"} ;126.127. String [] updateStream = {"name"};128. Boolean[] updateOrNot = {true};129. insertUpdateMeta.setUpdateLookup(updatelookup);130. insertUpdateMeta.setUpdateStream(updateStream);131. insertUpdateMeta.setUpdate(updateOrNot);132. String[] lookup = insertUpdateMeta.getUpdateLookup();133.//添加步骤到转换中134. StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId, "insert_update",insertUpdateMeta);135. insertUpdateStep.setDraw(true);136. insertUpdateStep.setLocation(250,100);137. transMeta.addStep(insertUpdateStep);138.//***************************************************************** *139.140.//***************************************************************** *141.142.//添加hop把两个步骤关联起来143. transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertU pdateStep));144. System.out.println("***********the end************");145.return transMeta;146. }147.148.}上述操作将会产生一个ktr文件,接下来的操作是对ctr文件进行转换:[java]view plain copy1.public static void main(String[] args) throws KettleException {2.//初始化ketlle3. KettleEnvironment.init();4.//创建转换元数据对象5. TransMeta meta = new TransMeta("etl/update_insert_Trans.ktr");6. Trans trans = new Trans(meta);7. trans.prepareExecution(null);8. trans.startThreads();9. trans.waitUntilFinished();10.if(trans.getErrors()!=0){11. System.out.println("执行失败!");12. }13. }。