如何在Oracle中集成Hadoop
- 格式:pdf
- 大小:563.50 KB
- 文档页数:11
Hadoop平台上大数据处理的使用教程大数据处理已经成为当今企业分析和决策制定的关键领域。
Hadoop 作为一个开源的大数据处理平台,提供了一种可靠、可扩展且高效的方式来存储和处理大规模数据。
本教程将为您介绍如何在Hadoop平台上进行大数据处理,并给出详细的操作步骤和示例。
## 第一部分:Hadoop平台的安装和配置1. 下载Hadoop:从Hadoop官方网站获取最新的Hadoop二进制文件,并解压到本地文件夹。
2. 配置Hadoop环境:编辑Hadoop配置文件,设置必要的参数,如文件系统地址、数据节点和任务跟踪器等。
3. 启动Hadoop:通过命令行启动Hadoop集群,可以使用start-all.sh脚本来同时启动所有的Hadoop进程。
## 第二部分:Hadoop中的存储和数据管理1. 分布式文件系统(HDFS):使用Hadoop分布式文件系统来存储大规模数据。
学习如何创建、移动和删除文件,以及如何设置和管理文件的副本。
2. Hadoop YARN:了解Hadoop的资源管理系统,学习如何提交和管理应用程序,并了解如何配置YARN队列来优化任务调度。
3. 数据加载和导入:学习如何将数据加载到Hadoop集群中,可以通过命令行工具或使用Hadoop的数据导入工具,如Sqoop和Flume。
## 第三部分:Hadoop上的数据处理1. MapReduce:学习如何使用MapReduce编写分布式数据处理程序。
使用Java编写Map和Reduce函数,将任务提交到Hadoop集群,并了解如何监视任务的执行。
2. Pig:了解Pig语言,它是一种类似SQL的高级脚本语言,用于高效地进行数据分析和转换。
学习如何编写Pig脚本,并将其提交到Hadoop集群进行执行。
3. Hive:学习如何使用Hive进行数据查询和分析。
了解Hive的数据模型和查询语言(HiveQL),并编写HiveQL脚本来查询和处理数据。
datax使用案例datax是阿里巴巴集团开源的一个用于大数据实时同步导入导出的数据框架,它提供了全量同步和增量同步的功能,支持多种数据源和目标存储介质。
下面是关于datax使用案例的介绍。
1. 数据库之间的数据同步使用datax可以方便地将一个数据库中的数据同步到另一个数据库中,可以是同一个数据库系统,也可以是不同的数据库系统。
比如将MySQL中的数据同步到Oracle数据库中,或将Oracle中的数据同步到MySQL中。
2. 数据仓库的数据同步datax可以将数据从各种数据源(如关系型数据库、NoSQL数据库、Hadoop等)同步到数据仓库中,如Hive、HBase等。
通过datax 的配置,可以实现数据的全量同步和增量同步,保证数据的一致性和准确性。
3. 日志数据的实时导入datax可以将实时产生的日志数据导入到数据存储介质中,如Hadoop、Hive等。
通过datax的配置,可以实现日志数据的实时导入,方便进行后续的分析和处理。
4. 数据库到文件的导出datax可以将数据库中的数据导出到文件中,如CSV、Excel等。
通过datax的配置,可以选择需要导出的表和字段,并指定导出的文件格式和存储位置。
5. 文件到数据库的导入datax可以将文件中的数据导入到数据库中,如CSV、Excel等。
通过datax的配置,可以选择需要导入的文件和字段,并指定导入的数据库表和字段。
6. 不同数据库之间的数据迁移使用datax可以方便地将一个数据库中的数据迁移到另一个数据库中,可以是同一个数据库系统,也可以是不同的数据库系统。
比如将MySQL中的数据迁移到Oracle数据库中,或将Oracle中的数据迁移到MySQL中。
7. 数据库到Hadoop的导出datax可以将数据库中的数据导出到Hadoop中,方便进行大数据分析和处理。
通过datax的配置,可以选择需要导出的表和字段,并指定导出的Hadoop存储位置。
idea连接虚拟机hadoop的详细步骤以下是连接虚拟机和Hadoop的详细步骤:1. 启动虚拟机:首先,启动虚拟机,确保虚拟机的操作系统已经正常运行。
2. 检查网络设置:在虚拟机中,检查网络设置是否正确配置。
确保能够与主机(外部)网络通信。
3. 安装Java JDK:检查虚拟机是否已安装Java JDK。
如果没有安装,则需要通过下载JDK并按照安装说明进行安装。
4. 下载Hadoop:从Apache Hadoop官方网站上下载所需版本的Hadoop。
下载完毕后,将文件保存到虚拟机中的合适位置。
5. 解压Hadoop文件:将下载的Hadoop文件解压到虚拟机上的合适位置。
可以使用以下命令解压tar.gz文件:`tar -zxvf hadoop-x.x.x.tar.gz`其中,x.x.x代表具体版本号。
6. 配置Hadoop环境变量:打开虚拟机中的终端,并编辑`~/.bashrc`文件。
在文件末尾添加以下行:`export HADOOP_HOME=/path/to/hadoop` (将/path/to/hadoop替换为实际Hadoop目录的路径)`export PATH=$PATH:$HADOOP_HOME/bin``export PATH=$PATH:$HADOOP_HOME/sbin``export HADOOP_MAPRED_HOME=$HADOOP_HOME``export HADOOP_COMMON_HOME=$HADOOP_HOME` `export HADOOP_HDFS_HOME=$HADOOP_HOME``export YARN_HOME=$HADOOP_HOME``exportHADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/ lib/native``export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"`保存文件并执行以下命令使环境变量生效:`source ~/.bashrc`7. 配置Hadoop:进入Hadoop安装目录,并编辑`hadoop-env.sh`文件。
组建hadoop集群实验报告一、实验目的本次实验的目的是通过组建Hadoop 集群,熟悉和掌握Hadoop 的部署过程和相关技术,加深对分布式计算的理解并掌握其应用。
二、实验环境- 操作系统:Ubuntu 20.04- Hadoop 版本:3.3.0- Java 版本:OpenJDK 11.0.11三、实验步骤1. 下载和安装Hadoop在官方网站下载Hadoop 的二进制文件,并解压到本地的文件夹中。
然后进行一些配置,如设置环境变量等,以确保Hadoop 可以正常运行。
2. 配置Hadoop 集群a) 修改核心配置文件在Hadoop 的配置目录中找到`core-site.xml` 文件,在其中添加以下配置:xml<configuration><property><name>fs.defaultFS</name><value>hdfs:localhost:9000</value></property></configuration>b) 修改HDFS 配置文件在配置目录中找到`hdfs-site.xml` 文件,在其中添加以下配置:xml<configuration><property><name>dfs.replication</name><value>1</value></property></configuration>c) 修改YARN 配置文件在配置目录中找到`yarn-site.xml` 文件,在其中添加以下配置:xml<configuration><property><name>yarn.resourcemanager.hostname</name><value>localhost</value></property><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><property><name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</nam e><value>org.apache.hadoop.mapred.ShuffleHandler</value></property></configuration>3. 启动Hadoop 集群在终端中执行以下命令来启动Hadoop 集群:bashstart-all.sh这将启动Hadoop 中的所有守护进程,包括NameNode、DataNode、ResourceManager 和NodeManager。
Hadoop集群的搭建方法与步骤随着大数据时代的到来,Hadoop作为一种分布式计算框架,被广泛应用于数据处理和分析领域。
搭建一个高效稳定的Hadoop集群对于数据科学家和工程师来说至关重要。
本文将介绍Hadoop集群的搭建方法与步骤。
一、硬件准备在搭建Hadoop集群之前,首先要准备好适合的硬件设备。
Hadoop集群通常需要至少三台服务器,一台用于NameNode,两台用于DataNode。
每台服务器的配置应该具备足够的内存和存储空间,以及稳定的网络连接。
二、操作系统安装在选择操作系统时,通常推荐使用Linux发行版,如Ubuntu、CentOS等。
这些操作系统具有良好的稳定性和兼容性,并且有大量的Hadoop安装和配置文档可供参考。
安装操作系统后,确保所有服务器上的软件包都是最新的。
三、Java环境配置Hadoop是基于Java开发的,因此在搭建Hadoop集群之前,需要在所有服务器上配置Java环境。
下载最新版本的Java Development Kit(JDK),并按照官方文档的指引进行安装和配置。
确保JAVA_HOME环境变量已正确设置,并且可以在所有服务器上运行Java命令。
四、Hadoop安装与配置1. 下载Hadoop从Hadoop官方网站上下载最新的稳定版本,并将其解压到一个合适的目录下,例如/opt/hadoop。
2. 编辑配置文件进入Hadoop的安装目录,编辑conf目录下的hadoop-env.sh文件,设置JAVA_HOME环境变量为Java的安装路径。
然后,编辑core-site.xml文件,配置Hadoop的核心参数,如文件系统的默认URI和临时目录。
接下来,编辑hdfs-site.xml文件,配置Hadoop分布式文件系统(HDFS)的相关参数,如副本数量和数据块大小。
最后,编辑mapred-site.xml文件,配置MapReduce框架的相关参数,如任务调度器和本地任务运行模式。
如何使用Hadoop处理大数据随着互联网和互联技术的飞速发展,数据的规模不断扩大,如何高效地管理和处理海量的数据成为了各个领域所面临的重要挑战。
在这个背景下,Hadoop作为一种分布式计算框架,逐渐走进了大数据处理的领域。
本文旨在介绍Hadoop的基本概念和使用方法,以帮助读者更好地理解和应用此工具来处理大数据。
一、Hadoop概述Hadoop是一个开源的、基于Java语言的分布式计算框架。
最初由Apache公司开发,并在2006年贡献给了Apache开源社区。
Hadoop是基于Google公司研发的Google File System(GFS)和MapReduce的思想而发展出来的。
它主要包括Hadoop Distributed File System(HDFS)和MapReduce两个模块。
HDFS主要用于大数据的存储,而MapReduce则是基于HDFS的数据计算框架。
除此之外,Hadoop还包括一些较为基础的组件,如ZooKeeper、HBase、Spark等。
二、Hadoop的使用1. Hadoop的安装Hadoop的安装比较简单,只需要下载Hadoop的安装包、运行相应的脚本即可。
但在安装过程中,需要进行一些参数配置和环境变量的设置,才能够使Hadoop正常运行。
安装完成后,可以通过执行“hadoop version”来检查安装结果。
2. Hadoop的使用使用Hadoop主要可以通过以下两种方式:(1)Hadoop shell:Hadoop shell是一个基于命令行的操作界面,可以通过HDFS shell和MapReduce shell两个模块来进行大数据的存储和计算操作。
比如,可以通过hadoop fs -ls /来查看当前HDFS 中的文件目录,通过hadoop fs -put local_file_path hadoop_path来将本地文件上传到HDFS中,通过hadoop jar mapreducejarinput_path output_path JobName来运行Hadoop的MapReduce程序。
Hadoop集群搭建步骤1.先建⽴⼀台虚拟机,分配内存2G,硬盘20G,⽹络为nat 模式,设置⼀个静态的ip 地址: 例如设定3台机器的ip 为192.168.63.167(master) 192.16863.168(slave1) 192.168.63.169 (slave2)2.修改第⼀台主机的⽤户名3.复制master⽂件两次,重命名为slave1和slave2,打开虚拟机⽂件,然后按照同样的⽅法设置两个节点的ip和主机名4.建⽴主机名和ip的映射5.查看是否能ping通,关闭防⽕墙和selinux 配置6.配置ssh免密码登录在root⽤户下输⼊ssh-keygen -t rsa ⼀路回车秘钥⽣成后在~/.ssh/⽬录下,有两个⽂件id_rsa(私钥)和id_rsa.pub(公钥),将公钥复制到authorized_keys并赋予authorized_keys600权限同理在slave1和slave2节点上进⾏相同的操作,然后将公钥复制到master节点上的authoized_keys检查是否免密登录(第⼀次登录会有提⽰)7..安装JDK(省去)三个节点安装java并配置java环境变量8.安装MySQL(master 节点省去)9.安装SecureCRT或者xshell 客户端⼯具,然后分别链接上 3台服务器12.搭建集群12.1 集群结构三个结点:⼀个主节点master两个从节点内存2GB 磁盘20GB12.2 新建hadoop⽤户及其⽤户组⽤adduser新建⽤户并设置密码将新建的hadoop⽤户添加到hadoop⽤户组前⾯hadoop指的是⽤户组名,后⼀个指的是⽤户名赋予hadoop⽤户root权限12.3 安装hadoop并配置环境变量由于hadoop集群需要在每⼀个节点上进⾏相同的配置,因此先在master节点上配置,然后再复制到其他节点上即可。
将hadoop包放在/usr/⽬录下并解压配置环境变量在/etc/profile⽂件中添加如下命令12.4 搭建集群的准备⼯作在master节点上创建以下⽂件夹/usr/hadoop-2.6.5/dfs/name/usr/hadoop-2.6.5/dfs/data/usr/hadoop-2.6.5/temp12.5 配置hadoop⽂件接下来配置/usr/hadoop-2.6.5/etc//hadoop/⽬录下的七个⽂件slaves core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml hadoop-env.sh yarn-env.sh配置hadoop-env.sh配置yarn-env.sh配置slaves⽂件,删除localhost配置core-site.xml配置hdfs-site.xml配置mapred-site.xml配置yarn-site.xml将配置好的hadoop⽂件复制到其他节点上12.6 运⾏hadoop格式化Namenodesource /etc/profile13. 启动集群。
hadoop 操作手册Hadoop 是一个分布式计算框架,它使用 HDFS(Hadoop Distributed File System)存储大量数据,并通过 MapReduce 进行数据处理。
以下是一份简单的 Hadoop 操作手册,介绍了如何安装、配置和使用 Hadoop。
一、安装 Hadoop1. 下载 Hadoop 安装包,并解压到本地目录。
2. 配置 Hadoop 环境变量,将 Hadoop 安装目录添加到 PATH 中。
3. 配置 Hadoop 集群,包括 NameNode、DataNode 和 JobTracker 等节点的配置。
二、配置 Hadoop1. 配置 HDFS,包括 NameNode 和 DataNode 的配置。
2. 配置 MapReduce,包括 JobTracker 和 TaskTracker 的配置。
3. 配置 Hadoop 安全模式,如果需要的话。
三、使用 Hadoop1. 上传文件到 HDFS,使用命令 `hadoop fs -put local_file_path/hdfs_directory`。
2. 查看 HDFS 中的文件和目录信息,使用命令 `hadoop fs -ls /`。
3. 运行 MapReduce 作业,编写 MapReduce 程序,然后使用命令`hadoop jar my_` 运行程序。
4. 查看 MapReduce 作业的运行结果,使用命令 `hadoop fs -cat/output_directory/part-r-00000`。
5. 从 HDFS 中下载文件到本地,使用命令 `hadoop fs -get/hdfs_directory local_directory`。
6. 在 Web 控制台中查看 HDFS 集群信息,在浏览器中打开7. 在 Web 控制台中查看 MapReduce 作业运行情况,在浏览器中打开四、管理 Hadoop1. 启动和停止 Hadoop 集群,使用命令 `` 和 ``。
hadoop的基本使用Hadoop的基本使用Hadoop是一种开源的分布式计算系统和数据处理框架,具有可靠性、高可扩展性和容错性等特点。
它能够处理大规模数据集,并能够在集群中进行并行计算。
本文将逐步介绍Hadoop的基本使用。
一、Hadoop的安装在开始使用Hadoop之前,首先需要进行安装。
以下是Hadoop的安装步骤:1. 下载Hadoop:首先,从Hadoop的官方网站(2. 配置环境变量:接下来,需要将Hadoop的安装目录添加到系统的环境变量中。
编辑~/.bashrc文件(或其他相应的文件),并添加以下行:export HADOOP_HOME=/path/to/hadoopexport PATH=PATH:HADOOP_HOME/bin3. 配置Hadoop:Hadoop的配置文件位于Hadoop的安装目录下的`etc/hadoop`文件夹中。
其中,最重要的配置文件是hadoop-env.sh,core-site.xml,hdfs-site.xml和mapred-site.xml。
根据具体需求,可以在这些配置文件中进行各种参数的设置。
4. 启动Hadoop集群:在完成配置后,可以启动Hadoop集群。
运行以下命令以启动Hadoop集群:start-all.sh二、Hadoop的基本概念在开始使用Hadoop之前,了解一些Hadoop的基本概念是非常重要的。
以下是一些重要的概念:1. 分布式文件系统(HDFS):HDFS是Hadoop的核心组件之一,用于存储和管理大规模数据。
它是一个可扩展的、容错的文件系统,能够在多个计算机节点上存储数据。
2. MapReduce:MapReduce是Hadoop的编程模型,用于并行计算和处理大规模数据。
它由两个主要的阶段组成:Map阶段和Reduce阶段。
Map阶段将输入数据切分为一系列键值对,并运行在集群中的多个节点上。
Reduce阶段将Map阶段的输出结果进行合并和计算。
大数据技术实验报告大数据技术实验一Hadoop大数据平台安装实验1实验目的在大数据时代,存在很多开源的分布式数据采集、计算、存储技术,本实验将在熟练掌握几种常见Linux命令的基础上搭建Hadoop(HDFS、MapReduce、HBase、Hive)、Spark、Scala、Storm、Kafka、JDK、MySQL、ZooKeeper等的大数据采集、处理分析技术环境。
2实验环境个人笔记本电脑Win10、Oracle VM VirtualBox 5.2.44、CentOS-7-x86_64-Minimal-1511.iso3实验步骤首先安装虚拟机管理程序,然后创建三台虚拟服务器,最后在虚拟服务器上搭建以Hadoop 集群为核心的大数据平台。
3.1快速热身,熟悉并操作下列Linux命令·创建一个初始文件夹,以自己的姓名(英文)命名;进入该文件夹,在这个文件夹下创建一个文件,命名为Hadoop.txt。
·查看这个文件夹下的文件列表。
·在Hadoop.txt中写入“Hello Hadoop!”,并保存·在该文件夹中创建子文件夹”Sub”,随后将Hadoop.txt文件移动到子文件夹中。
·递归的删除整个初始文件夹。
3.2安装虚拟机并做一些准备工作3.2.1安装虚拟机下载系统镜像,CentOS-7-x86_64-Minimal-1511.iso。
虚拟机软件使用Oracle VM VirtualBox 5.2.44。
3.2.2准备工作关闭防火墙和Selinux,其次要安装perl 、libaio、ntpdate 和screen。
然后检查网卡是否开机自启,之后修改hosts,检查网络是否正常如图:然后要创建hadoop用户,之后多次用,并且生成ssh 密钥并分发。
最后安装NTP 服务。
3.3安装MYSQL 3.3.1安装3.3.2测试3.4安装ZooKeeper。
如何在Oracle中集成Hadoop文: SRC 张旭东许多垂直行业都在关注文件系统中庞大的数据。
这些数据中通常包含大量无关的明细信息,以及部分可用于趋势分析或丰富其他数据的精华信息。
尽管这些数据存储在数据库之外,但一些客户仍然希望将其与数据库中的数据整合在一起以提取对业务用户有价值的信息。
本文详细介绍了如何从 Oracle 数据库访问存储在 Hadoop 集群里的数据。
请注意,本文选择了 Hadoop 和 HDFS 作为示例,但这里的策略同样适用于其他分布式存储机制。
本文中介绍了各种访问方法,还通过一个具体示例说明了其中一种访问方法的实现。
要从 Oracle 数据库里访问某个文件系统中的外部文件或外部数据,最简单的方法莫过于使用外部表。
外部表以表的形式展示存储在文件系统中的数据,并且可在 SQL 查询中完全透明地使用。
因此,可以考虑用外部表从 Oracle 数据库中直接访问 HDFS(Hadoop 文件系统)中存储的数据。
遗憾的是,常规的操作系统无法调用外部表驱动直接访问 HDFS 文件。
FUSE(File System in Userspace)项目针对这种情况提供了解决方法。
有多种 FUSE 驱动程序支持用户挂载 HDFS 存储,并将其作为常规文件系统处理。
通过使用一个此类驱动程序,并在数据库实例上挂载 HDFS (如果是 RAC 数据库,则在其所有实例上挂载 HDFS),即可使用外部表基础架构轻松访问 HDFS 文件。
图 1. 用数据库内置的 MapReduce 通过外部表进行访问在图 1 中,我们利用 Oracle Database 11g 实现本文所述的数据库内的 map-reduce。
通常情况下,Oracle Database 11g 中的并行执行框架足以满足针对外部表大多数的并行操作。
在有些情况下(例如,如果 FUSE 不可用),外部表方法可能不适用。
Oracle 表函数提供了从 Hadoop 中获取数据的替代方法。
本文附带的示例展示了一种这样的方法。
更深入地来讲,我们用一个表函数来实现,这个表函数使用 DBMS_SCHEDULER 框架异步调用外部shell 脚本,然后Oracle 高级队列特性进行通信。
Hadoop mapper 将数据排入一个公共队列,而表函数则从该队列中取出数据。
由于该表函数能够并行运行,因此使用额外的逻辑来确保仅有一个服务进程提交外部作业。
图 2. 利用表函数进行并行处理由于表函数可以并行运行,Hadoop 流作业也可以不同程度地并行运行,并且后者不受Oracle 查询协调器的控制,这种情况下,队列能提供负载平衡。
下面我们将以一个实际示例展示图 2 的架构。
请注意,我们的示例仅展示了使用表函数访问Hadoop 中存储的数据的一个模板实现。
显然可能存在其他的甚至可能更好的实现。
下图是图 2 中原始示意图在技术上更准确、更具体的展示,解释了我们要在何处、如何使用后文给出的部分实际代码:图 3. 启动 Mapper 作业并检索数据第 1 步是确定由谁作为查询协调器。
对此我们采用一种将具有相同键值的记录写入表的简单机制。
首个插入胜出,作为此进程的查询协调器 (QC)。
请注意,QC 表函数调用同时也承担 着处理角色。
在第 2 步中,该表函数调用 (QC) 使用 dbms_scheduler(图 3 中的作业控制器)启动一个异步作业,该作业接着在 Hadoop 集群上运行同步 bash 脚本。
这个 bash 脚本就是图 3 中的启动程序 (launcher),它在 Hadoop 集群上启动 mapper 进程(第 3 步)。
mapper 进程处理数据,并在第 5 步写入一个队列。
在本文的示例中,我们选择了一个在集群范围内可用的队列。
现在,我们只是单纯地将任何输出直接写入到队列里。
您可以通过批量处理输出并将其移入队列来提高性能。
显然,您也可以选择管道和关系表等其他各种机制。
随后的第 6 步是出队过程,这是通过数据库中的表函数并行调用来实现的。
这些并行调用处理得到的数据将会提供给查询请求来使用。
表函数同时处理Oracle数据库的数据和来自队列 中的数据,并将来自两个来源的数据整合为单一结果集提供给最终用户。
图 4. 监控进程Hadoop的进程 (mapper) 启动之后,作业监控器进程将监视启动程序脚本。
一旦mapper 完成Hadoop 集群中数据的处理之后,bash 脚本即完成,如图 4 所示。
作业监控器将监视数据库调度程序队列,并在 shell 脚本完成时发出通知(第 7 步)。
作业监控器检查数据队列中的剩余数据元素(第 8 步)。
只要队列中存在数据,表函数调用就会继续处理数据(第 6 步)。
图 5. 关闭处理当表函数并行调用取出队列中的全部数据之后,作业监控器将终止队列(图 5 所示的第 9步)以确保 Oracle 中的表函数调用停止。
此时,所有数据均已交付给请求这些数据的查询。
本文中的示例表明,将 Hadoop 系统与 Oracle Database 11g 集成是非常容易的。
本文中讨论的方法允许客户将 Hadoop 中的数据直接传递到 Oracle 查询中。
这避免了将数据获取到本地文件系统并物化到 Oracle 表中,之后才能在 SQL 查询中访问这些数据的过程。
示例代码图3 至 图 5 实现的解决方案使用以下代码。
Oracle官方称:以下示例的所有代码均在 Oracle Database 11g 和 5 个节点的 Hadoop 集群上进行过测试。
处理数据的表函数该脚本中包含某些设置组件。
例如,脚本开始的部分创建了图 3 中第 1 步所展示的仲裁表。
本例中使用的是一直广受欢迎的 OE 模式。
connect oe/oe-- Table to use as locking mechanisim for the hdfs reader as-- leveraged in Figure 3 step 1DROP TABLE run_hdfs_read;CREATE TABLE run_hdfs_read(pk_id NUMBER,status VARCHAR2(100),PRIMARY KEY(pk_id));-- Object type used for AQ that receives the dataCREATE OR REPLACE TYPE hadoop_row_obj AS OBJECT(a NUMBER, b NUMBER);/connect/as sysdba-- system job to launch external script-- this job is used to eventually run the bash script-- described in Figure 3 step 3CREATE OR REPLACE PROCEDURE launch_hadoop_job_async (in_directory IN VARCHAR2,id NUMBER)IScnt NUMBER;BEGINBEGINDBMS_SCHEDULER.DROP_JOB ('ExtScript' || id,TRUE);EXCEPTIONWHEN OTHERSTHENNULL;END;-- Run a scriptDBMS_SCHEDULER.CREATE_JOB (job_name =>'ExtScript' || id,job_type =>'EXECUTABLE',job_action =>'/bin/bash',number_of_arguments =>1);DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE ('ExtScript' || id,1, in_directory);DBMS_SCHEDULER.ENABLE ('ExtScript' || id);-- Wait till the job is done. This ensures the hadoop job is completedLOOPSELECT COUNT(*)INTO cntFROM DBA_SCHEDULER_JOBSWHERE job_name ='EXTSCRIPT' || id;DBMS_OUTPUT.put_line ('Scheduler Count is ' || cnt);IF(cnt =0) THENEXIT;ELSEDBMS_LOCK.sleep (5);END IF;END LOOP;-- Wait till the queue is empty and then drop it-- as shown in Figure 5-- The TF will get an exception and it will finish quietlyLOOPSELECT SUM(c)INTO cntFROM(SELECT enqueued_msgs - dequeued_msgs cFROM gv$persistent_queuesWHERE queue_name ='HADOOP_MR_QUEUE'UNION ALLSELECT num_msgs + spill_msgs cFROM gv$buffered_queuesWHERE queue_name ='HADOOP_MR_QUEUE'UNION ALLSELECT0 c FROM DUAL);IF(cnt =0) THEN-- Queue is done. stop it.DBMS_AQADM.STOP_QUEUE ('HADOOP_MR_QUEUE');DBMS_AQADM.DROP_QUEUE ('HADOOP_MR_QUEUE');RETURN;ELSE-- Wait for a whileDBMS_LOCK.sleep (5);END IF;END LOOP;END;/-- Grants needed to make hadoop reader package work grant execute on launch_hadoop_job_async to oe;GRANT SELECT ON v_$session TO oe;GRANT SELECT ON v_$instance TO oe;GRANT SELECT ON v_$px_process TO oe;GRANT EXECUTE ON DBMS_AQADM TO oe;GRANT EXECUTE ON DBMS_AQ TO oe;connect oe/oe-- Simple reader package to read a file containing two numbersCREATE OR REPLACE PACKAGE hdfs_readerIS-- Return type of pl/sql table functionTYPE return_rows_t IS TABLE OF hadoop_row_obj;-- Checks if current invocation is serialFUNCTION is_serialRETURN BOOLEAN;-- Function to actually launch a Hadoop jobFUNCTION launch_hadoop_job (in_directory IN VARCHAR2,id IN OUT NUMBER)RETURN BOOLEAN;-- Tf to read from Hadoop-- This is the main processing code reading from the queue in-- Figure 3 step 6. It also contains the code to insert into-- the table in Figure 3 step 1FUNCTION read_from_hdfs_file (pcur IN SYS_REFCURSOR,in_directory IN VARCHAR2)RETURN return_rows_tPIPELINEDPARALLEL_ENABLE(PARTITION pcur BY ANY);END;/CREATE OR REPLACE PACKAGE BODY hdfs_readerIS-- Checks if current process is a px_processFUNCTION is_serialRETURN BOOLEANISc NUMBER;BEGINSELECT COUNT(*)INTO cFROM v$px_processWHERE sid=SYS_CONTEXT('USERENV','SESSIONID');IF c <>0 THENRETURN FALSE;ELSERETURN TRUE;END IF;EXCEPTIONWHEN OTHERSTHENRAISE;END;FUNCTION launch_hadoop_job (in_directory IN VARCHAR2,id IN OUT NUMBER)RETURN BOOLEANISPRAGMA AUTONOMOUS_TRANSACTION;instance_id NUMBER;jname VARCHAR2(4000);BEGINIF is_serial THEN-- Get id by mixing instance # and session id id := SYS_CONTEXT('USERENV', 'SESSIONID');SELECT instance_number INTO instance_id FROM v$instance;id:= instance_id * 100000+id;ELSE-- Get id of the QCSELECT owneridINTO idFROM v$sessionWHERE sid=SYS_CONTEXT('USERENV','SESSIONID');END IF;-- Create a row to 'lock' it so only one person does the job-- schedule. Everyone else will get an exception-- This is in Figure 3 step 1INSERT INTO run_hdfs_readVALUES(id,'RUNNING');jname :='Launch_hadoop_job_async';-- Launch a job to start the hadoop jobDBMS_SCHEDULER.CREATE_JOB (job_name => jname,job_type =>'STORED_PROCEDURE',job_action =>'unch_hadoop_job_async',number_of_arguments =>2);DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE (jname,1, in_directory);DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE (jname,2,CAST(id AS VARCHAR2)); DBMS_SCHEDULER.ENABLE ('Launch_hadoop_job_async');COMMIT;RETURN TRUE;EXCEPTION-- one of my siblings launched the job. Get out quitelyWHEN DUP_VAL_ON_INDEXTHENDBMS_OUTPUT.put_line ('dup value exception');RETURN FALSE;WHEN OTHERSTHENRAISE;END;FUNCTION read_from_hdfs_file (pcur IN SYS_REFCURSOR,in_directory IN VARCHAR2)RETURN return_rows_tPIPELINEDPARALLEL_ENABLE(PARTITION pcur BY ANY)ISPRAGMA AUTONOMOUS_TRANSACTION;cleanup BOOLEAN;payload hadoop_row_obj;id NUMBER;dopt DBMS_AQ.DEQUEUE_OPTIONS_T;mprop DBMS_AQ.MESSAGE_PROPERTIES_T;msgid RAW(100);BEGIN-- Launch a job to kick off the hadoop jobcleanup := launch_hadoop_job (in_directory,id);dopt.visibility := DBMS_AQ.IMMEDIATE;dopt.delivery_mode := DBMS_AQ.BUFFERED;LOOPpayload :=NULL;-- Get next rowdopt,mprop,payload,msgid);COMMIT;PIPE ROW(payload);END LOOP;EXCEPTIONWHEN OTHERSTHENIF cleanupTHENDELETE run_hdfs_readWHERE pk_id =id;COMMIT;END IF;END;END;/Bash 脚本下面这个简短的脚本是图 3 的第 3 步和第 4 步所示的数据库外控制器。