Friday, March 13, 2015


Scalding Recipes 

Retain additional fields while grouping by certain fields

This can be achieved with the Typed API. Below is an example where a TypedPipe holds tuples of (String, String, String, Long). We want to group by fields 1 and 2, sum field 4 within each group, and return every original row (including the third string, which is not part of the grouping key and must be retained after the group-by) together with the sum for its group.


import com.twitter.scalding.TypedPipe
/**
 * Groups rows by their first two fields, sums the fourth field per group, and
 * pairs every original row with the sum of its group.
 *
 * The third field is not part of the grouping key; it is retained because the
 * per-key sum is joined back onto the un-aggregated rows.
 *
 * @param pipe rows of (field1, field2, field3, amount)
 * @return one tuple per input row: (field1, field2, field3, sum of amount for
 *         the row's (field1, field2) group)
 */
def groupBy(pipe: TypedPipe[(String, String, String, Long)]): TypedPipe[(String, String, String, Long)] = {
  // Key every row by (field1, field2); the value is still the full row.
  val grouped = pipe.groupBy { data: (String, String, String, Long) => (data._1, data._2) }
  // One summed amount per key.
  val agg = grouped.mapValues { data => data._4 }.sum
  // Join the per-key sum back onto each row so the non-key field survives.
  val result = grouped.join(agg).toTypedPipe
  // The last expression is the return value, so the pipe is handed back to the
  // caller rather than being bound to a local and dropped.
  result.values.map { case (data, aggr) => (data._1, data._2, data._3, aggr) }
}
If we’d like to compute several aggregates at once, we can create an aggregate case class and reduce into it per group, as below.

import com.sample.scalding.job.Aggregates
import com.twitter.scalding.TypedPipe
/**
 * Groups rows by their first two fields, computes both the sum of the fourth
 * field and the row count per group, and pairs every original row with the
 * aggregates of its group.
 *
 * @param pipe rows of (field1, field2, field3, amount)
 * @return one tuple per input row: (field1, field2, field3, group sum, group count)
 */
def groupBy(pipe: TypedPipe[(String, String, String, Long)]): TypedPipe[(String, String, String, Double, Double)] = {
  // Key every row by (field1, field2); the value is still the full row.
  val grouped = pipe.groupBy { data: (String, String, String, Long) => (data._1, data._2) }

  // Each row starts as a partial aggregate of (its amount, a count of 1).
  // Left as a keyed grouping (not flattened to a TypedPipe) because it is
  // joined by key below.
  val agg = pipe
    .map { t: (String, String, String, Long) => ((t._1, t._2), Aggregates(t._4.toDouble, 1.0)) }
    .group
    .reduce { (left, right) =>
      // Both operands may already be merged partial aggregates, so the counts
      // have to be added together; adding a constant 1 would undercount.
      Aggregates(left.sum + right.sum, left.count + right.count)
    }

  // Join the per-key aggregates back onto each row and flatten the result.
  // This is the last expression, so the pipe is returned to the caller.
  grouped.join(agg).toTypedPipe.values.map { case (data, aggr) =>
    (data._1, data._2, data._3, aggr.sum, aggr.count)
  }
}

// The package must not end in `.Aggregates`: the class's fully qualified name
// is then com.sample.scalding.job.Aggregates, which is what
// `import com.sample.scalding.job.Aggregates` expects.
package com.sample.scalding.job

/**
 * Per-group aggregate values.
 *
 * @param sum   running total of the aggregated field
 * @param count number of rows folded into `sum`
 */
final case class Aggregates(sum: Double, count: Double)

 Cumulative Sum of Aggregated Field

Given a TypedPipe with T = (GroupField, (SortField, SummableField)), we can find the cumulative sum of a SummableField accumulated according to the sort field.
import com.twitter.scalding.TypedPipe
import com.twitter.scalding.typed.CumulativeSum._
/**
 * Computes a running total of the third tuple field within each
 * (first field, second field) group.
 *
 * Rows are reshaped to (GroupField, (SortField, SummableField)) before
 * `cumulativeSum` is applied: the group is (field1, field2), the sort field is
 * (field3, field4) and the summed value is field3.
 *
 * @param pipe rows of (region, vertical, amount, account)
 * @return rows of (region, vertical, account, amount, cumulative amount)
 */
def findCumulativeSum(pipe: TypedPipe[(String, String, Double, String)]): TypedPipe[(String, String, String, Double, Double)] = {
  // (GroupField, (SortField, SummableField))
  val keyed = pipe.map { case (region, vertical, amount, account) =>
    ((region, vertical), ((amount, account), amount))
  }

  // Flatten the nested result back into a single tuple per row.
  keyed
    .cumulativeSum
    .map { case ((region, vertical), ((amount, account), runningTotal)) =>
      (region, vertical, account, amount, runningTotal)
    }
}