Generic “reduceBy” or “groupBy + aggregate” functionality with Spark DataFrame
Problem
Maybe I totally reinvented the wheel, or maybe I've invented something new and useful. Can one of you tell me if there's a better way of doing this? Here's what I'm trying to do:
I want a generic reduceBy function that works like an RDD's reduceByKey, but will let me group data by any column in a Spark DataFrame. You may say that we already have that, and it's called groupBy, but as far as I can tell, groupBy only lets you aggregate using some very limited options. I want to groupBy, and then run an arbitrary function to aggregate. Has anyone already done that?

Basically, I'm taking a Spark DataFrame that looks like this:

    +----------+---------+-----+-------------+------------+-------------------+
    | birthdate|favecolor| name|twitterhandle|facebookpage|           favesong|
    +----------+---------+-----+-------------+------------+-------------------+
    |2000-01-01|     blue|Alice|     allyblue|        null|               null|
    |1999-12-31|     null|  Bob|         null|      BobbyG| Gangsters Paradise|
    |      null|     null|Alice|         null|        null|Rolling in the Deep|
    +----------+---------+-----+-------------+------------+-------------------+

and reducing by the column 'name' with a custom function to get this:

    +----------+---------+-------------------+-----+-------------+------------+
    | birthdate|favecolor|           favesong| name|twitterhandle|facebookpage|
    +----------+---------+-------------------+-----+-------------+------------+
    |2000-01-01|     blue|Rolling in the Deep|Alice|     allyblue|        null|
    |1999-12-31|     null| Gangsters Paradise|  Bob|         null|      BobbyG|
    +----------+---------+-------------------+-----+-------------+------------+

I just noticed the change in column order. I think I can fix that pretty quickly by taking note of the schema before beginning. But anyway, I had to write a ton of code to get that to work, and this seems like such a simple operation that somebody else should have done it by now.
Solution
The biggest issue here is the following chunk of code:

    unique_entries = df.select(col).distinct().collect()

which unfortunately renders your approach useless in the general case:

- it assumes that the result fits into driver memory, which may or may not be true.
- finding distinct elements is an expensive process in a distributed application.
- collect has to transfer data to the driver and pass it to the local Python interpreter, converting from the internal representation to PythonRDD somewhere on the way.

All of that can work pretty well if the number of unique keys is small, but it can become prohibitively expensive otherwise. One possible improvement is to use toLocalIterator instead of collect. It is more expensive because it triggers multiple jobs, but it fetches only a single partition at a time.

Another problem I see is the subsequent loop:

- depending on the distribution of the keys, the reduce part can result in suboptimal resource usage, up to the point where execution becomes completely sequential.
- once again it has to collect data to the driver, with all the related issues.
- iterative union can generate long lineages. This makes failure recovery expensive and can simply fail due to a stack overflow.
- using parallelize on small datasets (like a single row) is far from optimal, especially combined with iterative union. It results in a large number of empty partitions and a growing total number of partitions, and can show behavior similar to the one described in "Spark iteration time increasing exponentially when using join".
- last but not least, iterative createDataFrame without specifying a schema requires expensive schema inference.

These issues could be partially addressed (assuming the data fits in memory) by using only a single SparkContext.union, or even better a single parallelize, followed by a single createDataFrame.

> I want to groupBy, and then run an arbitrary function to aggregate. Has anyone already done that?

Kind of. Since 1.5.0 Spark supports UDAFs (User Defined Aggregate Functions), which can be used to apply any commutative and associative function. These can be defined only in Scala / Java, but with some effort they can be used from Python. See "How to map Python with Scala or Java User Defined Functions?".
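
The commutativity and associativity requirement matters because a distributed reduce combines partial results per partition, in no fixed order. The following pure-Python sketch simulates that idea. The helper name partitioned_reduce and the partition layouts are hypothetical, and this is only a rough model of the requirement, not how Spark is implemented.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(f, partitions):
    """Reduce each partition on its own, then reduce the partial results.

    A rough model of a distributed reduce: how values are split into
    partitions is decided by the engine, not by the caller.
    """
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)


# The same four values, split and ordered in three different ways.
layout_a = [[1, 2], [3, 4]]
layout_b = [[1], [2, 3, 4]]
layout_c = [[3, 4], [1, 2]]
layouts = (layout_a, layout_b, layout_c)

# Addition is associative and commutative: every layout agrees.
sums = {partitioned_reduce(add, p) for p in layouts}

# Subtraction is neither: the result depends on the layout.
diffs = {partitioned_reduce(sub, p) for p in layouts}

print(sums)
print(diffs)
```

With addition, all three layouts collapse to a single result, while subtraction yields more than one. An aggregation function passed to a UDAF or to reduceByKey should behave like the former.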
If you're not very fond of the idea of writing Scala code, an alternative approach is to use RDD methods like this:

    from collections import OrderedDict

    from pyspark.sql import DataFrame, Row
    from pyspark.sql.functions import struct


    def reduce_by(self, by, cols, f, schema=None):
        """
        :param self DataFrame
        :param by a list of grouping columns
        :param cols a list of columns to aggregate
        :param f aggregation function (Row, Row) => Row
        :param schema optional schema passed to toDF
        :return DataFrame
        """
        def merge_kv(kv):
            # Flatten a (key Row, value Row) pair back into a single Row.
            key, value = kv
            return Row(**OrderedDict(zip(
                key.__fields__ + value.__fields__, key + value)
            ))

        return (self
            .select(struct(*by), struct(*cols))  # (key struct, value struct)
            .rdd
            .reduceByKey(f)
            .map(merge_kv)
            .toDF(schema))


    DataFrame.reduce_by = reduce_by  # A quick monkey patch

Which can be used as follows:

    def foo(row1, row2):
        """ A dummy function

        >>> foo(Row(x=1, y=None), Row(x=None, y=2))
        Row(x=1, y=2)
        """
        # Keep the left value unless it is null; 0 and "" count as real values.
        return Row(**OrderedDict(zip(
            row1.__fields__,
            (x if x is not None else y for (x, y) in zip(row1, row2))
        )))

    # Example data (sc is an existing SparkContext)
    df = sc.parallelize([
        ("a", None, 1), ("a", None, 2), ("a", 3, None),
        ("b", None, 2), ("b", None, None), ("c", 1, -1)
    ]).toDF(["k", "v1", "v2"])

    df.reduce_by(by=["k"], cols=["v1", "v2"], f=foo).show()

    ## +---+----+---+
    ## |  k|  v1| v2|
    ## +---+----+---+
    ## |  a|   3|  1|
    ## |  c|   1| -1|
    ## |  b|null|  2|
    ## +---+----+---+

It still has to move data between the JVM and Python, but it doesn't suffer from the other issues.
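
To make the data flow concrete, here is a minimal pure-Python sketch of what a reduce-by-key over the example data computes, without Spark. It assumes namedtuples as stand-ins for Row. The names Rec, coalesce and local_reduce_by are hypothetical and exist only for this illustration. It uses an explicit "is not None" test so that 0 or an empty string is kept as a real value.

```python
from collections import OrderedDict, namedtuple
from functools import reduce

# Hypothetical stand-in for pyspark.sql.Row, used only in this local sketch.
Rec = namedtuple("Rec", ["v1", "v2"])


def coalesce(left, right):
    """Merge two records field by field, keeping the first non-None value."""
    return Rec(*(a if a is not None else b for a, b in zip(left, right)))


def local_reduce_by(pairs, f):
    """Group (key, record) pairs by key and fold each group with f.

    This mimics the result of reduceByKey on a single machine; Spark would
    apply f within partitions first and then across partitions.
    """
    groups = OrderedDict()
    for key, rec in pairs:
        groups.setdefault(key, []).append(rec)
    return OrderedDict((key, reduce(f, recs)) for key, recs in groups.items())


# Same data as the example above, as (key, record) pairs.
pairs = [
    ("a", Rec(None, 1)), ("a", Rec(None, 2)), ("a", Rec(3, None)),
    ("b", Rec(None, 2)), ("b", Rec(None, None)), ("c", Rec(1, -1)),
]
result = local_reduce_by(pairs, coalesce)

for key, rec in result.items():
    print(key, rec)
```

Because coalesce keeps the left value when both sides are set, it is associative but not commutative. For key a, both 1 and 2 are candidates for v2, so in Spark the surviving value may depend on the order in which rows are combined.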

Finally, to answer a question from the comments:

> reduce function (...) Why did Python 3 get rid of it?

Because Guido van Rossum hates reduce :) To quote All Things Pythonic [1]:

> This is actually the one I've always hated most, because, apart from a few examples involving + or *, almost every time I see a reduce() call with a non-trivial function argument, I need to grab pen and paper to diagram what's actually being fed into that function before I understand what the reduce() is supposed to do.

Actually it is still out there, but it has been moved to functools. Personally I would recommend toolz instead, which provides a comprehensive set of functional utilities and, as a bonus, can serve as a compatibility layer between Python 2.6+ and 3.3+.

[1] Guido van Rossum (2005, March 10). All Things Pythonic. The fate of reduce() in Python 3000. Retrieved from http://www.artima.com/weblogs/viewpost.jsp?thread=98196
Context
StackExchange Code Review Q#115082, answer score: 11