CLIでのジョブの実行とモニタリング

Cloudera Altusクライアントでは、ジョブの投入やジョブのプロパティの表示が行えます。以下のコマンドは、Altusコマンドの使用例です。

Altusクライアントで使用できるコマンドの詳しい情報は、以下のコマンドで表示されます。

altus dataeng help 

Sparkジョブの投入

Sparkのジョブは、以下のコマンドで投入できます。

altus dataeng submit-jobs
 --cluster-name ClusterName 
 --jobs '{ "sparkJob": {
             "jars": [
               "PathAndFilenameOfJar1",
               "PathAndFilenameOfJar2"
             ]
        }}'

パラメーターとしてapplicationArgumentssparkArgumentsを含めて、メインクラスのmainメソッドに値を渡すこともできます。

アプリケーションとSparkの両方に引数としてパラメーターを渡す場合は、引数のリストをエスケープしなければなりません。あるいは引数をファイルに書き、そのファイルのパスとファイル名を引数パラメーターとして渡すこともできます。

また、mainClassパラメーターを渡してアプリケーションのエントリーポイントを指定することもできます。

Sparkジョブの例

Pi Estimation

Sparkは、Sparkの動作の様子を紹介するコードサンプルのライブラリを提供しています。以下のサンプルでは、このSparkのライブラリからPIの計算を例とし、Altus CLIを使ったSparkジョブの投入方法を紹介します。

以下のコマンドで、Pi を計算するサンプルプログラムをSparkジョブとして投入できます。
altus dataeng submit-jobs \
 --cluster-name ClusterName \
 --jobs '{ "sparkJob": {
             "jars": [
               "local:///opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples.jar"
             ],
             "sparkArguments" : "--executor-memory 1G --num-executors 2",
             "mainClass": "org.apache.spark.examples.SparkPi"
        }}'

--cluster-nameパラメーターにはSparkクラスターの名前を渡さなければなりません。

Medicare
以下のサンプルは、一般公開されているデータを処理して医療データの分析を行います。このSparkジョブはCloudera Altusのサンプルジョブ用のS3バケットに置かれており、同じくサンプル用のS3バケットから入力データを読み取ります。ユーザーのアカウントでS3バケットを作成し、そこに出力データを書き出すことができます。
このサンプルを使うには、ユーザーのAWSアカウントでS3バケットをセットアップし、ジョブの実行時に書き込めるようにパーミッションを設定してください。
このSparkジョブを実行するには、以下のようにします。
  1. このSparkジョブを実行するためのSparkクラスターを作成します。
    クラスターはサービスタイプをSpark 2.xもしくはSpark 1.6として作成できます。クラスターのSparkサービスのバージョンは、Sparkのjarファイルのバージョンと一致していなければなりません。
    • Spark 2.xの場合はaltus-sample-medicare-spark2x.jarという名前のサンプルjarファイルを使用してください。
    • Spark 1.6の場合はaltus-sample-medicare-spark1x.jarという名前のサンプルjarファイルを使用してください。

    クラスターの作成に関する詳しい情報については、クラスターの作成を参照してください。

  2. ユーザーのAWSのアカウントでS3のバケットを作成します。
  3. 以下のコマンドで、Medicareジョブを投入します。
    altus dataeng submit-jobs \
    --cluster-name ClusterName \
    --jobs '{ "sparkJob": {
                    "jars": [
                      "s3a://cloudera-altus-data-engineering-samples/spark/medicare/program/NameOfJarForSparkVersion"],
                    "mainClass": "com.cloudera.altus.sample.medicare.transform",
                    "applicationArguments": [
                       "s3a://cloudera-altus-data-engineering-samples/spark/medicare/input/",
                       "s3a://NameOfOutputS3Bucket/OutputPath/"
                    ]
                }}'

    --cluster-nameパラメーターには、Sparkクラスターの名前を、サンプルのjarファイルにマッチするバージョンのSparkとともに渡さなければなりません。

Hiveジョブの投入

Hiveのジョブは、以下のコマンドで投入できます。

altus dataeng submit-jobs
--cluster-name ClusterName
--jobs '{ "hiveJob": { 
             "script": "PathAndFilenameOfHQLScript" 
        }}'

jobXml パラメーターを使い、Hiveジョブのための設定を渡すこともできます。

