Hadoop とそのエコシステムの使用

コンテンツ :

MapReduce

初心者のための MapReduce

MapReduceは、大規模なデータ分析に Hadoop が使用するプログラミングのパラダイムです。これは基本的に分散したマシンのクラスタ内で divide-and-conquer 戦略を適用します :

  1. オリジナルの入力データは、多くのチャンクに分割されます。チャンクサイズは Hadoop ブロックサイズで、デフォルトは64 MBです
  2. プロセスは各データチャンクに対して分散/並列方式で実行され、各プロセスは部分入力データに対して同じ変換またはフィルタリング機能を実行します。一部の部分出力データが生成されます。これらのプロセスは "Mapper" と呼ばれます
  3. Mapper の出力を受信し、各プロセスを同じ集約関数または結合関数で実行することを担当する1つまたは複数のプロセスが実行されます。これらのプロセスは "reducers" と呼ばれます。Reducer の数は Hadoop によって計算されます

トップ

入力分割の詳細

設定された InputFormat クラスは、大きなデータファイルがブロックに分割される方法と、ブロックがどのように読み取られるかを管理します。 これにより、defaultFileInputFormat クラスが使用され、格納された各 HDFS データブロックごとにInputSplit オブジェクトが作成されます。 RecordReader 関数は InputSplit オブジェクトで与えられます。

どちらも InputSplitRecordReader は透過的な概念であり、プログラマは以下のものから標準入力フォーマットを指定するだけです :

  • FileInputFormat
  • TextInputFormat
  • KeyValueTextInputFormat
  • SequenceFileInputFormat
  • SequenceFileAsTextInputFormat
  • A custom one

トップ

Mapperの詳細

前述のように、Mapperは完全な InputSplits の読み取りと処理を担当しています。これは RecordReader 関数を呼び出すことによって行ごとに読み込まれ、split (key) 内の行の相対位置とデータ行自体 (value) について、key-value ペアを提供します。

入力ライン上で変換またはフィルタリング機能を実行すると、別の key-value ペアが出力されます。推測できるように、値は変換またはフィルタリングされた値です。キーは、アプリケーションロジックに応じてマッピング関数によって決定されます。変換されたデータのタイプに関する一定の値またはタグであってもかまいません。同じキーを共有するキーと値のペアのリストを Reducers に送信するために、出力されたキーを正しく決定することは非常に重要です。

トップ

Reducer の詳細

Reducers は、Mapper によって出力された key-value ペアのリストをそれぞれ受信し、すべてのペアに同じキーを共有します。このリストは、すべての Reducers と同じ集約関数または結合関数を実行するために反復され、別の key-value が戻されます。推測されるように、値は計算された集計に関連し、キーはアプリケーションロジックに依存します。受信した key-value ペアのリストの中の共有キーと同じですが、アプリケーションで必要な場合は非常に異なるものになる可能性があります。

Reducers の数は、いくつかのパラメーターと構成に応じて Hadoop によって選択されます。Mappers と Reducers の数について詳しくは、こちらをご覧ください。

トップ

出力の詳細

設定された OutputFormat クラスは、reducer の結果が HDFS に書き込まれる方法を制御します。 デフォルトでは、FileOutputFormat はキーと値の間の空白を使って key-value ペアを HDFS ファイルにシリアル化する RecordWriter 関数を提供します。

結果の出力は、各 reducer ごとに1つずつある、シリアライズされたデータの複数のファイルによって構成されます。

他のシリアライゼーション形式は、Hadoop のものとは別に使用できます :

  • Thrift
  • Protocol Buffers
  • Avro

トップ

Combiners

トップ

key-value ペアの詳細

MapReduce プロセス全体で交換される key-value ペアには、次のプロパティがあります :

  • Values は Writable インターフェイスを実装します
  • Key は WritableComparable を実装します
  • いくつかの out-of-the-box Hadoop クラス : IntWritable, LongWritable, FloatWritable, DoubleWritable, ...

トップ

MapReduce ジョブのプログラミング

Hadoop の MapReduce ジョブは、次のもので構成されます :

  • driver : 入力、出力、フォーマットなどを定義するソフトウェア、およびジョブを起動するためのエントリ・ポイント
  • Mapper のセット : その振る舞いを定義するソフトウェアによって与えられます
  • Reducers のセット : その振る舞いを定義するソフトウェアによって与えられます

ここから、Hadoop の母国語であるため、Java 言語が使用されます。それにもかかわらず、Hadoop ストリーミング機能により、他のプログラミング言語を使用することができます。

