当前位置:文档之家› 如何在Oracle中集成Hadoop

如何在Oracle中集成Hadoop

如何在Oracle中集成Hadoop
如何在Oracle中集成Hadoop

如何在Oracle中集成Hadoop

文: SRC 张旭东

许多垂直行业都在关注文件系统中庞大的数据。这些数据中通常包含大量无关的明细信息,以及部分可用于趋势分析或丰富其他数据的精华信息。尽管这些数据存储在数据库之外,但一些客户仍然希望将其与数据库中的数据整合在一起以提取对业务用户有价值的信息。

本文详细介绍了如何从 Oracle 数据库访问存储在 Hadoop 集群里的数据。请注意,本文选择了 Hadoop 和 HDFS 作为示例,但这里的策略同样适用于其他分布式存储机制。本文中介绍了各种访问方法,还通过一个具体示例说明了其中一种访问方法的实现。

要从 Oracle 数据库里访问某个文件系统中的外部文件或外部数据,最简单的方法莫过于使用外部表。外部表以表的形式展示存储在文件系统中的数据,并且可在 SQL 查询中完全透明地使用。因此,可以考虑用外部表从 Oracle 数据库中直接访问 HDFS(Hadoop 文件系统)中存储的数据。遗憾的是,常规的操作系统无法调用外部表驱动直接访问 HDFS 文件。FUSE(File System in Userspace)项目针对这种情况提供了解决方法。有多种 FUSE 驱动程序支持用户挂载 HDFS 存储,并将其作为常规文件系统处理。通过使用一个此类驱动程序,并在数据库实例上挂载 HDFS (如果是 RAC 数据库,则在其所有实例上挂载 HDFS),即可使用外部表基础架构轻松访问 HDFS 文件。

图 1. 用数据库内置的 MapReduce 通过外部表进行访问

在图 1 中,我们利用 Oracle Database 11g 实现本文所述的数据库内的 map-reduce。通常情况下,Oracle Database 11g 中的并行执行框架足以满足针对外部表大多数的并行操作。

在有些情况下(例如,如果 FUSE 不可用),外部表方法可能不适用。Oracle 表函数提供了从 Hadoop 中获取数据的替代方法。本文附带的示例展示了一种这样的方法。更深入地来讲,我们用一个表函数来实现,这个表函数使用 DBMS_SCHEDULER 框架异步调用外部shell 脚本,然后

Oracle 高级队列特性进行通信。Hadoop mapper 将数据排入一个公共队列,而表函数则从该队列中取出数据。由于该表函数能够并行运行,因此使用额外的逻辑来确保仅有一个服务进程提交外部作业。

图 2. 利用表函数进行并行处理

由于表函数可以并行运行,Hadoop 流作业也可以不同程度地并行运行,并且后者不受Oracle 查询协调器的控制,这种情况下,队列能提供负载平衡。

下面我们将以一个实际示例展示图 2 的架构。请注意,我们的示例仅展示了使用表函数访问Hadoop 中存储的数据的一个模板实现。显然可能存在其他的甚至可能更好的实现。

下图是图 2 中原始示意图在技术上更准确、更具体的展示,解释了我们要在何处、如何使用后文给出的部分实际代码:

图 3. 启动 Mapper 作业并检索数据

第 1 步是确定由谁作为查询协调器。对此我们采用一种将具有相同键值的记录写入表的简单机制。首个插入胜出,作为此进程的查询协调器 (QC)。请注意,QC 表函数调用同时也承担 着处理角色。

在第 2 步中,该表函数调用 (QC) 使用 dbms_scheduler(图 3 中的作业控制器)启动一个异步作业,该作业接着在 Hadoop 集群上运行同步 bash 脚本。这个 bash 脚本就是图 3 中的启动程序 (launcher),它在 Hadoop 集群上启动 mapper 进程(第 3 步)。

mapper 进程处理数据,并在第 5 步写入一个队列。在本文的示例中,我们选择了一个在集群范围内可用的队列。现在,我们只是单纯地将任何输出直接写入到队列里。您可以通过批量处理输出并将其移入队列来提高性能。显然,您也可以选择管道和关系表等其他各种机制。

随后的第 6 步是出队过程,这是通过数据库中的表函数并行调用来实现的。这些并行调用处理得到的数据将会提供给查询请求来使用。表函数同时处理Oracle数据库的数据和来自队列 中的数据,并将来自两个来源的数据整合为单一结果集提供给最终用户。

图 4. 监控进程

Hadoop的进程 (mapper) 启动之后,作业监控器进程将监视启动程序脚本。一旦mapper 完成Hadoop 集群中数据的处理之后,bash 脚本即完成,如图 4 所示。

