Monday, September 7, 2015

Spark RDD to Hive using Dataframes

My goal is to write out a Spark RDD into Hive table using Spark SQL Dataframes API.

Hive Table

I wanted my Spark job to write into a Hive External Table. So I first created an External Hive Table on my cluster.

CREATE EXTERNAL TABLE dataframe_test(
product string,
page string,
count int
 )
PARTITIONED BY (partitiondate String)
ROW FORMAT DELIMITED
STORED AS ORC
location '/user/dkhera/dataframe_test/';

Spark Job

Lets see how an RDD is converted into a dataframe and then written into a Hive Table.

We define a case class that defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. 



case class MyResult(product: String, page: String = "", count: Int = 0, datadate: String = "")

We will need to import sqlContext implicits, so we can implicitly convert RDDs to Dataframes.


import sqlContext.implicits._


Once this is done, we can convert our RDD to a dataframe using the toDF() function.

val result = myRDD.map(record => {
      val intermediateRecord = record.asInstanceOf[myRDD]
      (
MyResult(intermediateRecord.product, intermediateRecord.pageName), 1)
    })
      .reduceByKey(_ + _, 300).map(rec => { MyResult(rec._1.product, rec._1.page, rec._2, dateString) })
.toDF()


The last thing we need to do now is write the dataframe to the Hive Table. We write the RDD in ORC format. It is partitioned by field "partitiondate". We are setting the mode to be Append here, so if the table exists, data can be appended. Note that with saveAsTable the default location that Spark saves to is controlled by the HiveMetastore. This made me run into a ConnectionException (details at the end). Hence I have used options to add the path to my hive table.


val options = Map("path" -> hiveTablePath)
result.write.format("orc").partitionBy("partitiondate").options(options).mode(SaveMode.Append).saveAsTable(hiveTable)



Here is the complete Spark Job.
package com.spark.job
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.hive.HiveContext

object SparkHiveGenerator {
    
case class MyResult(product: String, page: String = "", count: Int = 0, datadate: String = "")
             def main(args: Array[String]): Unit = { 


val myRDD = ....<read your rdd here>...... \\RDD[Any]
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict") //we need this property to be set so we can add partitions dynamically to the external table
import sqlContext.implicits._ //added so we are able to convert RDD to dataframe
val result = myRDD.map(record => {
      val intermediateRecord = record.asInstanceOf[myRDD]
      (
MyResult(intermediateRecord.product, intermediateRecord.pageName), 1)
    })
      .reduceByKey(_ + _, 300).map(rec => { MyResult(rec._1.product, rec._1.page, rec._2, dateString) })
.toDF()

//add path as an option. This is because With saveAsTable the default location that Spark saves to is controlled by the HiveMetastore.
//Throws a connectionException if this is not done.
val options = Map("path" -> hiveTablePath) //write data as partition to the given table and path in orc format. Append data if table exists   
result.write.format("orc").partitionBy("
partitiondate").options(options).mode(SaveMode.Overwrite).saveAsTable(hiveTable)
  }
 

Exceptions

If I write the table without specifying the path in Options, I run into a ConnectException as described below. The exception is thrown to be on is the wrong HDFS host. This is with saveAsTable the default location that Spark saves to is controlled by the HiveMetastore . Specifying the path in Options as shown above fixed this issue.

Exception in thread "main" java.net.ConnectException: Call From host0006.lvs.paypal.com/1.2.3.4 to host0006.lvs.paypal.com:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
at org.apache.hadoop.ipc.Client.call(Client.java:1414)
at org.apache.hadoop.ipc.Client.call(Client.java:1363)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy17.getFileInfo(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy17.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:699)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1762)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:100)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:336)
at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:245)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:211)
at com.paypal.iscore.job.IScoreGeneratorNew$.main(IScoreGeneratorNew.scala:112)
at com.paypal.iscore.job.IScoreGeneratorNew.main(IScoreGeneratorNew.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

9 comments:

  1. It will help if you can replace hiveTablePath and hiveTable with actual input. I tried to follow the steps and I get error: value not found for the saveAsTable code even though I have created the external table.

    ReplyDelete
  2. This comment has been removed by the author.

    ReplyDelete
  3. This comment has been removed by the author.

    ReplyDelete
  4. This comment has been removed by the author.

    ReplyDelete
  5. This comment has been removed by the author.

    ReplyDelete
  6. This comment has been removed by the author.

    ReplyDelete
  7. This comment has been removed by the author.

    ReplyDelete
  8. which versions of hive and spark you used?

    ReplyDelete