Implementing an inner product using pyspark
Problem
I'm trying to implement a dot product using pyspark in order to learn pyspark's syntax.
I've currently implemented the dot product like so:
import operator as op
from functools import reduce
def inner(rdd, rdd2):
    return (rdd.zip(rdd2)
               .map(lambda x: reduce(op.mul, x))
               .reduce(lambda x, y: x + y))

My solution feels inelegant (particularly my lambda function). I'd like to know if there would be a more 'pysparkian' way of writing this.
Furthermore, are there performance considerations that I should be thinking about with regards to this problem (i.e. does my dot product solution not scale well)?
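For reference, here is a plain-Python sketch of what the RDD pipeline computes on a single machine (a local analogue for reasoning about correctness, not PySpark code): zip the two sequences, multiply each pair, then sum the products.

```python
import operator as op
from functools import reduce

def inner_local(xs, ys):
    # Multiply element pairs produced by zip, then fold with addition,
    # mirroring the .map(...).reduce(...) structure of the RDD version.
    return reduce(lambda a, b: a + b,
                  map(lambda pair: reduce(op.mul, pair), zip(xs, ys)))

print(inner_local([1, 2, 3], [4, 5, 6]))  # 1*4 + 2*5 + 3*6 = 32
```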
Solution
Since a zipped RDD contains only two-element tuples, using reduce doesn't make sense. You can safely replace

lambda x: reduce(op.mul, x)

with

lambda x: x[0] * x[1]

or a standalone function:

def mul(xy):
    x, y = xy
    return x * y

The final reduce can be replaced with a simple sum. Putting these two pieces together:

def inner(xs, ys):
    return xs.zip(ys).map(lambda xy: xy[0] * xy[1]).sum()

The only part that remains is the zip function. It requires that both RDDs have the same number of partitions and the same number of elements per partition. While the first part is pretty easy to achieve, keeping a correct order and balancing the number of elements can be tricky. You can try to generalize your function using joins:

def inner(xs, ys):
    def swap(xi):
        x, i = xi
        return i, x

    # We use sortByKey to avoid shuffling during the join
    xs_i = xs.zipWithIndex().map(swap).sortByKey()
    ys_i = ys.zipWithIndex().map(swap).sortByKey()
    return xs_i.join(ys_i).values().map(lambda xy: xy[0] * xy[1]).sum()

It will be less efficient than your current solution but more robust.
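To see why the join-based variant is more robust, here is a plain-Python sketch of its semantics (the names and dict-based "join" are illustrative, not PySpark API): each element is keyed by its position, and only positions present in both inputs contribute to the sum, like an inner join.

```python
def inner_via_index_join(xs, ys):
    # Key each element by its index (analogue of zipWithIndex + swap).
    xs_i = {i: x for i, x in enumerate(xs)}
    ys_i = {i: y for i, y in enumerate(ys)}
    # Inner join on the index: keep only keys present on both sides.
    common = xs_i.keys() & ys_i.keys()
    return sum(xs_i[k] * ys_i[k] for k in common)

print(inner_via_index_join([1, 2, 3], [4, 5, 6]))  # 32
print(inner_via_index_join([1, 2, 3], [4, 5]))     # 14: extra element dropped
```

Unlike zip, this never fails on mismatched partitioning; unpaired trailing elements are simply dropped by the join, which may or may not be the behaviour you want.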
Context
StackExchange Code Review Q#117317, answer score: 4