作业监控器将监视数据库调度程序队列,并在 shell 脚本完成时发出通知(第 7 步)。作业监控器检查数据队列中的剩余数据元素(第 8 步)。只要队列中存在数据,表函数调用就会继续处理数据(第 6 步)。

图 5. 关闭处理

当表函数并行调用取出队列中的全部数据之后,作业监控器将终止队列(图 5 所示的第 9步)以确保 Oracle 中的表函数调用停止。此时,所有数据均已交付给请求这些数据的查询。

本文中的示例表明,将 Hadoop 系统与 Oracle Database 11g 集成是非常容易的。 本文中讨论的方法允许客户将 Hadoop 中的数据直接传递到 Oracle 查询中。这避免了将数据获取到本

地文件系统并物化到 Oracle 表中,之后才能在 SQL 查询中访问这些数据的过程。

示例代码

图3 至 图 5 实现的解决方案使用以下代码。Oracle官方称:以下示例的所有代码均在 Oracle Database 11g 和 5 个节点的 Hadoop 集群上进行过测试。

处理数据的表函数

该脚本中包含某些设置组件。例如,脚本开始的部分创建了图 3 中第 1 步所展示的仲裁表。本例中使用的是一直广受欢迎的 OE 模式。

connect oe/oe

-- Table to use as locking mechanisim for the hdfs reader as

-- leveraged in Figure 3 step 1

DROP TABLE run_hdfs_read;

CREATE TABLE run_hdfs_read

(

pk_id NUMBER,

status VARCHAR2(100),

PRIMARY KEY(pk_id)

);

-- Object type used for AQ that receives the data

CREATE OR REPLACE TYPE hadoop_row_obj AS OBJECT

(a NUMBER, b NUMBER);

/

connect/as sysdba

-- system job to launch external script

-- this job is used to eventually run the bash script

-- described in Figure 3 step 3

CREATE OR REPLACE PROCEDURE launch_hadoop_job_async (

in_directory IN VARCHAR2,

id NUMBER)

IS

cnt NUMBER;

BEGIN

BEGIN

DBMS_SCHEDULER.DROP_JOB ('ExtScript' || id,TRUE);

EXCEPTION

WHEN OTHERS

THEN

NULL;

END;

-- Run a script

DBMS_SCHEDULER.CREATE_JOB (job_name =>'ExtScript' || id,

job_type =>'EXECUTABLE',

job_action =>'/bin/bash',

number_of_arguments =>1);

DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE ('ExtScript' || id,1, in_directory);

DBMS_SCHEDULER.ENABLE ('ExtScript' || id);

-- Wait till the job is done. This ensures the hadoop job is completed

LOOP

SELECT COUNT(*)

INTO cnt

FROM DBA_SCHEDULER_JOBS

WHERE job_name ='EXTSCRIPT' || id;

DBMS_OUTPUT.put_line ('Scheduler Count is ' || cnt);

IF(cnt =0) THEN

EXIT;

ELSE

DBMS_LOCK.sleep (5);

END IF;

END LOOP;

-- Wait till the queue is empty and then drop it

-- as shown in Figure 5

-- The TF will get an exception and it will finish quietly

LOOP

SELECT SUM(c)

INTO cnt

FROM(SELECT enqueued_msgs - dequeued_msgs c

FROM gv$persistent_queues

WHERE queue_name ='HADOOP_MR_QUEUE'

UNION ALL

SELECT num_msgs + spill_msgs c

FROM gv$buffered_queues

WHERE queue_name ='HADOOP_MR_QUEUE'

UNION ALL

SELECT0 c FROM DUAL);

IF(cnt =0) THEN

-- Queue is done. stop it.

DBMS_AQADM.STOP_QUEUE ('HADOOP_MR_QUEUE');

DBMS_AQADM.DROP_QUEUE ('HADOOP_MR_QUEUE');

RETURN;

ELSE

-- Wait for a while

DBMS_LOCK.sleep (5);

END IF;

END LOOP;

END;

/

-- Grants needed to make hadoop reader package work grant execute on launch_hadoop_job_async to oe;

GRANT SELECT ON v_$session TO oe;

GRANT SELECT ON v_$instance TO oe;

GRANT SELECT ON v_$px_process TO oe;

GRANT EXECUTE ON DBMS_AQADM TO oe;

GRANT EXECUTE ON DBMS_AQ TO oe;

connect oe/oe

-- Simple reader package to read a file containing two numbers

CREATE OR REPLACE PACKAGE hdfs_reader

IS

-- Return type of pl/sql table function

TYPE return_rows_t IS TABLE OF hadoop_row_obj;