トップ

ドライバ・コードの例

次の例は、tidoop-mr-lib から取得したもので、MapReduce ドライバのルック・アンド・フィールを示しています。

ご覧のとおり、ドライバは、Configured クラスを拡張して、ツール・インタフェースを実装する main 関数を含む Java クラスです。インターフェイスの実装のため、このようなインターフェイスを実装するクラスを実行するために使用される、ToolRunner から実行される静的メソッドを呼び出すことは可能です。一般的な Hadoop のコマンドライン引数(-libjarsなど)を解析し、ツールの設定を変更します。 アプリケーション固有のオプションは変更されずに渡されます。

run メソッドの実装は、アプリケーション・パラメータをチェックして開始し、何らかのエラーが見つかった場合に正しい使用法を示します。 次に、Configuration オブジェクトが取得され、アプリケーション・パラメータの1つ、つまりフィルタリング基準を定義する正規表現が構成に追加されます。これは、mapper がそのようなパラメータを知る必要があり、 Configuration オブジェクトへのアクセス権しか持たないからです。

パラメータがチェックされ、設定に追加されると、ジョブが開始されることがあります。それは Job クラスのインスタンスとして作成され、その動作を定義するためにいくつかのセットが適用されます :

  • Reducer の数は1に設定されています。これはこの例に関する特定の制約によるものであり、一般的に必要なステップではありません
  • アプリケーションを含む Java jar は、アプリケーション・クラス名を指定して設定されます。これは、Hadoop が指定された Java jar 内のその他すべてのもの (mappers, reducers, その他の補助クラス)を見つけようとするため、必須のステップです
  • Mapper, combiner, reducer クラスは常に設定する必要があります
  • Mapper 出力の key クラスと value クラスを指定する必要があります
  • 同じことは、Reducer の key クラスと value クラスについても同様です

最後に、入出力パス(これらのパスはファイルではなくフォルダに関するものです)が設定され、MapReduce ジョブが起動されます。

/**
 * Copyright 2015 Telefonica Investigación y Desarrollo, S.A.U
 *
 * This file is part of fiware-tidoop (FI-WARE project).
 *
 * fiware-tidoop is free software: you can redistribute it and/or modify it under the terms of the GNU Affero
 * General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 * fiware-tidoop is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
 * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along with fiware-tidoop. If not, see
 * http://www.gnu.org/licenses/.
 *
 * For those usages not covered by the GNU Affero General Public License please contact with
 * francisco.romerobueno at telefonica dot com
 */

package com.telefonica.iot.tidoop.mrlib.jobs;

import com.telefonica.iot.tidoop.mrlib.combiners.LinesCombiner;
import com.telefonica.iot.tidoop.mrlib.mappers.LineFilter;
import com.telefonica.iot.tidoop.mrlib.reducers.LinesJoiner;
import com.telefonica.iot.tidoop.mrlib.utils.Constants;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 *
 * @author frb
 */
public final class Filter extends Configured implements Tool {

