Generic “reduceBy” or “groupBy + aggregate” functionality with Spark DataFrame

Submitted by: @import:stackexchange-codereview··

Problem

Maybe I totally reinvented the wheel, or maybe I've invented something new and useful. Can one of you tell me if there's a better way of doing this? Here's what I'm trying to do:

I want a generic reduceBy function, that works like an RDD's reduceByKey, but will let me group data by any column in a Spark DataFrame. You may say that we already have that, and it's called groupBy, but as far as I can tell, groupBy only lets you aggregate using some very limited options. I want to groupBy, and then run an arbitrary function to aggregate. Has anyone already done that?

Basically, I'm taking a Spark DataFrame that looks like this:

+----------+---------+-----+-------------+------------+-------------------+
| birthdate|favecolor| name|twitterhandle|facebookpage|           favesong|
+----------+---------+-----+-------------+------------+-------------------+
|2000-01-01|     blue|Alice|     allyblue|        null|               null|
|1999-12-31|     null|  Bob|         null|      BobbyG| Gangsters Paradise|
|      null|     null|Alice|         null|        null|Rolling in the Deep|
+----------+---------+-----+-------------+------------+-------------------+


and reducing by the column 'name' with a custom function to get this:

+----------+---------+-------------------+-----+-------------+------------+
| birthdate|favecolor|           favesong| name|twitterhandle|facebookpage|
+----------+---------+-------------------+-----+-------------+------------+
|2000-01-01|     blue|Rolling in the Deep|Alice|     allyblue|        null|
|1999-12-31|     null| Gangsters Paradise|  Bob|         null|      BobbyG|
+----------+---------+-------------------+-----+-------------+------------+


I just noticed the change in column order. I think I can fix that pretty quickly by taking note of the schema before beginning. But anyway, I had to write a ton of code to get that to work, and this seems like such a simple operation somebody else should have done it by no

Solution

The biggest issue here is the following line of code:

unique_entries = df.select(col).distinct().collect()


which unfortunately renders your approach useless in the general case:

  • it assumes that the result fits into driver memory, which may or may not be true.
  • finding distinct elements is an expensive operation in a distributed application.
  • collect has to transfer the data to the driver and pass it to the local Python interpreter, converting from the internal representation to a PythonRDD somewhere along the way.

All of that can work pretty well if the number of unique keys is small, but it can become prohibitively expensive otherwise. One possible improvement is to use toLocalIterator instead of collect. It is more expensive because it triggers multiple jobs, but it fetches only a single partition at a time.

Another problem I see is the subsequent loop:

  • depending on the distribution of the keys, the reduce part can result in suboptimal resource usage, up to the point where execution becomes completely sequential.
  • once again, it has to collect data to the driver, with all the related issues.
  • iterative union can generate long lineages, which makes failure recovery expensive and can simply fail due to a stack overflow.
  • using parallelize on small datasets (like a single row) is far from optimal, especially combined with iterative union. It results in a large number of empty partitions and a growing total number of partitions, and can show behavior similar to the one described in "Spark iteration time increasing exponentially when using join".
  • last but not least, iterative createDataFrame without specifying a schema requires expensive schema inference.

These issues could be partially addressed (assuming the data fits in memory) by using only a single SparkContext.union or, even better, a single parallelize followed by a single createDataFrame.


I want to groupBy, and then run an arbitrary function to aggregate. Has anyone already done that?

Kind of. Since 1.5.0, Spark supports UDAFs (User Defined Aggregate Functions), which can be used to apply any commutative and associative function. These can be defined only in Scala / Java, but with some effort they can be used from Python. See "How to map Python with Scala or Java User Defined Functions?".
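To illustrate why the aggregation function should be commutative and associative, here is a minimal pure-Python sketch (not Spark code). The helper names `first_non_null` and `is_order_independent` are hypothetical and introduced only for this illustration. The check folds the values left to right in every possible ordering, so it covers ordering but not every possible grouping of partial results; it is a quick sanity check rather than a proof.

```python
from functools import reduce
from itertools import permutations


def first_non_null(a, b):
    """Keep the left value unless it is None."""
    return a if a is not None else b


def is_order_independent(f, values):
    """True if a left fold of `values` with `f` gives one result for every ordering."""
    results = {reduce(f, ordering) for ordering in permutations(values)}
    return len(results) == 1


# max is commutative and associative, so the ordering never matters.
print(is_order_independent(max, [3, 1, 2]))

# "First non-null" depends on which non-null value happens to be seen first.
print(is_order_independent(first_non_null, [None, 1, 2]))
```

A function that fails this check can still be passed to a distributed reduce, but the result for a key with several competing non-null values may then differ from run to run.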

If you're not fond of the idea of writing Scala code, an alternative approach is to use RDD methods like this:

from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict

def reduce_by(self, by, cols, f, schema=None):
    """
    :param self: DataFrame
    :param by: a list of grouping columns
    :param cols: a list of columns to aggregate
    :param f: aggregation function (Row, Row) => Row
    :param schema: optional schema passed to toDF
    :return: DataFrame
    """
    def merge_kv(kv):
        key, value = kv
        return Row(**OrderedDict(zip(
            key.__fields__ + value.__fields__, key + value)
        ))

    return (self
        .select(struct(*by), struct(*cols))
        .rdd
        .reduceByKey(f)
        .map(merge_kv)
        .toDF(schema))

DataFrame.reduce_by = reduce_by  # A quick monkey patch


Which can be used as follows:

def foo(row1, row2):
    """ A dummy function
    >>> foo(Row(x=1, y=None), Row(x=None, y=2))
    Row(x=1, y=2)
    """
    return Row(**OrderedDict(zip(
      row1.__fields__, (x if x is not None else y for (x, y) in zip(row1, row2))
    )))

# Example data
df = sc.parallelize([
    ("a", None, 1), ("a", None, 2), ("a", 3, None),
    ("b", None, 2), ("b", None, None), ("c", 1, -1)
]).toDF(["k", "v1", "v2"])

df.reduce_by(by=["k"], cols=["v1", "v2"], f=foo).show()

## +---+----+---+
## |  k|  v1| v2|
## +---+----+---+
## |  a|   3|  1|
## |  c|   1| -1|
## |  b|null|  2|
## +---+----+---+


It still has to move data between the JVM and Python, but it doesn't suffer from the other issues.
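For trying out an aggregation function without a cluster, the same key-wise fold can be modelled in plain Python. This is only a rough local stand-in under stated assumptions: `reduce_by_local` and `coalesce` are hypothetical names, rows are plain tuples rather than Row objects, and the data is processed sequentially in input order, which Spark does not guarantee.

```python
def reduce_by_local(rows, key_len, f):
    """Fold the values that share a key with `f`, similar in spirit to reduceByKey.

    `rows` are tuples whose first `key_len` items form the key.
    """
    merged = {}
    for row in rows:
        key, value = row[:key_len], row[key_len:]
        merged[key] = f(merged[key], value) if key in merged else value
    # Glue each key back onto its reduced value, like merge_kv does.
    return [key + value for key, value in merged.items()]


def coalesce(left, right):
    """Field-wise merge: keep the left item unless it is None."""
    return tuple(x if x is not None else y for x, y in zip(left, right))


# Same example data as above, as plain tuples (k, v1, v2).
rows = [
    ("a", None, 1), ("a", None, 2), ("a", 3, None),
    ("b", None, 2), ("b", None, None), ("c", 1, -1),
]

result = reduce_by_local(rows, key_len=1, f=coalesce)
for row in result:
    print(row)
```

Because the fold here runs in input order, key "a" ends up with v2 = 1; on a cluster the value chosen for that field may depend on the order in which rows are combined.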

Finally, to answer a question from the comments:


reduce function (...) Why did Python 3 get rid of it?

Because Guido van Rossum hates reduce :) To quote All Things Pythonic [1]:


This is actually the one I've always hated most, because, apart from a few examples involving + or *, almost every time I see a reduce() call with a non-trivial function argument, I need to grab pen and paper to diagram what's actually being fed into that function before I understand what the reduce() is supposed to do.

Actually, it is still available, but it has been moved to functools. Personally, I would recommend toolz instead, which provides a comprehensive set of functional utilities and, as a bonus, can serve as a compatibility layer between Python 2.6+ and 3.3+.
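As a small illustration of functools.reduce, the two partial records for Alice from the question can be merged as follows. This is a sketch on plain dictionaries; `merge_records` is a hypothetical helper, and the field selection is shortened for brevity.

```python
from functools import reduce


def merge_records(left, right):
    """Fill the None fields of `left` with the values from `right`."""
    return {k: v if v is not None else right.get(k) for k, v in left.items()}


# Two partial records for the same person, taken from the question's example.
alice = [
    {"birthdate": "2000-01-01", "favecolor": "blue", "favesong": None},
    {"birthdate": None, "favecolor": None, "favesong": "Rolling in the Deep"},
]

merged = reduce(merge_records, alice)
print(merged)
```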

  [1] Guido van Rossum (2005, March 10). All Things Pythonic: The fate of reduce() in Python 3000. Retrieved from http://www.artima.com/weblogs/viewpost.jsp?thread=98196

Context

StackExchange Code Review Q#115082, answer score: 11