-- Checks if current invocation is serial

FUNCTION is_serial

RETURN BOOLEAN;

-- Function to actually launch a Hadoop job

FUNCTION launch_hadoop_job (in_directory IN VARCHAR2,id IN OUT NUMBER)

RETURN BOOLEAN;

-- Tf to read from Hadoop

-- This is the main processing code reading from the queue in

-- Figure 3 step 6. It also contains the code to insert into

-- the table in Figure 3 step 1

FUNCTION read_from_hdfs_file (pcur IN SYS_REFCURSOR,

in_directory IN VARCHAR2)

RETURN return_rows_t

PIPELINED

PARALLEL_ENABLE(PARTITION pcur BY ANY);

END;

/

CREATE OR REPLACE PACKAGE BODY hdfs_reader

IS

-- Checks if current process is a px_process

FUNCTION is_serial

RETURN BOOLEAN

IS

c NUMBER;

BEGIN

SELECT COUNT(*)

INTO c

FROM v$px_process

WHERE sid=SYS_CONTEXT('USERENV','SESSIONID');

IF c <>0 THEN

RETURN FALSE;

ELSE

RETURN TRUE;

END IF;

EXCEPTION

WHEN OTHERS

THEN

RAISE;

END;

FUNCTION launch_hadoop_job (in_directory IN VARCHAR2,id IN OUT NUMBER)

RETURN BOOLEAN

IS

PRAGMA AUTONOMOUS_TRANSACTION;

instance_id NUMBER;

jname VARCHAR2(4000);

BEGIN

IF is_serial THEN

-- Get id by mixing instance # and session id id := SYS_CONTEXT('USERENV', 'SESSIONID');

SELECT instance_number INTO instance_id FROM v$instance;

id:= instance_id * 100000+id;

ELSE

-- Get id of the QC

SELECT ownerid

INTO id

FROM v$session

WHERE sid=SYS_CONTEXT('USERENV','SESSIONID');

END IF;

-- Create a row to 'lock' it so only one person does the job

-- schedule. Everyone else will get an exception

-- This is in Figure 3 step 1

INSERT INTO run_hdfs_read

VALUES(id,'RUNNING');

jname :='Launch_hadoop_job_async';

-- Launch a job to start the hadoop job

DBMS_SCHEDULER.CREATE_JOB (job_name => jname,

job_type =>'STORED_PROCEDURE',

job_action =>'https://www.doczj.com/doc/91872578.html,unch_hadoop_job_async',

number_of_arguments =>2);

DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE (jname,1, in_directory);

DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE (jname,2,CAST(id AS VARCHAR2)); DBMS_SCHEDULER.ENABLE ('Launch_hadoop_job_async');

COMMIT;

RETURN TRUE;

EXCEPTION

-- one of my siblings launched the job. Get out quitely

WHEN DUP_VAL_ON_INDEX

THEN

DBMS_OUTPUT.put_line ('dup value exception');

RETURN FALSE;

WHEN OTHERS

THEN

RAISE;

END;

FUNCTION read_from_hdfs_file (pcur IN SYS_REFCURSOR,

in_directory IN VARCHAR2)

RETURN return_rows_t

PIPELINED

PARALLEL_ENABLE(PARTITION pcur BY ANY)

IS

PRAGMA AUTONOMOUS_TRANSACTION;

cleanup BOOLEAN;

payload hadoop_row_obj;

id NUMBER;

dopt DBMS_AQ.DEQUEUE_OPTIONS_T;

mprop DBMS_AQ.MESSAGE_PROPERTIES_T;

msgid RAW(100);

BEGIN

-- Launch a job to kick off the hadoop job

cleanup := launch_hadoop_job (in_directory,id);

dopt.visibility := DBMS_AQ.IMMEDIATE;

dopt.delivery_mode := DBMS_AQ.BUFFERED;

LOOP

payload :=NULL;

-- Get next row

dopt,

mprop,

payload,

msgid);

COMMIT;

PIPE ROW(payload);

END LOOP;

EXCEPTION

WHEN OTHERS

THEN

IF cleanup

THEN

DELETE run_hdfs_read

WHERE pk_id =id;

COMMIT;

END IF;

END;

END;

/

Bash 脚本

下面这个简短的脚本是图 3 的第 3 步和第 4 步所示的数据库外控制器。只要 Hadoop mapper

保持运行,系统就会持续执行这个同步步骤。

#!/bin/bash

cd –HADOOP_HOME-

A="/net/scratch/java/jdk1.6.0_16/bin/java -classpath

/home/hadoop:/home/hadoop/ojdbc6.jar StreamingEq"

bin/hadoop fs -rmr output

