Scalding Recipes 
Retain additional fields while grouping by certain fields
Retain additional fields while grouping by certain fields
This can be achieved by using the
Typed API. Below is an example where a
TypedPipe holds tuples of (String, String, String,
Long). We want to group the rows by fields 1 and 2, compute the sum over field
4, and then return a tuple containing all three strings (including field 3, which
is not part of the grouping key and must be retained after the group-by) together with the sum.
import com.twitter.scalding.TypedPipe
/**
 * Sums field 4 over each (field 1, field 2) group while retaining field 3 of the input rows.
 *
 * @param pipe rows of (field1, field2, field3, amount)
 * @return rows of (field1, field2, field3, total), where total is the sum of `amount`
 *         over the row's (field1, field2) group
 */
def groupBy(pipe: TypedPipe[(String, String, String, Long)]): TypedPipe[(String, String, String, Long)] = {
  val grouped = pipe.groupBy { data: (String, String, String, Long) => (data._1, data._2) }
  // Per-key total of the fourth field.
  val agg = grouped.mapValues(_._4).sum
  // Join the un-aggregated rows back against the per-key totals so field 3 is kept,
  // and make the joined pipe the method's result (it was previously bound to a local
  // and discarded, so the method returned Unit).
  grouped
    .join(agg)
    .toTypedPipe
    .values
    .map { case (data, aggr) => (data._1, data._2, data._3, aggr) }
}
If we’d like to aggregate multiple fields,
we can create an aggregate class and then group by as shown below.
import com.sample.scalding.job.Aggregates
import com.twitter.scalding.TypedPipe
/**
 * Computes the sum of field 4 and the row count over each (field 1, field 2) group,
 * while retaining field 3 of the input rows.
 *
 * @param pipe rows of (field1, field2, field3, amount)
 * @return rows of (field1, field2, field3, sum, count), where sum and count are
 *         aggregated over the row's (field1, field2) group
 */
def groupBy(pipe: TypedPipe[(String, String, String, Long)]): TypedPipe[(String, String, String, Double, Double)] = {
  val grouped = pipe.groupBy { data: (String, String, String, Long) => (data._1, data._2) }
  // Each row starts as an aggregate of its own amount with a count of 1.
  // NOTE(review): agg is left as the grouped result of reduce (no TypedPipe annotation)
  // because join appears to need a grouped right-hand side — confirm against the
  // Scalding version in use.
  val agg = pipe
    .map { t: (String, String, String, Long) => ((t._1, t._2), Aggregates(t._4, 1.0)) }
    .group
    .reduce { (left, right) =>
      // Add both counts: either operand may already be a partial aggregate of several
      // rows, so incrementing the left count by one would undercount.
      Aggregates(left.sum + right.sum, left.count + right.count)
    }
  // Join the un-aggregated rows back against the per-key aggregates so field 3 is kept,
  // and make the joined pipe the method's result instead of discarding it.
  grouped
    .join(agg)
    .toTypedPipe
    .values
    .map { case (data, aggr) => (data._1, data._2, data._3, aggr.sum, aggr.count) }
}
package com.sample.scalding.job

/**
 * Pair of aggregated values carried through a reduce.
 *
 * @param sum   accumulated total
 * @param count number of elements folded into `sum`
 */
final case class Aggregates(sum: Double, count: Double)
 Cumulative Sum of Aggregated Field
Given a TypedPipe with T = (GroupField, (SortField, SummableField)), we can find the cumulative sum of a SummableField accumulated according to the sort field.
Given a TypedPipe with T = (GroupField, (SortField, SummableField)), we can find the cumulative sum of a SummableField accumulated according to the sort field.
import com.twitter.scalding.TypedPipe
import com.twitter.scalding.typed.CumulativeSum._
/**
 * Computes the cumulative sum of the amount within each (region, vertical) group,
 * accumulated in the order of the sort field (amount, account).
 *
 * @param pipe rows of (region, vertical, amount, account)
 * @return rows of (region, vertical, account, amount, cumulative)
 */
def findCumulativeSum(pipe: TypedPipe[(String, String, Double, String)]): TypedPipe[(String, String, String, Double, Double)] = {
  // Reshape each row into (GroupField, (SortField, SummableField)).
  val keyed = pipe.map { case (region, vertical, amount, account) =>
    ((region, vertical), ((amount, account), amount))
  }
  val accumulated = keyed.cumulativeSum
  // Flatten the nested result back into a single five-field row.
  accumulated.map { case ((region, vertical), ((amount, account), cumulative)) =>
    (region, vertical, account, amount, cumulative)
  }
}
No comments:
Post a Comment