   /**
    * Main class.
    * @param args
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {
       int res = ToolRunner.run(new Configuration(), new Filter(), args);
       System.exit(res);
   } // main

   @Override
   public int run(String[] args) throws Exception {
       // check the number of arguments, show the usage if it is wrong
       if (args.length != 3) {
           showUsage();
           return -1;
       } // if

       // get the arguments
       String input = args[0];
       String output = args[1];
       String regex = args[2];

       // create and configure a MapReduce job
       Configuration conf = this.getConf();
       conf.set(Constants.PARAM_REGEX, regex);
       Job job = Job.getInstance(conf, "tidoop-mr-lib-filter");
       job.setNumReduceTasks(1);
       job.setJarByClass(Filter.class);
       job.setMapperClass(LineFilter.class);
       job.setCombinerClass(LinesCombiner.class);
       job.setReducerClass(LinesJoiner.class);
       job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(Text.class);
       job.setOutputKeyClass(NullWritable.class);
       job.setOutputValueClass(Text.class);
       FileInputFormat.addInputPath(job, new Path(input));
       FileOutputFormat.setOutputPath(job, new Path(output));

       // run the MapReduce job
       return job.waitForCompletion(true) ? 0 : 1;
   } // main

   private void showUsage() {
       System.out.println("Usage:");
       System.out.println();
       System.out.println("hadoop jar \");
       System.out.println("   target/tidoop-mr-lib-x.y.z-jar-with-dependencies.jar \");
       System.out.println("   com.telefonica.iot.tidoop.mrlib.Filter \");
       System.out.println("   -libjars target/tidoop-mr-lib-x.y.z-jar-with-dependencies.jar \");
       System.out.println("   <HDFS input> \");
       System.out.println("   <HDFS output> \");
       System.out.println("   <regex>");
   } // showUsage

} // Filter

トップ

Mapper コードの例

Mapper は、Mapper クラスを拡張し、必要なマッピング関数の実装で少なくとも map メソッドをオーバーライドする必要があります。 以下の例では、マッピング関数は、テキスト行内の特定の文字列(正規表現で定義された)の存在に基づいて、テキスト行をフィルタリングするかどうかを指定します。 このような正規表現は前述のように、Configuration オブジェクトに渡され、 map メソッドは設定にアクセスできないため、setup メソッドの実装で取得する必要があります。

入力と出力の key-value のタイプを確認してください。FileInputSplit クラスから RecordReader メソッドを透過的に呼び出した結果、(Object,Text) ペアが、map メソッドに渡されます。このペアは、既に説明したように、分割またはデータ・ブロック内の読取りテキスト行の相対位置を含みます。 この特定の場合には、定数キー("common-key")とそれがフィルタリングされなかった場合の入力テキスト行とを含むマップ関数の結果として、(Text,Text) ペアが出力されます。この定数キーを使用する理由は、出力されたすべてのペアが同じ単一の reducer に送信されるためです。

/**
 * Copyright 2015 Telefonica Investigación y Desarrollo, S.A.U
 *
 * This file is part of fiware-tidoop (FI-WARE project).
 *
 * fiware-tidoop is free software: you can redistribute it and/or modify it under the terms of the GNU Affero
 * General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 * fiware-tidoop is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
 * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along with fiware-tidoop. If not, see
 * http://www.gnu.org/licenses/.
 *
 * For those usages not covered by the GNU Affero General Public License please contact with
 * francisco.romerobueno at telefonica dot com
 */

package com.telefonica.iot.tidoop.mrlib.mappers;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 *
 * @author frb
 */

/**
 * LineFilter.
 */
public static class LineFilter extends Mapper<Object, Text, Text, Text> {

   private Pattern pattern = null;
   private final Text commonKey = new Text("common-key");

   @Override
   public void setup(Context context) throws IOException, InterruptedException {
       // compile just once the regex; use an empty regex if no one is provided
       pattern = Pattern.compile(context.getConfiguration().get(Constants.PARAM_REGEX, ""));
   } // setup

   @Override
   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
       Matcher matcher = pattern.matcher(value.toString());

       if (matcher.matches()) {
           context.write(commonKey, value);
       } // if
   } // map

} // LineFilter

トップ

Reducer コード例

Reducer は、Reducer クラスを拡張して、少なくとも reduce メソッドをオーバーライドして、望ましい還元関数を実装する必要があります。 以下の例では、reducing 関数は受信したすべてのペアを出力に出すことです。

入力と出力の key-value ペアのタイプを確認してください。 いくつかの (Text,Text) ペアは、iterable オブジェクトとして reduce メソッドに渡されます。これらは mappers がアルゴリズムの前のステップで出力したペアです。reduce 関数の結果としていくつかの(NullWritable、Text) 対が出力されます。 NullWritable の使用は、フィルタ処理の結果全体を含む最終的な HDFS ファイルにキーをシリアル化したくないためです。 最後に、 RecordWriter メソッドを介した FileOutputFormat は、1つの reducer だけがセットアップされているので、結果のペアのそれぞれを1つの HDFS ファイルにシリアライズします。

/**
 * Copyright 2015 Telefonica Investigación y Desarrollo, S.A.U
 *
 * This file is part of fiware-tidoop (FI-WARE project).
 *
 * fiware-tidoop is free software: you can redistribute it and/or modify it under the terms of the GNU Affero
 * General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 * fiware-tidoop is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
 * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along with fiware-tidoop. If not, see
 * http://www.gnu.org/licenses/.
 *
 * For those usages not covered by the GNU Affero General Public License please contact with
 * francisco.romerobueno at telefonica dot com
 */

package com.telefonica.iot.tidoop.mrlib.reducers;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 *
 * @author frb
 */

/**
 * LinesJoiner.
 */
public class LinesJoiner extends Reducer<Text, Text, NullWritable, Text> {