bin/hadoop jar ./contrib/streaming/hadoop-0.20.0-streaming.jar -input input/nolist.txt -output output -mapper "$A"-jobconf mapred.reduce.tasks=0

JAVA Mapper 脚本

我们为本例编写了在 Hadoop 集群上执行的一个简单的 mapper 进程。实际上当然存在许多更加完善的 mapper。这个 mapper 将一个字符串转为两个数字,并按照逐行的方式将其提供给队列。 // Simplified mapper example for Hadoop cluster

import java.sql.CallableStatement;

import java.sql.Connection;

import java.sql.ResultSet;

import java.sql.Statement;

import oracle.jdbc.pool.OracleDataSource;

//import java.util.Arrays;

//import oracle.sql.ARRAY;

//import oracle.sql.ArrayDescriptor;

public class StreamingEq {

public static void main(String args[]) throws Exception {

// connect through driver

OracleDataSource ods = new OracleDataSource();

Connection conn = null;

Statement stmt = null;

java.io.BufferedReader stdin;

String line;

int countArr = 0, i;

oracle.sql.ARRAY sqlArray;

oracle.sql.ArrayDescriptor arrDesc;

try {

ods.setURL("jdbc:oracle:thin:oe/oe@$ORACLE_INSTANCE");

conn = ods.getConnection();

} catch (Exception e) {

System.out.println("Got exception " + e);

if (rset != null)

rset.close();

if (conn != null)

conn.close();

System.exit(0);

return;

}

System.out.println("connection works conn " + conn);

stdin = new java.io.BufferedReader(new java.io.InputStreamReader(System.in));

String query = "declare dopt dbms_aq.enqueue_options_t; mprop

dbms_aq.message_properties_t; msgid raw(100); begin dopt.visibility := DBMS_AQ.IMMEDIATE; dopt.delivery_mode := DBMS_AQ.BUFFERED; dbms_Aq.enqueue('HADOOP_MR_QUEUE', dopt, mprop, hadoop_row_obj(?,?), msgid);COMMIT;END;";

CallableStatement pstmt = null;

try {

pstmt = conn.prepareCall(query);

}

catch (Exception e) {

System.out.println("Got exception " + e);

if (rset != null)

rset.close();

if (conn != null)

conn.close();

}

while ((line = stdin.readLine()) != null) {

if (line.replaceAll(" ", "").length() < 2)

continue;

String[] data = line.split(",");

if (data.length != 2)

continue;

countArr++;

pstmt.setInt(1, Integer.parseInt(data[0]));

pstmt.setInt(2, Integer.parseInt(data[1]));

pstmt.executeUpdate();

}

if (conn != null)

conn.close();

System.exit(0);

return;

}

}

查询请求数据

利用表函数处理,在上述系统中执行 select 查询的示例如下:

-- Set up phase for the data queue

execute DBMS_AQADM.CREATE_QUEUE_TABLE ('HADOOP_MR_QUEUE','HADOOP_ROW_OBJ'); execute DBMS_AQADM.CREATE_QUEUE ('HADOOP_MR_QUEUE','HADOOP_MR_QUEUE'); execute DBMS_AQADM.START_QUEUE ('HADOOP_MR_QUEUE');

-- Query being executed by an end user or BI tool

-- Note the hint is not a real hint, but a comment

-- to clarify why we use the cursor

SELECT MAX(a),AVG(b)

FROM TABLE(

hdfs_reader.

read_from_hdfs_file (

CURSOR(

SELECT/*+ FAKE cursor to kick start parallelism */ *

FROM orders),

'/home/hadoop/eq_test4.sh'));

基于Hadoop的研究及性能分析

基于Hadoop的研究及性能分析 摘要 在大数据到来的今天,本文首先介绍了Hadoop及其核心技术MapReduce的工作原理。详细讨论了Hadoop推测执行算法和SALS 推测执行算法并对它们的性能进行分析。最后,分析了MapReduce 框架的通用二路连接算法 RSJ。为了提高性能,提出了一种基于DistributedCache 的改进算法,通过减少 mapper 输出的数据来达到优化的目的。 关键字:Hadoop MapReduce 性能算法

Abstract:In the era of big data, this paper introduces Hadoop, MapReduce and its core technology works.I have discussed the Hadoop speculative execution algorithms and SALS speculative execution algorithm and analyzed their performance.Finally, I analyzed the Common Road Join Algorithm in MapReduce framework.To improve performance, I propose an improved algorithm based DistributedCache by reducing the mapper output data to achieve optimization purposes. Key words:Hadoop; MapReduce; Performance;Algorithm

相关主题
文本预览
相关文档 最新文档