Scalding Recipes
Retain additional fields while grouping by certain fields
This can be achieved with the
Typed API. Below is an example where a
TypedPipe contains tuples of type (String, String, String,
Long). We want to group by fields 1 and 2, sum field 4 within each group,
and return tuples of the three strings plus that sum, where the third string is
the field we want to retain through the group-by.
import com.twitter.scalding.TypedPipe

/**
 * Groups records by their first two fields and sums the fourth field per group, while keeping
 * the third field of every record.
 *
 * The totals are joined back onto the (un-reduced) grouping, so each input record is paired with
 * the total of its (field 1, field 2) group.
 *
 * @param pipe records of the form (key1, key2, retained, value)
 * @return (key1, key2, retained, sum of value over all records sharing key1 and key2)
 */
def groupBy(pipe: TypedPipe[(String, String, String, Long)]): TypedPipe[(String, String, String, Long)] = {
  val grouped = pipe.groupBy { case (k1, k2, _, _) => (k1, k2) }
  // One total per (key1, key2) group.
  val agg = grouped.mapValues(_._4).sum
  // The pipe must be the block's final expression; ending in a `val` would make the result Unit.
  grouped
    .join(agg)
    .toTypedPipe
    .values
    .map { case ((k1, k2, retained, _), total) => (k1, k2, retained, total) }
}
If we’d like to compute several aggregates at once,
we can define an aggregate class and reduce into it while grouping, as shown below.
import com.sample.scalding.job.Aggregates
import com.twitter.scalding.TypedPipe

/**
 * Groups records by their first two fields and computes, per group, the sum of the fourth field
 * and the number of records, while keeping the third field of every record.
 *
 * @param pipe records of the form (key1, key2, retained, value)
 * @return (key1, key2, retained, group sum of value, group record count), one row per input record
 */
def groupBy(pipe: TypedPipe[(String, String, String, Long)]): TypedPipe[(String, String, String, Double, Double)] = {
  val grouped = pipe.groupBy { case (k1, k2, _, _) => (k1, k2) }
  // Each record starts as (value, count = 1). The reducer must add BOTH components of the two
  // aggregates: Scalding's reduce must be associative and may merge already-partial aggregates,
  // so incrementing the count by 1 per merge would undercount.
  val agg = grouped
    .mapValues { case (_, _, _, value) => Aggregates(value.toDouble, 1.0) }
    .reduce { (left, right) => Aggregates(left.sum + right.sum, left.count + right.count) }
  // Joining the per-group aggregate back onto the grouping retains field 3 of every record.
  grouped
    .join(agg)
    .toTypedPipe
    .values
    .map { case ((k1, k2, retained, _), aggr) => (k1, k2, retained, aggr.sum, aggr.count) }
}
package com.sample.scalding.job

/**
 * Accumulator for the multi-field group-by recipe: a running total and the number of records
 * that contributed to it.
 *
 * Declared directly in `com.sample.scalding.job` so that `import com.sample.scalding.job.Aggregates`
 * resolves to this class (a package named `Aggregates` would shadow it).
 *
 * @param sum   accumulated total of the aggregated field
 * @param count number of records folded into `sum`
 */
case class Aggregates(sum: Double, count: Double)
Cumulative Sum of Aggregated Field
Given a TypedPipe with elements of type (GroupField, (SortField, SummableField)), we can compute the cumulative sum of SummableField within each GroupField, accumulated in the order of SortField.
import com.twitter.scalding.TypedPipe
import com.twitter.scalding.typed.CumulativeSum._

/**
 * Computes a running sum of `amount` within each (region, vertical) group, ordered by
 * (amount, account).
 *
 * @param pipe records of the form (region, vertical, amount, account)
 * @return (region, vertical, account, amount, cumulative amount in sort order)
 */
def findCumulativeSum(pipe: TypedPipe[(String, String, Double, String)]): TypedPipe[(String, String, String, Double, Double)] =
  pipe
    .map { case (region, vertical, amount, account) =>
      // Shape expected by cumulativeSum: (GroupField, (SortField, SummableField)).
      // The sort field is (amount, account); account breaks ties between equal amounts.
      ((region, vertical), ((amount, account), amount))
    }
    .cumulativeSum
    .toTypedPipe
    .map { case ((region, vertical), ((amount, account), cumulative)) =>
      (region, vertical, account, amount, cumulative)
    }
No comments:
Post a Comment