   @Override
   public void reduce(Text key, Iterable<Text> filteredLines, Context context)
       throws IOException, InterruptedException {
       for (Text filteredLine : filteredLines) {
           context.write(NullWritable.get(), filteredLine);
      } // for
  } // reduce

} // LinesJoiner

トップ

Compilation

上記のクラスがコード化されたら、それらをコンパイルして jar ファイルを作成するだけです。現代の Java 用 IDE はそれをあなたのために行います。とにかく、ソースファイルが src フォルダにあると仮定すると、コンパイルされたクラスのデスティネーションは classes で、jar ファイルのデスティネーションは dist です。次の Java コマンドですべてを作成できます :

$ rm dist/*
$ rm classes/es/tid/*
$ javac -classpath <path_to_hadoop_core_jar> -d classes/ src/*.java
$ jar -cvf dist/<my_app>.jar -C classes/ .

作成された jar は、呼び出されると Hadoop によってすべてのクラスタ・ノードに自動的に配布されます。この wiki の MapReduce ジョブのアップロードと実行のセクションを参照してください。Hadoop classpath のどこかにそれをあなた自身でコピーすることもできます :

$ cp dist/word-count.jar /usr/lib/hadoop/lib

トップ

MapReduceジョブのアップロードと実行

Hadoop MapReduce ジョブは Java で書かれ、Java jar ファイルとしてパッケージ化されています。このセクションでは、入力データファイルを含むフォルダと、結果が使用可能になるはずの場所を指定することによって、これらのジョブを実行する方法について説明します。両方のフォルダは HDFS ユーザ空間の下に配置されます。入力フォルダは実行前に存在する必要がありますが、ジョブが完了すると出力フォルダーが作成されます。例えば :

$ hadoop fs -ls /user/myuserspace
Found 2 items
drwxr-----   - myuserspace myuserspace      0 2015-07-09 12:04 /user/frb/.staging
drwxr-xr-x   - myuserspace myuserspace      0 2013-06-21 09:41 /user/myuserspace/input
$ hadoop fs -ls /user/myuserspace/input
Found 1 items
-rw-r--r--   3 myuserspace myuserspace   1234 2013-06-21 09:41 /user/myuserspace/input/mydatafile.txt
$ hadoop fs -cat /user/myuserspace/input/mydatafile.txt
these are some lines of data
this is not real data
but it is useful for demostration purposes
these are some lines of data
this is not real data
but it is useful for demostration purposes
these are some lines of data
this is not real data
but it is useful for demostration purposes
...

デモ目的のためだけに tidoop-mr-lib から Filter アプリケーションを実行すると仮定すると、これは次のコマンドを入力した後の出力になります :

$ hadoop jar target/tidoop-mr-lib-0.0.0-SNAPSHOT-jar-with-dependencies.jar com.telefonica.iot.tidoop.mrlib.jobs.Filter -libjars target/tidoop-mr-lib-0.0.0-SNAPSHOT-jar-with-dependencies.jar input output ^.*\bdata\b.*$
15/08/11 16:02:59 INFO impl.TimelineClientImpl: Timeline service address: http://dev-fiwr-bignode-12.hi.inet:8188/ws/v1/timeline/
15/08/11 16:02:59 INFO client.RMProxy: Connecting to ResourceManager at dev-fiwr-bignode-12.hi.inet/10.95.236.44:8050
15/08/11 16:03:00 INFO input.FileInputFormat: Total input paths to process : 1
15/08/11 16:03:01 INFO mapreduce.JobSubmitter: number of splits:1
15/08/11 16:03:01 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1438089170493_0002
15/08/11 16:03:01 INFO impl.YarnClientImpl: Submitted application application_1438089170493_0002
15/08/11 16:03:01 INFO mapreduce.Job: The url to track the job: http://dev-fiwr-bignode-12.hi.inet:8088/proxy/application_1438089170493_0002/
15/08/11 16:03:01 INFO mapreduce.Job: Running job: job_1438089170493_0002
15/08/11 16:03:08 INFO mapreduce.Job: Job job_1438089170493_0002 running in uber mode : false
15/08/11 16:03:08 INFO mapreduce.Job:  map 0% reduce 0%
15/08/11 16:03:13 INFO mapreduce.Job:  map 100% reduce 0%
15/08/11 16:03:19 INFO mapreduce.Job:  map 100% reduce 100%
15/08/11 16:03:20 INFO mapreduce.Job: Job job_1438089170493_0002 completed successfully
15/08/11 16:03:20 INFO mapreduce.Job: Counters: 49
   File System Counters
       FILE: Number of bytes read=6
       FILE: Number of bytes written=204841
       FILE: Number of read operations=0
       FILE: Number of large read operations=0
       FILE: Number of write operations=0
       HDFS: Number of bytes read=1924
       HDFS: Number of bytes written=0
       HDFS: Number of read operations=6
       HDFS: Number of large read operations=0
       HDFS: Number of write operations=2
   Job Counters
       Launched map tasks=1
       Launched reduce tasks=1
       Rack-local map tasks=1
       Total time spent by all maps in occupied slots (ms)=3659
       Total time spent by all reduces in occupied slots (ms)=3497
       Total time spent by all map tasks (ms)=3659
       Total time spent by all reduce tasks (ms)=3497
       Total vcore-seconds taken by all map tasks=3659
       Total vcore-seconds taken by all reduce tasks=3497
       Total megabyte-seconds taken by all map tasks=14987264
       Total megabyte-seconds taken by all reduce tasks=14323712
   Map-Reduce Framework
       Map input records=57
       Map output records=0
       Map output bytes=0
       Map output materialized bytes=6
       Input split bytes=138
       Combine input records=0
       Combine output records=0
       Reduce input groups=0
       Reduce shuffle bytes=6
       Reduce input records=0
       Reduce output records=0
       Spilled Records=0
       Shuffled Maps =1
       Failed Shuffles=0
       Merged Map outputs=1
       GC time elapsed (ms)=25
       CPU time spent (ms)=1100
       Physical memory (bytes) snapshot=1407291392
       Virtual memory (bytes) snapshot=8527355904
       Total committed heap usage (bytes)=1702363136
   Shuffle Errors
       BAD_ID=0
       CONNECTION=0
       IO_ERROR=0
       WRONG_LENGTH=0
       WRONG_MAP=0
       WRONG_REDUCE=0
   File Input Format Counters
       Bytes Read=1786
   File Output Format Counters
       Bytes Written=0

このコマンドは次のように構成されています :

hadoop jar <jar_file> <main_class> <existing_input_folder> <non_existing_output_folder>

ジョブが完了すると(実際のジョブはタスクを完了するのに数時間または数日かかることがあります)、結果は指定された出力フォルダにあります :

$ hadoop fs -ls output
Found 2 items
-rw-r--r--   3 myuserspace myuserspace      0 2015-08-11 16:03 output/_SUCCESS
-rw-r--r--   3 myuserspace myuserspace    969 2015-08-11 16:03 output/part-r-00000

Filter アプリケーションは単一の reducer (前のセクションを参照)を実行するようにセットアップされているので、出力フォルダには1つの part-r-0000 ファイルが表示されます。複数の reducer がセットアップされていた場合、それぞれの reducer にファイルが表示されます。その出力ファイルの内容を表示して、単語のデータが格納されている行だけが維持されていることを確認できます :

$ hadoop fs -cat output/part-r-00000
these are some lines of data
this is not real data
these are some lines of data
this is not real data
these are some lines of data
this is not real data
...

トップ

Tidoop MapReduce ジョブの実行

Github の README のジョブ・リファレンスのセクションを参照してください。

トップ

CKAN データを消費する MapReduce ジョブのプログラミング

Github の README の usage のセクションを参照してください。

トップ

Hive

Hiveは、Hadoop のデータウェアハウス・システムであり、簡単なデータ集計、SQL と同様のクエリのアドホック、および Hadoop 互換ファイルシステムに格納された大規模データセットの分析を容易にします[4]。Hive を使用すると MapReduce のプログラミングは不要です。なぜなら、すべての MapReduce は自動的に Hive によって処理されるからです。

Hive が動作する方法は、すべてのデータを SQL のようなテーブルにロードし、HiveQL で記述された内部的な (Hive の cli を使用した) または 外部の (Java ベースの Hive クライアントを使用した) SQL のようなクエリを可能にすることです。既に述べたように、ほとんどの場合、クエリが実行されるたびに、必要なデータを選択、フィルタリング、結合、グループ化するために、事前定義された Hive MapReduce ジョブが実行されます。単純な select * from table が実行された場合、MapReduce ジョブは実行されません (テーブル内のすべてのデータが返されるため)。

トップ

Hive CLI

Hive CLI [6]は、テスト目的または Oozie を介したリモート実行のみを考慮する必要があります。これは、あなたの資格情報を使ってヘッド・ノードに ssh し、シェルに hive をタイプすることによって使用できます :

$ hive
$ Hive history file=/tmp/myuser/hive_job_log_opendata_201310030912_2107722657.txt
$ hive>select column1,column2,otherColumns from mytable where column1='whatever' and columns2 like '%whatever%';
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201308280930_0953, Tracking URL = http://cosmosmaster-gi:50030/jobdetails.jsp?jobid=job_201308280930_0953
Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=cosmosmaster-gi:8021 -kill job_201308280930_0953
2013-10-03 09:15:34,519 Stage-1 map = 0%,  reduce = 0%
2013-10-03 09:15:36,545 Stage-1 map = 67%,  reduce = 0%
2013-10-03 09:15:37,554 Stage-1 map = 100%,  reduce = 0%
2013-10-03 09:15:44,609 Stage-1 map = 100%,  reduce = 33%
2013-10-03 09:15:45,631 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201308280930_0953
OK

the result set...

トップ

カスタム Hive クライアントのプログラミング

通常、Hive サーバはコンピューティング・サービスノードで実行され、Hive クライアントからの接続は通常どおり TCP/10000 ポートで処理されます。いくつかの一般的なプログラミング言語 (Java, Pythonなど) で独自の Hive クライアントを書く方法を学ぶために、次のセクションを見てください。独自の Hive クライアントを作成する際にも、このリンクを参考にしてください。

トップ

Java

以下の例では、Hive 0.13.0 がコンピューティング・サービスノードに配備され、サーバの実行が HiveServer2 バージョンであると想定します。さらに、Maven がクライアントの構築に使用されていると仮定します。

したがって、依存関係を解決するために、まず pom.xml に次の行を追加します :

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-core</artifactId>
   <version>0.20.0</version>
</dependency>
<dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-exec</artifactId>
   <version>0.13.0</version>
</dependency>
<dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-jdbc</artifactId>
   <version>0.13.0</version>
</dependency>

これが最小コードです :

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.DriverManager;

public class HiveClient {

   // JDBC driver required for Hive connections
   private static String driverName = "org.apache.hive.jdbc.HiveDriver";
   private static Connection con;

   private static Connection getConnection(String ip, String port, String db, String user, String password) {
      try {
         // dynamically load the Hive JDBC driver
         Class.forName(driverName);
      } catch (ClassNotFoundException e) {
         System.out.println(e.getMessage());
         return null;
      } // try catch

      try {
         // return a connection based on the Hive JDBC driver
         return DriverManager.getConnection("jdbc:hive2://" + ip + ":" + port + "/" + db, user, password);
      } catch (SQLException e) {
         System.out.println(e.getMessage());
         return null;
      } // try catch
   } // getConnection

   private static void doQuery() {
      try {
         // from here on, everything is SQL!
         Statement stmt = con.createStatement();
         ResultSet res = stmt.executeQuery("select column1,column2,otherColumns "
            + "from mytable where column1='whatever' and columns2 like '%whatever%'");

         // iterate on the result
         while (res.next()) {
            String column1 = res.getString(1);
            Integer column2 = res.getInt(2);

            // whatever you want to do with this row, here
         } // while

         // close everything
         res.close();
         stmt.close();
         con.close();
      } catch (SQLException ex) {
         System.exit(0);
      } // try catch
   } // doQuery

   public static void main(String[] args) {
      // get a connection to the Hive server running on the specified IP address, listening on 10000/TCP port
      // authenticate using my credentials
      con = getConnection("cosmos.lab.fiware.org", "10000", "default", "myuser", "mypasswd");

      // do a query, querying the Hive server will automatically imply the execution of one or more MapReduce jobs
      doQuery();
   } // main

} // HiveClient

トップ

Python

以下の例では、Hive 0.13.0 がコンピューティング・サービス・ノードに配備され、サーバの実行が HiveServer2 バージョンであると想定します。また、pip, Python パッケージ・マネージャーとしてインストールされているものとします。

したがって、HiveServer2 用の 3 driver をインストールすることから始めます :

$ pip install pyhs2`

次のコードは Python を使って基本的な Hive クライアントを実装しています :

import pyhs2

with pyhs2.connect(host='cosmos.lab.fiware.org',
                port=10000,
                authMechanism="PLAIN",
                user='myuser',
                password='mypassword',
                database='default') as conn:
 with conn.cursor() as cur:
     cur.execute("select * from table")
     print cur.getSchema()

     for i in cur.fetch():
         print i`

トップ