Hadoop とそのエコシステムの使用¶
コンテンツ :
MapReduce¶
初心者のための MapReduce¶
MapReduceは、大規模なデータ分析に Hadoop が使用するプログラミングのパラダイムです。これは基本的に分散したマシンのクラスタ内で divide-and-conquer 戦略を適用します :
- オリジナルの入力データは、多くのチャンクに分割されます。チャンクサイズは Hadoop ブロックサイズで、デフォルトは64 MBです
- プロセスは各データチャンクに対して分散/並列方式で実行され、各プロセスは部分入力データに対して同じ変換またはフィルタリング機能を実行します。一部の部分出力データが生成されます。これらのプロセスは "Mapper" と呼ばれます
- Mapper の出力を受信し、各プロセスを同じ集約関数または結合関数で実行することを担当する1つまたは複数のプロセスが実行されます。これらのプロセスは "reducers" と呼ばれます。Reducer の数は Hadoop によって計算されます
入力分割の詳細¶
設定された InputFormat
クラスは、大きなデータファイルがブロックに分割される方法と、ブロックがどのように読み取られるかを管理します。 これにより、defaultFileInputFormat
クラスが使用され、格納された各 HDFS データブロックごとにInputSplit
オブジェクトが作成されます。 RecordReader 関数は InputSplit
オブジェクトで与えられます。
どちらも InputSplit
と RecordReader
は透過的な概念であり、プログラマは以下のものから標準入力フォーマットを指定するだけです :
FileInputFormat
TextInputFormat
KeyValueTextInputFormat
SequenceFileInputFormat
SequenceFileAsTextInputFormat
A custom one
Mapperの詳細¶
前述のように、Mapperは完全な InputSplits
の読み取りと処理を担当しています。これは RecordReader
関数を呼び出すことによって行ごとに読み込まれ、split (key) 内の行の相対位置とデータ行自体 (value) について、key-value ペアを提供します。
入力ライン上で変換またはフィルタリング機能を実行すると、別の key-value ペアが出力されます。推測できるように、値は変換またはフィルタリングされた値です。キーは、アプリケーションロジックに応じてマッピング関数によって決定されます。変換されたデータのタイプに関する一定の値またはタグであってもかまいません。同じキーを共有するキーと値のペアのリストを Reducers に送信するために、出力されたキーを正しく決定することは非常に重要です。
Reducer の詳細¶
Reducers は、Mapper によって出力された key-value ペアのリストをそれぞれ受信し、すべてのペアに同じキーを共有します。このリストは、すべての Reducers と同じ集約関数または結合関数を実行するために反復され、別の key-value が戻されます。推測されるように、値は計算された集計に関連し、キーはアプリケーションロジックに依存します。受信した key-value ペアのリストの中の共有キーと同じですが、アプリケーションで必要な場合は非常に異なるものになる可能性があります。
Reducers の数は、いくつかのパラメーターと構成に応じて Hadoop によって選択されます。Mappers と Reducers の数について詳しくは、こちらをご覧ください。
出力の詳細¶
設定された OutputFormat
クラスは、reducer の結果が HDFS に書き込まれる方法を制御します。 デフォルトでは、FileOutputFormat
はキーと値の間の空白を使って key-value ペアを HDFS ファイルにシリアル化する RecordWriter
関数を提供します。
結果の出力は、各 reducer ごとに1つずつある、シリアライズされたデータの複数のファイルによって構成されます。
他のシリアライゼーション形式は、Hadoop のものとは別に使用できます :
- Thrift
- Protocol Buffers
- Avro
Combiners¶
key-value ペアの詳細¶
MapReduce プロセス全体で交換される key-value ペアには、次のプロパティがあります :
- Values は Writable インターフェイスを実装します
- Key は WritableComparable を実装します
- いくつかの out-of-the-box Hadoop クラス : IntWritable, LongWritable, FloatWritable, DoubleWritable, ...
MapReduce ジョブのプログラミング¶
Hadoop の MapReduce ジョブは、次のもので構成されます :
- driver : 入力、出力、フォーマットなどを定義するソフトウェア、およびジョブを起動するためのエントリ・ポイント
- Mapper のセット : その振る舞いを定義するソフトウェアによって与えられます
- Reducers のセット : その振る舞いを定義するソフトウェアによって与えられます
ここから、Hadoop の母国語であるため、Java 言語が使用されます。それにもかかわらず、Hadoop ストリーミング機能により、他のプログラミング言語を使用することができます。
ドライバ・コードの例¶
次の例は、tidoop-mr-lib から取得したもので、MapReduce ドライバのルック・アンド・フィールを示しています。
ご覧のとおり、ドライバは、Configured
クラスを拡張して、ツール・インタフェースを実装する main 関数を含む Java クラスです。インターフェイスの実装のため、このようなインターフェイスを実装するクラスを実行するために使用される、ToolRunner
から実行される静的メソッドを呼び出すことは可能です。一般的な Hadoop のコマンドライン引数(-libjars
など)を解析し、ツールの設定を変更します。 アプリケーション固有のオプションは変更されずに渡されます。
run
メソッドの実装は、アプリケーション・パラメータをチェックして開始し、何らかのエラーが見つかった場合に正しい使用法を示します。 次に、Configuration
オブジェクトが取得され、アプリケーション・パラメータの1つ、つまりフィルタリング基準を定義する正規表現が構成に追加されます。これは、mapper がそのようなパラメータを知る必要があり、 Configuration
オブジェクトへのアクセス権しか持たないからです。
パラメータがチェックされ、設定に追加されると、ジョブが開始されることがあります。それは Job
クラスのインスタンスとして作成され、その動作を定義するためにいくつかのセットが適用されます :
- Reducer の数は1に設定されています。これはこの例に関する特定の制約によるものであり、一般的に必要なステップではありません
- アプリケーションを含む Java jar は、アプリケーション・クラス名を指定して設定されます。これは、Hadoop が指定された Java jar 内のその他すべてのもの (mappers, reducers, その他の補助クラス)を見つけようとするため、必須のステップです
- Mapper, combiner, reducer クラスは常に設定する必要があります
- Mapper 出力の key クラスと value クラスを指定する必要があります
- 同じことは、Reducer の key クラスと value クラスについても同様です
最後に、入出力パス(これらのパスはファイルではなくフォルダに関するものです)が設定され、MapReduce ジョブが起動されます。
/**
* Copyright 2015 Telefonica Investigación y Desarrollo, S.A.U
*
* This file is part of fiware-tidoop (FI-WARE project).
*
* fiware-tidoop is free software: you can redistribute it and/or modify it under the terms of the GNU Affero
* General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
* fiware-tidoop is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
* implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
* for more details.
*
* You should have received a copy of the GNU Affero General Public License along with fiware-tidoop. If not, see
* http://www.gnu.org/licenses/.
*
* For those usages not covered by the GNU Affero General Public License please contact with
* francisco.romerobueno at telefonica dot com
*/
package com.telefonica.iot.tidoop.mrlib.jobs;
import com.telefonica.iot.tidoop.mrlib.combiners.LinesCombiner;
import com.telefonica.iot.tidoop.mrlib.mappers.LineFilter;
import com.telefonica.iot.tidoop.mrlib.reducers.LinesJoiner;
import com.telefonica.iot.tidoop.mrlib.utils.Constants;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author frb
*/
public final class Filter extends Configured implements Tool {
/**
* Main class.
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Filter(), args);
System.exit(res);
} // main
@Override
public int run(String[] args) throws Exception {
// check the number of arguments, show the usage if it is wrong
if (args.length != 3) {
showUsage();
return -1;
} // if
// get the arguments
String input = args[0];
String output = args[1];
String regex = args[2];
// create and configure a MapReduce job
Configuration conf = this.getConf();
conf.set(Constants.PARAM_REGEX, regex);
Job job = Job.getInstance(conf, "tidoop-mr-lib-filter");
job.setNumReduceTasks(1);
job.setJarByClass(Filter.class);
job.setMapperClass(LineFilter.class);
job.setCombinerClass(LinesCombiner.class);
job.setReducerClass(LinesJoiner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
// run the MapReduce job
return job.waitForCompletion(true) ? 0 : 1;
} // main
private void showUsage() {
System.out.println("Usage:");
System.out.println();
System.out.println("hadoop jar \");
System.out.println(" target/tidoop-mr-lib-x.y.z-jar-with-dependencies.jar \");
System.out.println(" com.telefonica.iot.tidoop.mrlib.Filter \");
System.out.println(" -libjars target/tidoop-mr-lib-x.y.z-jar-with-dependencies.jar \");
System.out.println(" <HDFS input> \");
System.out.println(" <HDFS output> \");
System.out.println(" <regex>");
} // showUsage
} // Filter
Mapper コードの例¶
Mapper は、Mapper
クラスを拡張し、必要なマッピング関数の実装で少なくとも map
メソッドをオーバーライドする必要があります。 以下の例では、マッピング関数は、テキスト行内の特定の文字列(正規表現で定義された)の存在に基づいて、テキスト行をフィルタリングするかどうかを指定します。 このような正規表現は前述のように、Configuration
オブジェクトに渡され、 map
メソッドは設定にアクセスできないため、setup
メソッドの実装で取得する必要があります。
入力と出力の key-value のタイプを確認してください。FileInputSplit
クラスから RecordReader
メソッドを透過的に呼び出した結果、(Object,Text)
ペアが、map
メソッドに渡されます。このペアは、既に説明したように、分割またはデータ・ブロック内の読取りテキスト行の相対位置を含みます。 この特定の場合には、定数キー("common-key"
)とそれがフィルタリングされなかった場合の入力テキスト行とを含むマップ関数の結果として、(Text,Text)
ペアが出力されます。この定数キーを使用する理由は、出力されたすべてのペアが同じ単一の reducer に送信されるためです。
/**
* Copyright 2015 Telefonica Investigación y Desarrollo, S.A.U
*
* This file is part of fiware-tidoop (FI-WARE project).
*
* fiware-tidoop is free software: you can redistribute it and/or modify it under the terms of the GNU Affero
* General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
* fiware-tidoop is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
* implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
* for more details.
*
* You should have received a copy of the GNU Affero General Public License along with fiware-tidoop. If not, see
* http://www.gnu.org/licenses/.
*
* For those usages not covered by the GNU Affero General Public License please contact with
* francisco.romerobueno at telefonica dot com
*/
package com.telefonica.iot.tidoop.mrlib.mappers;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
*
* @author frb
*/
/**
* LineFilter.
*/
public static class LineFilter extends Mapper<Object, Text, Text, Text> {
private Pattern pattern = null;
private final Text commonKey = new Text("common-key");
@Override
public void setup(Context context) throws IOException, InterruptedException {
// compile just once the regex; use an empty regex if no one is provided
pattern = Pattern.compile(context.getConfiguration().get(Constants.PARAM_REGEX, ""));
} // setup
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
Matcher matcher = pattern.matcher(value.toString());
if (matcher.matches()) {
context.write(commonKey, value);
} // if
} // map
} // LineFilter
Reducer コード例¶
Reducer は、Reducer
クラスを拡張して、少なくとも reduce
メソッドをオーバーライドして、望ましい還元関数を実装する必要があります。 以下の例では、reducing 関数は受信したすべてのペアを出力に出すことです。
入力と出力の key-value ペアのタイプを確認してください。 いくつかの (Text,Text)
ペアは、iterable
オブジェクトとして reduce
メソッドに渡されます。これらは mappers がアルゴリズムの前のステップで出力したペアです。reduce 関数の結果としていくつかの(NullWritable、Text)
対が出力されます。 NullWritable の使用は、フィルタ処理の結果全体を含む最終的な HDFS ファイルにキーをシリアル化したくないためです。 最後に、 RecordWriter
メソッドを介した FileOutputFormat
は、1つの reducer だけがセットアップされているので、結果のペアのそれぞれを1つの HDFS ファイルにシリアライズします。
/**
* Copyright 2015 Telefonica Investigación y Desarrollo, S.A.U
*
* This file is part of fiware-tidoop (FI-WARE project).
*
* fiware-tidoop is free software: you can redistribute it and/or modify it under the terms of the GNU Affero
* General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
* fiware-tidoop is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
* implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
* for more details.
*
* You should have received a copy of the GNU Affero General Public License along with fiware-tidoop. If not, see
* http://www.gnu.org/licenses/.
*
* For those usages not covered by the GNU Affero General Public License please contact with
* francisco.romerobueno at telefonica dot com
*/
package com.telefonica.iot.tidoop.mrlib.reducers;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
*
* @author frb
*/
/**
* LinesJoiner.
*/
public class LinesJoiner extends Reducer<Text, Text, NullWritable, Text> {
@Override
public void reduce(Text key, Iterable<Text> filteredLines, Context context)
throws IOException, InterruptedException {
for (Text filteredLine : filteredLines) {
context.write(NullWritable.get(), filteredLine);
} // for
} // reduce
} // LinesJoiner
Compilation¶
上記のクラスがコード化されたら、それらをコンパイルして jar ファイルを作成するだけです。現代の Java 用 IDE はそれをあなたのために行います。とにかく、ソースファイルが src フォルダにあると仮定すると、コンパイルされたクラスのデスティネーションは classes で、jar ファイルのデスティネーションは dist です。次の Java コマンドですべてを作成できます :
$ rm dist/*
$ rm classes/es/tid/*
$ javac -classpath <path_to_hadoop_core_jar> -d classes/ src/*.java
$ jar -cvf dist/<my_app>.jar -C classes/ .
作成された jar は、呼び出されると Hadoop によってすべてのクラスタ・ノードに自動的に配布されます。この wiki の MapReduce ジョブのアップロードと実行のセクションを参照してください。Hadoop classpath のどこかにそれをあなた自身でコピーすることもできます :
$ cp dist/word-count.jar /usr/lib/hadoop/lib
MapReduceジョブのアップロードと実行¶
Hadoop MapReduce ジョブは Java で書かれ、Java jar ファイルとしてパッケージ化されています。このセクションでは、入力データファイルを含むフォルダと、結果が使用可能になるはずの場所を指定することによって、これらのジョブを実行する方法について説明します。両方のフォルダは HDFS ユーザ空間の下に配置されます。入力フォルダは実行前に存在する必要がありますが、ジョブが完了すると出力フォルダーが作成されます。例えば :
$ hadoop fs -ls /user/myuserspace
Found 2 items
drwxr----- - myuserspace myuserspace 0 2015-07-09 12:04 /user/frb/.staging
drwxr-xr-x - myuserspace myuserspace 0 2013-06-21 09:41 /user/myuserspace/input
$ hadoop fs -ls /user/myuserspace/input
Found 1 items
-rw-r--r-- 3 myuserspace myuserspace 1234 2013-06-21 09:41 /user/myuserspace/input/mydatafile.txt
$ hadoop fs -cat /user/myuserspace/input/mydatafile.txt
these are some lines of data
this is not real data
but it is useful for demostration purposes
these are some lines of data
this is not real data
but it is useful for demostration purposes
these are some lines of data
this is not real data
but it is useful for demostration purposes
...
デモ目的のためだけに tidoop-mr-lib から Filter アプリケーションを実行すると仮定すると、これは次のコマンドを入力した後の出力になります :
$ hadoop jar target/tidoop-mr-lib-0.0.0-SNAPSHOT-jar-with-dependencies.jar com.telefonica.iot.tidoop.mrlib.jobs.Filter -libjars target/tidoop-mr-lib-0.0.0-SNAPSHOT-jar-with-dependencies.jar input output ^.*\bdata\b.*$
15/08/11 16:02:59 INFO impl.TimelineClientImpl: Timeline service address: http://dev-fiwr-bignode-12.hi.inet:8188/ws/v1/timeline/
15/08/11 16:02:59 INFO client.RMProxy: Connecting to ResourceManager at dev-fiwr-bignode-12.hi.inet/10.95.236.44:8050
15/08/11 16:03:00 INFO input.FileInputFormat: Total input paths to process : 1
15/08/11 16:03:01 INFO mapreduce.JobSubmitter: number of splits:1
15/08/11 16:03:01 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1438089170493_0002
15/08/11 16:03:01 INFO impl.YarnClientImpl: Submitted application application_1438089170493_0002
15/08/11 16:03:01 INFO mapreduce.Job: The url to track the job: http://dev-fiwr-bignode-12.hi.inet:8088/proxy/application_1438089170493_0002/
15/08/11 16:03:01 INFO mapreduce.Job: Running job: job_1438089170493_0002
15/08/11 16:03:08 INFO mapreduce.Job: Job job_1438089170493_0002 running in uber mode : false
15/08/11 16:03:08 INFO mapreduce.Job: map 0% reduce 0%
15/08/11 16:03:13 INFO mapreduce.Job: map 100% reduce 0%
15/08/11 16:03:19 INFO mapreduce.Job: map 100% reduce 100%
15/08/11 16:03:20 INFO mapreduce.Job: Job job_1438089170493_0002 completed successfully
15/08/11 16:03:20 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=6
FILE: Number of bytes written=204841
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1924
HDFS: Number of bytes written=0
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3659
Total time spent by all reduces in occupied slots (ms)=3497
Total time spent by all map tasks (ms)=3659
Total time spent by all reduce tasks (ms)=3497
Total vcore-seconds taken by all map tasks=3659
Total vcore-seconds taken by all reduce tasks=3497
Total megabyte-seconds taken by all map tasks=14987264
Total megabyte-seconds taken by all reduce tasks=14323712
Map-Reduce Framework
Map input records=57
Map output records=0
Map output bytes=0
Map output materialized bytes=6
Input split bytes=138
Combine input records=0
Combine output records=0
Reduce input groups=0
Reduce shuffle bytes=6
Reduce input records=0
Reduce output records=0
Spilled Records=0
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=25
CPU time spent (ms)=1100
Physical memory (bytes) snapshot=1407291392
Virtual memory (bytes) snapshot=8527355904
Total committed heap usage (bytes)=1702363136
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1786
File Output Format Counters
Bytes Written=0
このコマンドは次のように構成されています :
hadoop jar <jar_file> <main_class> <existing_input_folder> <non_existing_output_folder>
ジョブが完了すると(実際のジョブはタスクを完了するのに数時間または数日かかることがあります)、結果は指定された出力フォルダにあります :
$ hadoop fs -ls output
Found 2 items
-rw-r--r-- 3 myuserspace myuserspace 0 2015-08-11 16:03 output/_SUCCESS
-rw-r--r-- 3 myuserspace myuserspace 969 2015-08-11 16:03 output/part-r-00000
Filter アプリケーションは単一の reducer (前のセクションを参照)を実行するようにセットアップされているので、出力フォルダには1つの part-r-0000
ファイルが表示されます。複数の reducer がセットアップされていた場合、それぞれの reducer にファイルが表示されます。その出力ファイルの内容を表示して、単語のデータが格納されている行だけが維持されていることを確認できます :
$ hadoop fs -cat output/part-r-00000
these are some lines of data
this is not real data
these are some lines of data
this is not real data
these are some lines of data
this is not real data
...
Tidoop MapReduce ジョブの実行¶
Github の README のジョブ・リファレンスのセクションを参照してください。
CKAN データを消費する MapReduce ジョブのプログラミング¶
Github の README の usage のセクションを参照してください。
Hive¶
Hiveは、Hadoop のデータウェアハウス・システムであり、簡単なデータ集計、SQL と同様のクエリのアドホック、および Hadoop 互換ファイルシステムに格納された大規模データセットの分析を容易にします[4]。Hive を使用すると MapReduce のプログラミングは不要です。なぜなら、すべての MapReduce は自動的に Hive によって処理されるからです。
Hive が動作する方法は、すべてのデータを SQL のようなテーブルにロードし、HiveQL で記述された内部的な (Hive の cli を使用した) または 外部の (Java ベースの Hive クライアントを使用した) SQL のようなクエリを可能にすることです。既に述べたように、ほとんどの場合、クエリが実行されるたびに、必要なデータを選択、フィルタリング、結合、グループ化するために、事前定義された Hive MapReduce ジョブが実行されます。単純な select * from table
が実行された場合、MapReduce ジョブは実行されません (テーブル内のすべてのデータが返されるため)。
Hive CLI¶
Hive CLI [6]は、テスト目的または Oozie を介したリモート実行のみを考慮する必要があります。これは、あなたの資格情報を使ってヘッド・ノードに ssh し、シェルに hive をタイプすることによって使用できます :
$ hive
$ Hive history file=/tmp/myuser/hive_job_log_opendata_201310030912_2107722657.txt
$ hive>select column1,column2,otherColumns from mytable where column1='whatever' and columns2 like '%whatever%';
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201308280930_0953, Tracking URL = http://cosmosmaster-gi:50030/jobdetails.jsp?jobid=job_201308280930_0953
Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=cosmosmaster-gi:8021 -kill job_201308280930_0953
2013-10-03 09:15:34,519 Stage-1 map = 0%, reduce = 0%
2013-10-03 09:15:36,545 Stage-1 map = 67%, reduce = 0%
2013-10-03 09:15:37,554 Stage-1 map = 100%, reduce = 0%
2013-10-03 09:15:44,609 Stage-1 map = 100%, reduce = 33%
2013-10-03 09:15:45,631 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201308280930_0953
OK
the result set...
カスタム Hive クライアントのプログラミング¶
通常、Hive サーバはコンピューティング・サービスノードで実行され、Hive クライアントからの接続は通常どおり TCP/10000 ポートで処理されます。いくつかの一般的なプログラミング言語 (Java, Pythonなど) で独自の Hive クライアントを書く方法を学ぶために、次のセクションを見てください。独自の Hive クライアントを作成する際にも、このリンクを参考にしてください。
Java¶
以下の例では、Hive 0.13.0 がコンピューティング・サービスノードに配備され、サーバの実行が HiveServer2 バージョンであると想定します。さらに、Maven がクライアントの構築に使用されていると仮定します。
したがって、依存関係を解決するために、まず pom.xml
に次の行を追加します :
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>0.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>0.13.0</version>
</dependency>
これが最小コードです :
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.DriverManager;
public class HiveClient {
// JDBC driver required for Hive connections
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
private static Connection con;
private static Connection getConnection(String ip, String port, String db, String user, String password) {
try {
// dynamically load the Hive JDBC driver
Class.forName(driverName);
} catch (ClassNotFoundException e) {
System.out.println(e.getMessage());
return null;
} // try catch
try {
// return a connection based on the Hive JDBC driver
return DriverManager.getConnection("jdbc:hive2://" + ip + ":" + port + "/" + db, user, password);
} catch (SQLException e) {
System.out.println(e.getMessage());
return null;
} // try catch
} // getConnection
private static void doQuery() {
try {
// from here on, everything is SQL!
Statement stmt = con.createStatement();
ResultSet res = stmt.executeQuery("select column1,column2,otherColumns "
+ "from mytable where column1='whatever' and columns2 like '%whatever%'");
// iterate on the result
while (res.next()) {
String column1 = res.getString(1);
Integer column2 = res.getInt(2);
// whatever you want to do with this row, here
} // while
// close everything
res.close();
stmt.close();
con.close();
} catch (SQLException ex) {
System.exit(0);
} // try catch
} // doQuery
public static void main(String[] args) {
// get a connection to the Hive server running on the specified IP address, listening on 10000/TCP port
// authenticate using my credentials
con = getConnection("cosmos.lab.fiware.org", "10000", "default", "myuser", "mypasswd");
// do a query, querying the Hive server will automatically imply the execution of one or more MapReduce jobs
doQuery();
} // main
} // HiveClient
Python¶
以下の例では、Hive 0.13.0 がコンピューティング・サービス・ノードに配備され、サーバの実行が HiveServer2 バージョンであると想定します。また、pip
, Python パッケージ・マネージャーとしてインストールされているものとします。
したがって、HiveServer2 用の 3 driver をインストールすることから始めます :
$ pip install pyhs2`
次のコードは Python を使って基本的な Hive クライアントを実装しています :
import pyhs2
with pyhs2.connect(host='cosmos.lab.fiware.org',
port=10000,
authMechanism="PLAIN",
user='myuser',
password='mypassword',
database='default') as conn:
with conn.cursor() as cur:
cur.execute("select * from table")
print cur.getSchema()
for i in cur.fetch():
print i`