Monday, September 7, 2015

Spark RDD to Hive using Dataframes

My goal is to write out a Spark RDD into a Hive table using the Spark SQL DataFrames API.

Hive Table

I wanted my Spark job to write into a Hive External Table. So I first created an External Hive Table on my cluster.

CREATE EXTERNAL TABLE dataframe_test(
  product string,
  page string,
  count int
)
PARTITIONED BY (partitiondate string)
ROW FORMAT DELIMITED
STORED AS ORC
LOCATION '/user/dkhera/dataframe_test/';

Spark Job

Let's see how an RDD is converted into a DataFrame and then written into a Hive table.

We define a case class that describes the schema of the table. The names of the case class's fields are read using reflection and become the names of the columns.



case class MyResult(product: String, page: String = "", count: Int = 0, datadate: String = "")

We will need to import the sqlContext implicits so we can implicitly convert RDDs to DataFrames.


import sqlContext.implicits._


Once this is done, we can convert our RDD to a DataFrame using the toDF() function. Here we first aggregate the counts per (product, page) key with reduceByKey, map each key and sum to a MyResult, and then convert.

val result = myRDD
  .map { record =>
    // MyInputRecord is a placeholder for the element type of the source RDD
    val intermediateRecord = record.asInstanceOf[MyInputRecord]
    (MyResult(intermediateRecord.product, intermediateRecord.pageName), 1)
  }
  .reduceByKey(_ + _, 300)
  .map { case (key, count) => MyResult(key.product, key.page, count, dateString) }
  .toDF()
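
To confirm that reflection picked the column names from the MyResult fields, you can print the schema (a quick optional check):

result.printSchema()
// Expected output (approximately):
// root
//  |-- product: string (nullable = true)
//  |-- page: string (nullable = true)
//  |-- count: integer (nullable = false)
//  |-- datadate: string (nullable = true)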


The last thing we need to do is write the DataFrame to the Hive table. We write it out in ORC format, partitioned by the field "partitiondate". We set the mode to Append, so if the table already exists, data is appended. Note that with saveAsTable the default location that Spark saves to is controlled by the Hive metastore; this made me run into a ConnectException (details at the end). Hence I have used the options to add the path to my Hive table.


val options = Map("path" -> hiveTablePath)
result.write.format("orc").partitionBy("partitiondate").options(options).mode(SaveMode.Append).saveAsTable(hiveTable)
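
To sanity-check the write, the table can be read back through the same sqlContext (a small optional check, reusing the hiveTable and dateString values from above):

val written = sqlContext.sql(s"SELECT product, page, count FROM $hiveTable WHERE partitiondate = '$dateString'")
written.show(10)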



Here is the complete Spark Job.

package com.spark.job

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

object SparkHiveGenerator {

  case class MyResult(product: String, page: String = "", count: Int = 0, datadate: String = "")

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("SparkHiveGenerator")
    val sc = new SparkContext(sparkConf)

    val myRDD = ....<read your rdd here>...... // RDD[Any]
    // dateString, hiveTable and hiveTablePath are placeholders; supply them for your environment (e.g. from args)

    val sqlContext = new HiveContext(sc)
    sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict") // needed so partitions can be added dynamically to the external table
    import sqlContext.implicits._ // needed to convert the RDD to a DataFrame

    val result = myRDD
      .map { record =>
        // MyInputRecord is a placeholder for the element type of the source RDD
        val intermediateRecord = record.asInstanceOf[MyInputRecord]
        (MyResult(intermediateRecord.product, intermediateRecord.pageName), 1)
      }
      .reduceByKey(_ + _, 300)
      .map { case (key, count) => MyResult(key.product, key.page, count, dateString) }
      .toDF()

    // Add the path as an option: with saveAsTable the default location that Spark saves to
    // is controlled by the Hive metastore, and a ConnectException is thrown otherwise.
    val options = Map("path" -> hiveTablePath)
    // Write the data as a partition to the given table and path in ORC format; append if the table exists.
    result.write.format("orc").partitionBy("partitiondate").options(options).mode(SaveMode.Append).saveAsTable(hiveTable)
  }
}

Exceptions

If I write the table without specifying the path in the options, I run into a ConnectException as shown below: the call is made against the wrong HDFS host. This happens because with saveAsTable the default location that Spark saves to is controlled by the Hive metastore. Specifying the path in the options as shown above fixed this issue.

Exception in thread "main" java.net.ConnectException: Call From host0006.lvs.paypal.com/1.2.3.4 to host0006.lvs.paypal.com:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
at org.apache.hadoop.ipc.Client.call(Client.java:1414)
at org.apache.hadoop.ipc.Client.call(Client.java:1363)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy17.getFileInfo(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy17.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:699)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1762)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:100)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:336)
at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:245)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:211)
at com.paypal.iscore.job.IScoreGeneratorNew$.main(IScoreGeneratorNew.scala:112)
at com.paypal.iscore.job.IScoreGeneratorNew.main(IScoreGeneratorNew.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Friday, March 13, 2015


Scalding Recipes 

Retain additional fields while grouping by certain fields

This can be achieved by using the Typed API. Below is an example over a TypedPipe of (String, String, String, Long) tuples: we group by fields 1 and 2, sum field 4, and return tuples that keep field 3 (retained through the group-by) alongside the aggregated sum.


import com.twitter.scalding.TypedPipe

def groupBy(pipe: TypedPipe[(String, String, String, Long)]): TypedPipe[(String, String, String, Long)] = {
  // Key the rows by fields 1 and 2
  val grouped = pipe.groupBy { data: (String, String, String, Long) => (data._1, data._2) }
  // Sum field 4 per key
  val agg = grouped.mapValues { data => data._4 }.sum
  // Join the aggregate back onto the grouped rows so field 3 is retained
  val result = grouped.join(agg).toTypedPipe
  result.values.map { case (data, aggr) => (data._1, data._2, data._3, aggr) }
}
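
For context, here is a minimal sketch of how this recipe could be wired into a Job; the TypedTsv sources, paths and job name are assumptions, and the groupBy method above is expected to be in scope:

import com.twitter.scalding.{Args, Job, TypedPipe, TypedTsv}

class GroupByJob(args: Args) extends Job(args) {
  // Read 4-field rows, aggregate with the groupBy recipe above, and write the result
  val input = TypedPipe.from(TypedTsv[(String, String, String, Long)](args("input")))
  groupBy(input).write(TypedTsv[(String, String, String, Long)](args("output")))
}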
If we'd like to aggregate multiple fields, we can create an aggregate class and group by as shown below.

import com.sample.scalding.job.Aggregates
import com.twitter.scalding.TypedPipe

def groupBy(pipe: TypedPipe[(String, String, String, Long)]): TypedPipe[(String, String, String, Double, Double)] = {
  val grouped = pipe.groupBy { data: (String, String, String, Long) => (data._1, data._2) }

  // Build a partial aggregate per row, then combine partial aggregates per key.
  // Note that the combine step must add both the sums and the counts.
  val agg = pipe
    .map { t: (String, String, String, Long) => ((t._1, t._2), Aggregates(t._4, 1.0)) }
    .group
    .reduce { (partialAgg, current) =>
      val Aggregates(lsum, lcount) = partialAgg
      val Aggregates(rsum, rcount) = current
      Aggregates(lsum + rsum, lcount + rcount)
    }

  // Join the aggregates back so field 3 is retained in the output
  val result = grouped.join(agg).toTypedPipe
  result.values.map { case (data, aggr) => (data._1, data._2, data._3, aggr.sum, aggr.count) }
}

package com.sample.scalding.job
case class Aggregates(sum: Double, count: Double)
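
As an aside, since Aggregates is just a pair of sums, the same aggregation can also be expressed with sumByKey by supplying an Algebird Semigroup for Aggregates; this is an alternative sketch, not what the recipe above does:

import com.twitter.algebird.Semigroup
import com.twitter.scalding.TypedPipe

// Tell Scalding how to combine two partial Aggregates
implicit val aggregatesSemigroup: Semigroup[Aggregates] =
  Semigroup.from { (a, b) => Aggregates(a.sum + b.sum, a.count + b.count) }

def sumAggregates(pipe: TypedPipe[(String, String, String, Long)]) =
  pipe
    .map { t => ((t._1, t._2), Aggregates(t._4, 1.0)) }
    .sumByKey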

Cumulative Sum of an Aggregated Field

Given a TypedPipe of type (GroupField, (SortField, SummableField)), we can compute the cumulative sum of the SummableField within each group, accumulated in the order given by the SortField.

import com.twitter.scalding.TypedPipe
import com.twitter.scalding.typed.CumulativeSum._

def findCumulativeSum(pipe: TypedPipe[(String, String, Double, String)]): TypedPipe[(String, String, String, Double, Double)] = {
  pipe
    .map { case (region, vertical, amount, account) =>
      ((region, vertical), ((amount, account), amount)) // (GroupField, (SortField, SummableField))
    }
    .cumulativeSum
    .map { case ((region, vertical), ((amount, account), cumulative)) =>
      (region, vertical, account, amount, cumulative)
    }
}
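
To make the behaviour concrete, here is a hypothetical input; within each (region, vertical) group the running total accumulates in (amount, account) order (assuming the inclusive running-total semantics of CumulativeSum):

// Hypothetical rows: (region, vertical, amount, account)
val rows = TypedPipe.from(Seq(
  ("US", "retail", 25.0, "acct-2"),
  ("US", "retail", 10.0, "acct-1"),
  ("US", "retail", 5.0, "acct-3")))

// Expected running totals, sorted by (amount, account): 5.0 -> 5.0, 10.0 -> 15.0, 25.0 -> 40.0
val withRunningTotals = findCumulativeSum(rows)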