sql - How to aggregate data using computed groups
My data is stored in a Spark data frame in this form (roughly):
col1  col2
a1    -5
b1    -20
c1    7
a2    3
b2    -4
c2    17

I want to turn this into:
col3  col4
a     -2
b     -24
c     24

(adding up the numbers, and collapsing x1 and x2 into x)
How can I do this using the data frame API?
Edit:

The col1 values are arbitrary strings (endpoints) that I want to concatenate into one column (spans), possibly in the form "a1-a2". I am planning on mapping endpoints to other endpoints in a map and querying it in a UDF. Can my UDF return None? - say I didn't want to include a in col3 at all, but did want to include b and c. How would I add a case so that (for example) the a rows are skipped when mapping col1 to col3?
You can extract the group column and use it to group and aggregate. Assuming your data follows the pattern in the example:
With raw SQL:

case class Record(col1: String, col2: Int)

val df = sqlContext.createDataFrame(Seq(
  Record("a1", -5), Record("b1", -20), Record("c1", 7),
  Record("a2", 3), Record("b2", -4), Record("c2", 17)))

df.registerTempTable("df")

sqlContext.sql(
  """SELECT col3, sum(col2) AS col4 FROM (
       SELECT col2, substr(col1, 1, 1) AS col3 FROM df
     ) tmp GROUP BY col3""").show

+----+----+
|col3|col4|
+----+----+
|   a|  -2|
|   b| -24|
|   c|  24|
+----+----+

With the Scala API:
import org.apache.spark.sql.functions.{udf, sum}
import sqlContext.implicits._ // for the $"..." column syntax

val getGroup = udf((s: String) => s.substring(0, 1))

df
  .select(getGroup($"col1").alias("col3"), $"col2")
  .groupBy($"col3")
  .agg(sum($"col2").alias("col4"))
  .show

+----+----+
|col3|col4|
+----+----+
|   a|  -2|
|   b| -24|
|   c|  24|
+----+----+

If the group pattern is more complex you can adjust the substr call or the getGroup function. For example:
val getGroup = {
  val pattern = "^[a-z]+".r
  udf((s: String) => pattern.findFirstIn(s) match {
    case Some(g) => g
    case None => "unknown"
  })
}
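Since the question mentions mapping endpoints to other endpoints in a map, the same approach works with a plain lookup table instead of a regex. A minimal sketch, assuming a hypothetical endpointToGroup map (its name and contents are made up for illustration):

// Hypothetical endpoint-to-group lookup table; replace with your own mapping.
val endpointToGroup = Map(
  "a1" -> "a", "a2" -> "a",
  "b1" -> "b", "b2" -> "b",
  "c1" -> "c", "c2" -> "c")

// Falls back to "unknown" for endpoints missing from the map.
val getGroupFromMap = udf((s: String) => endpointToGroup.getOrElse(s, "unknown"))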
Edit:

If you want to ignore some groups just add a WHERE clause. With raw SQL this is straightforward:
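For example, skipping group a could look roughly like this (a sketch reusing the df temp table registered above):

sqlContext.sql(
  """SELECT col3, sum(col2) AS col4 FROM (
       SELECT col2, substr(col1, 1, 1) AS col3 FROM df
     ) tmp
     WHERE col3 != 'a'
     GROUP BY col3""").show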
The Scala API requires a little more effort:

import org.apache.spark.sql.functions.{not, lit}

df
  .select(...) // As before
  .where(not($"col3".in(lit("a"))))
  .groupBy(...).agg(...) // As before

If you want to discard multiple values you can use varargs:
val toDiscard = List("a", "b").map(lit(_))

df
  .select(...) // As before
  .where(not($"col3".in(toDiscard: _*)))
  .groupBy(...).agg(...) // As before

Can a UDF return None?
It cannot, but it can return null:
val getGroup2 = udf((s: String) => s.substring(0, 1) match {
  case x if x != "a" => x
  case _ => null: String
})

df
  .select(getGroup2($"col1").alias("col3"), $"col2")
  .where($"col3".isNotNull)
  .groupBy(...).agg(...) // As before

+----+----+
|col3|col4|
+----+----+
|   b| -24|
|   c|  24|
+----+----+
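Combining this with the endpoint map from the question, one way to skip rows is to return null for endpoints missing from the map and then filter those rows out. A minimal sketch, assuming a hypothetical spanMap that deliberately omits the a* endpoints:

// Hypothetical map that leaves out the a* endpoints on purpose.
val spanMap = Map("b1" -> "b", "b2" -> "b", "c1" -> "c", "c2" -> "c")

// Unmapped endpoints become null and are dropped by the isNotNull filter.
val getSpan = udf((s: String) => spanMap.getOrElse(s, null: String))

df
  .select(getSpan($"col1").alias("col3"), $"col2")
  .where($"col3".isNotNull)
  .groupBy($"col3")
  .agg(sum($"col2").alias("col4"))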