sql - How to aggregate data using computed groups
My data is stored in a Spark data frame in the form (roughly):

col1 col2
a1   -5
b1   -20
c1   7
a2   3
b2   -4
c2   17

which I want to turn into:

col3 col4
a    -2
b    -24
c    24

(adding up the numbers, and collapsing x1, x2 into x)

How can I do this using the data frame API?
Edit:

The col1 values are arbitrary strings (endpoints) which I want to concatenate into one column (a span), perhaps in the form "a1-a2". I am planning on mapping endpoints to other endpoints in a Map and querying that in a UDF. Can a UDF return None?

Let's say I didn't want to include a in col3 at all, but did want to include b and c. How would I add a case to the example so that the a rows are skipped when mapping col1 to col3?
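(For concreteness, a minimal sketch of the Map-lookup UDF described in the edit; the partner Map and the buildSpan name are made up for illustration, and null stands in for None since, as the answer below notes, a UDF cannot return Option values.)

import org.apache.spark.sql.functions.udf

// Hypothetical endpoint pairing; real data would supply this Map.
val partner = Map("a1" -> "a2", "a2" -> "a1", "b1" -> "b2", "b2" -> "b1")

// Builds e.g. "a1-a2" for either endpoint; returns null when there is no partner.
val buildSpan = udf((s: String) => partner.get(s) match {
  case Some(p) => Seq(s, p).sorted.mkString("-")
  case None => null: String
})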
You can extract the group column and use it for the aggregation. Assuming the data follows the pattern in the example:

With raw SQL:
case class Record(col1: String, col2: Int)

val df = sqlContext.createDataFrame(Seq(
  Record("a1", -5), Record("b1", -20), Record("c1", 7),
  Record("a2", 3), Record("b2", -4), Record("c2", 17)))

df.registerTempTable("df")

sqlContext.sql(
  """SELECT col3, sum(col2) AS col4 FROM (
       SELECT col2, substr(col1, 1, 1) AS col3 FROM df
     ) tmp GROUP BY col3""").show

+----+----+
|col3|col4|
+----+----+
|   a|  -2|
|   b| -24|
|   c|  24|
+----+----+
With the Scala API:
import org.apache.spark.sql.functions.{udf, sum}
import sqlContext.implicits._ // for the $"..." column syntax

val getGroup = udf((s: String) => s.substring(0, 1))

df
  .select(getGroup($"col1").alias("col3"), $"col2")
  .groupBy($"col3")
  .agg(sum($"col2").alias("col4"))

+----+----+
|col3|col4|
+----+----+
|   a|  -2|
|   b| -24|
|   c|  24|
+----+----+
If the group pattern is more complex you can adjust the substr or the getGroup function. For example:
val getGroup = {
  val pattern = "^[a-z]+".r
  udf((s: String) => pattern.findFirstIn(s) match {
    case Some(g) => g
    case None => "unknown"
  })
}
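For intuition, the regex behaves like this on a couple of made-up inputs (a match becomes the group, no match falls through to "unknown"):

val pattern = "^[a-z]+".r
pattern.findFirstIn("ab12")  // Some("ab") -> group "ab"
pattern.findFirstIn("7X")    // None -> "unknown"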
Edit:
If you want to ignore some groups, just add a WHERE clause. In raw SQL it is straightforward, but the Scala API requires a little more effort:
import org.apache.spark.sql.functions.{not, lit}

df
  .select(...) // As before
  .where(not($"col3".in(lit("a"))))
  .groupBy(...).agg(...) // As before
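The raw SQL counterpart is not spelled out above; a sketch, assuming the same df temp table registered earlier, just puts the predicate on the derived col3:

sqlContext.sql(
  """SELECT col3, sum(col2) AS col4 FROM (
       SELECT col2, substr(col1, 1, 1) AS col3 FROM df
     ) tmp WHERE col3 != 'a' GROUP BY col3""").show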
If you want to discard multiple groups you can use varargs:
val toDiscard = List("a", "b").map(lit(_))

df
  .select(...)
  .where(not($"col3".in(toDiscard: _*)))
  .groupBy(...).agg(...) // As before
Can a UDF return None?

It cannot, but it can return null:
val getGroup2 = udf((s: String) => s.substring(0, 1) match {
  case x if x != "a" => x
  case _ => null: String
})

df
  .select(getGroup2($"col1").alias("col3"), $"col2")
  .where($"col3".isNotNull)
  .groupBy(...).agg(...) // As before

+----+----+
|col3|col4|
+----+----+
|   b| -24|
|   c|  24|
+----+----+
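For completeness, the elided parts can be filled in from the earlier examples to give the full filtered pipeline:

import org.apache.spark.sql.functions.{udf, sum}

val getGroup2 = udf((s: String) => s.substring(0, 1) match {
  case x if x != "a" => x
  case _ => null: String
})

df
  .select(getGroup2($"col1").alias("col3"), $"col2")
  .where($"col3".isNotNull) // drops the rows whose group was mapped to null
  .groupBy($"col3")
  .agg(sum($"col2").alias("col4"))
  .show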