Accessing Data Stored in Amazon S3 through Spark

To access data stored in Amazon S3 from Spark applications, use the Hadoop file APIs (SparkContext.hadoopFile, JavaHadoopRDD.saveAsHadoopFile, SparkContext.newAPIHadoopRDD, and JavaHadoopRDD.saveAsNewAPIHadoopFile) to read and write RDDs, and provide URLs of the form s3a://bucket_name/path/to/file. To read and write Spark SQL DataFrames, use the Data Source API.
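In an s3a URL, the bucket is the authority component and the object key is the path. For example, s3a://s3-to-ec2/sonnets.txt names the object sonnets.txt in the bucket s3-to-ec2. The following minimal sketch splits such a URL into those two parts using only java.net.URI. The S3aUrl object and its split method are hypothetical helpers, not part of Spark or Hadoop, and are shown only to make the URL form concrete.

```scala
import java.net.URI

// Hypothetical helper (not a Spark or Hadoop API): split an s3a:// URL into
// its bucket and object key.
object S3aUrl {
  def split(url: String): (String, String) = {
    val uri = new URI(url)
    require(uri.getScheme == "s3a", s"expected an s3a:// URL, got: $url")
    // The bucket is the URI authority. getAuthority is used instead of getHost
    // because getHost returns null for names that are not valid host names.
    val bucket = uri.getAuthority
    require(bucket != null && bucket.nonEmpty, s"missing bucket name in: $url")
    // The object key is the path without its leading slash.
    (bucket, uri.getPath.stripPrefix("/"))
  }
}
```

For example, S3aUrl.split("s3a://bucket_name/path/to/file") returns ("bucket_name", "path/to/file"). The authority is used rather than the host because bucket names containing underscores are not valid host names, and getHost would return null for them.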

You can access Amazon S3 by the following methods:

Without credentials:

This mode of operation associates the authorization with individual EC2 instances instead of with each Spark app or the entire cluster.

Run EC2 instances with instance profiles associated with IAM roles that grant the permissions you want. Requests from an instance with such a profile are authenticated automatically, so you do not need to supply an access key or secret key.

With credentials:
Use one of the following methods to set up AWS credentials.
  • Set up AWS Credentials Using the Hadoop Credential Provider - Cloudera recommends this method because it provides system-wide AWS access to a single predefined bucket without exposing the secret key in a configuration file or requiring you to specify it at runtime.
    1. Create the Hadoop credential provider file with the necessary access and secret keys:
      hadoop credential create fs.s3a.access.key -provider jceks://hdfs/path_to_hdfs_file -value aws_access_id
      

      For example:

      hadoop credential create fs.s3a.access.key -provider jceks://hdfs/user/root/awskeyfile.jceks -value AKI***********************
      
      Add the AWS secret key to the .jceks credential file.
      hadoop credential create fs.s3a.secret.key -provider jceks://hdfs/path_to_hdfs_file -value aws_secret_key
      

      For example:

      hadoop credential create fs.s3a.secret.key -provider jceks://hdfs/user/root/awskeyfile.jceks -value +pla**************************************************
      
    2. You can set up AWS access for users in two ways: provide a global credential provider file that allows all Spark users to submit S3 jobs, or have each user supply their own credentials every time they submit a job.
      • For Per-User Access - Provide the path to your specific credential store on the command line when submitting a Spark job. This means you do not need to modify the global settings for core-site.xml. Each user submitting a job can provide their own credentials at runtime as follows:
        spark-submit --conf spark.hadoop.hadoop.security.credential.provider.path=PATH_TO_JCEKS_FILE ...
      • For System-Wide Access - Use Cloudera Manager to point to the Hadoop credential file created in the previous step:
        1. Log in to the Cloudera Manager server.
        2. On the main page under Cluster, click HDFS, then click Configuration. In the search box, enter core-site.
        3. Click the + sign next to Cluster-wide Advanced Configuration Snippet (Safety Valve) for core-site.xml. For Name, enter hadoop.security.credential.provider.path, and for Value, enter jceks://hdfs/path_to_hdfs_file (for example, jceks://hdfs/user/root/awskeyfile.jceks).
        4. Click Save Changes and deploy the client configuration to all nodes of the cluster.

          After the services restart, you can access Amazon S3 through the S3A filesystem, with credentials supplied automatically through a secure mechanism.

    3. (Optional) Configure Oozie to Run Spark S3 Jobs - Set spark.hadoop.hadoop.security.credential.provider.path to the path of the .jceks file in the spark-opts element of the Spark action in Oozie's workflow.xml file. This allows Spark to load AWS credentials from the .jceks file in HDFS.
      <action name="sparkS3job">
          <spark>
              ....
              <spark-opts>--conf spark.hadoop.hadoop.security.credential.provider.path=PATH_TO_JCEKS_FILE</spark-opts>
              ....
          </spark>
      </action>
      
      
      You can use the Oozie notation ${wf:user()} in the path to let Oozie use different AWS credentials for each user. For example:
      --conf spark.hadoop.hadoop.security.credential.provider.path=jceks://hdfs/user/${wf:user()}/aws.jceks
      
  • (Not Recommended) Specify the credentials at run time. For example:
    sc.hadoopConfiguration.set("fs.s3a.access.key", "...")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "...")
    This mode of operation is the most flexible, with each application able to access different S3 buckets. It might require extra work on your part to avoid making the secret key visible in source code. (For example, you might use a function call to retrieve the secret key from a secure location.)
  • (Not Recommended) Specify the credentials in a configuration file, such as core-site.xml:
    <property>
        <name>fs.s3a.access.key</name>
        <value>...</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>...</value>
    </property>
    This mode of operation is convenient if all, or most, apps on a cluster access the same S3 bucket. Any apps that need different S3 credentials can use one of the other S3 authorization techniques.
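The runtime method above leaves it up to you to keep the secret key out of source code. The following minimal sketch looks the keys up instead of hard-coding them. It assumes that the keys are exposed to the driver as the conventional AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables; this is an assumption, so substitute your site's secret store. The S3aCredentials object is a hypothetical helper, not a Spark or Hadoop API.

```scala
// Hypothetical helper (not a Spark or Hadoop API). ASSUMPTION: the keys are
// provided as the conventional AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
// environment variables; replace this lookup with your site's secure source.
object S3aCredentials {
  // The environment is a parameter (defaulting to sys.env) so the lookup can
  // be exercised without touching the real process environment.
  def fromEnv(env: Map[String, String] = sys.env): Option[(String, String)] =
    for {
      accessKey <- env.get("AWS_ACCESS_KEY_ID").filter(_.nonEmpty)
      secretKey <- env.get("AWS_SECRET_ACCESS_KEY").filter(_.nonEmpty)
    } yield (accessKey, secretKey)
}

// In spark-shell, the result could then be applied to the Hadoop configuration:
// S3aCredentials.fromEnv() match {
//   case Some((ak, sk)) =>
//     sc.hadoopConfiguration.set("fs.s3a.access.key", ak)
//     sc.hadoopConfiguration.set("fs.s3a.secret.key", sk)
//   case None =>
//     sys.error("AWS credentials not found in the environment")
// }
```

Returning an Option rather than throwing lets the caller decide whether missing credentials are fatal or whether to fall back to another method, such as an instance profile.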

Reading and Writing Text Files From and To Amazon S3

  1. Specify Amazon S3 credentials.
  2. Run a word count application on the sonnets.txt file stored in Amazon S3:



    scala> val sonnets = sc.textFile("s3a://s3-to-ec2/sonnets.txt")
    scala> val counts = sonnets.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    scala> counts.saveAsTextFile("s3a://s3-to-ec2/output")

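    The word counts are written to s3a://s3-to-ec2/output as a directory of text part files, with one (word,count) tuple per line.
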
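To make clear what the word count pipeline computes, here is a local, Spark-free sketch of the same transformation applied to an in-memory Seq[String] instead of an RDD. LocalWordCount is a hypothetical name. On a local collection, groupBy followed by a sum stands in for reduceByKey, which in Spark combines the values for each key across partitions.

```scala
// Hypothetical local analogue of:
//   sonnets.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
object LocalWordCount {
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" "))  // one element per space-separated word
      .map(word => (word, 1))            // pair each word with a count of 1
      .groupBy(_._1)                     // gather the pairs for each word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // add up the 1s
}
```

Like the Spark version, splitting on a single space means that consecutive spaces produce empty-string "words". Splitting on a regular expression such as "\\s+" avoids this.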

Reading and Writing Data Sources From and To Amazon S3

The following example illustrates how to read a text file from Amazon S3 into an RDD, convert the RDD to a DataFrame, and then use the Data Source API to write the DataFrame into a Parquet file on Amazon S3:

  1. Specify Amazon S3 credentials.
  2. Read a text file in Amazon S3:
    scala> val sample_07 = sc.textFile("s3a://s3-to-ec2/sample_07.csv")
  3. Map lines into columns:
    scala> import org.apache.spark.sql.Row
    scala> val rdd_07 = sample_07.map(_.split('\t')).map(e => Row(e(0), e(1), e(2).trim.toInt, e(3).trim.toInt))
  4. Create a schema and apply it to the RDD to create a DataFrame:
    scala> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
    scala> val schema = StructType(Array(
      StructField("code",StringType,false),
      StructField("description",StringType,false),
      StructField("total_emp",IntegerType,false),
      StructField("salary",IntegerType,false)))
    
    scala> val df_07 = sqlContext.createDataFrame(rdd_07,schema)
  5. Write the DataFrame to a Parquet file:
    scala> df_07.write.parquet("s3a://s3-to-ec2/sample_07.parquet")


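    In Spark 1.x, the default Parquet compression codec is gzip, so the output files are gzip-compressed (Spark 2.0 and later default to snappy). To use a different codec, set spark.sql.parquet.compression.codec before writing, for example sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy").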
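The map in step 3 throws an exception if any line is malformed, such as a header row or a line with fewer than four columns, and that exception fails the whole job. The following minimal local sketch shows a more defensive parser. It assumes the same tab-separated, four-column layout as the schema in step 4. The Sample07 case class and Sample07Parser object are hypothetical names, not part of the original example.

```scala
// Hypothetical record type mirroring the schema in step 4:
// code, description, total_emp, salary.
final case class Sample07(code: String, description: String, totalEmp: Int, salary: Int)

object Sample07Parser {
  // Same split-and-convert logic as step 3, but returns None for a line that
  // has too few columns or non-numeric count fields instead of throwing.
  def parse(line: String): Option[Sample07] = {
    val e = line.split('\t')
    if (e.length < 4) None
    else scala.util.Try(Sample07(e(0), e(1), e(2).trim.toInt, e(3).trim.toInt)).toOption
  }
}
```

In spark-shell, you could drop bad lines before building Rows, for example with sample_07.flatMap(line => Sample07Parser.parse(line).toList). Whether to skip bad lines silently or count and report them is a design choice that depends on how trustworthy the input is.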