以下に示すのは、jobXml パラメーターで利用できるHiveジョブのXMLの例です。
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <property>
    <name>hive.auto.convert.join</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.auto.convert.join.noconditionaltask.size</name>
    <value>20971520</value>
  </property>
  <property>
    <name>hive.optimize.bucketmapjoin.sortedmerge</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.smbjoin.cache.rows</name>
    <value>10000</value>
  </property>
  <property>
    <name>mapred.reduce.tasks</name>
    <value>-1</value>
  </property>
  <property>
    <name>hive.exec.reducers.max</name>
    <value>1099</value>
  </property>
</configuration>

Hiveジョブの例

以下に示すHiveジョブのサンプルは、データをCSVファイルから読み出し、そのデータをClouderaのAWSアカウント内のS3バケットに書き出します。そして、カンマをコロンに置き換えて、同じデータをユーザーのAWSアカウントに書き出します。

このサンプルを実行するには、ユーザーのAWSのアカウントでS3のバケットを作成し、サンプルのHiveスクリプトの実行時に書き込みを許可するようパーミッションを設定してください。

このHiveジョブのサンプルを実行するには、以下のようにします。

  1. Hiveジョブを実行するクラスターを作成します。

    Hiveジョブは、Hive on MapReduceもしくはHive on Sparkクラスター上で実行できます。クラスターの作成に関する詳しい情報についてはクラスターの作成を参照してください。

  2. ユーザーのAWSアカウントでS3のバケットを作成します。
  3. ローカルドライブ上にHiveスクリプトを作成します。

    ここではhiveScript.hqlというファイルを使うものとします。

  4. 以下のスクリプトをコピーして、ファイルに貼り付けます。
    DROP TABLE input;
    DROP TABLE output;
    
    CREATE EXTERNAL TABLE input(f1 STRING, f2 STRING, f3 STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION 's3a://cloudera-altus-data-engineering-samples/hive/data/';
    
    CREATE TABLE output(f1 STRING, f2 STRING, f3 STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ':'
    STORED AS TEXTFILE
    LOCATION 's3a://NameOfOutputS3Bucket/OutputPath/';
    
    INSERT OVERWRITE TABLE output SELECT * FROM input ORDER BY f1;
  5. スクリプトを修正して、出力先のS3バケットの名前とパスを自分のAWSアカウントに作成したS3バケットの名前とパスに置き換えます。
  6. 以下のコマンドを実行します。
    altus dataeng submit-jobs
    --cluster-name=ClusterName
    --jobs '{ "hiveJob": { 
                 "script": "PathToHiveScript/hiveScript.hql" 
            }}'

    --cluster-nameパラメーターには、HiveもしくはHive on Sparkクラスターの名前を渡さなければなりません。

    scriptパラメーターには、スクリプトファイルの絶対パスとファイル名に file://をプレフィックスとして付けて渡します。これはたとえば --jobs '{ "hiveJob": { "script": "file:///file/path/to/my/hiveScript.hql" }}'のようになるでしょう。

MapReduceジョブの投入

MapReduceのジョブは、以下のコマンドで投入できます。

altus dataeng submit-jobs
 --cluster-name ClusterName 
 --jobs '{ "mr2Job": {
             "mainClass": "main.class.file",
             "jars": [
               "s3a://PathAndFilenameOfJar1",
               "s3a://PathAndFilenameOfJar2"
             ]
        }}'

MapReduceジョブの例

以下の例に示すMapReduceは、Cloudera Altusのサンプルジョブ用のS3バケットにあります。このジョブはS3のバケットからファイルを読み取り、テキスト中の主な単語をカウントし、標準出力に単語の登場回数を出力します。

以下のコマンドでMapReduceのジョブを投入し、このサンプルを実行させることができます。
altus dataeng submit-jobs
 --cluster-name ClusterName 
 --jobs '{ "mr2Job": {
             "mainClass": "org.apache.hadoop.examples.WordCount",
             "jars": ["s3a://cloudera-altus-data-engineering-samples/wordcount/program/hadoop-examples.jar"],
             "arguments": "s3a://cloudera-altus-data-engineering-samples/wordcount/input/poetry" 
        }}'

--cluster-nameパラメーターには、MapReduceクラスターの名前を渡さなければなりません。