How to Optimize Spark

2025-10-27 08:10:09

1. repartition and coalesce

Spark provides the `repartition()` function, which shuffles the data across the network to create a new set of partitions. Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of `repartition()` called `coalesce()` that avoids this data movement, but only if you are decreasing the number of RDD partitions. To know whether you can safely call `coalesce()`, check the RDD's current number of partitions using `rdd.partitions.size` in Scala, `rdd.partitions().size()` in Java, or `rdd.getNumPartitions()` in Python. Then make sure that you are coalescing it to fewer partitions than it currently has.
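The toy model below makes the cost difference concrete. It is plain Python and needs no Spark. Partitions are modeled as lists of records, and the model is a simplification, not Spark's actual implementation. The function names are hypothetical.

- `coalesce_toy` assigns whole parent partitions to fewer output partitions. Each parent therefore feeds exactly one child.
- `repartition_toy` deals every record round-robin into the new partitions. Each parent therefore feeds many children, which is what forces a shuffle.

```python
# Toy model of partition resizing; a simplification, not Spark's implementation.
# Partitions are plain Python lists; records are assumed to be unique values.

def coalesce_toy(partitions, n):
    """Merge whole parent partitions into n groups; records are never split apart."""
    if n >= len(partitions):
        # Like coalesce() without a shuffle, asking for more partitions changes nothing.
        return [list(p) for p in partitions]
    groups = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # Contiguous parents share a group, so each parent feeds exactly one child.
        groups[i * n // len(partitions)].extend(part)
    return groups


def repartition_toy(partitions, n):
    """Deal every record round-robin into n new partitions (a full shuffle)."""
    out = [[] for _ in range(n)]
    records = [r for p in partitions for r in p]
    for idx, r in enumerate(records):
        out[idx % n].append(r)
    return out


def dependency_edges(before, after):
    """Set of (parent_index, child_index) pairs: which old partitions feed which new ones."""
    src = {r: i for i, p in enumerate(before) for r in p}
    return {(src[r], j) for j, p in enumerate(after) for r in p}


parts = [list(range(i, i + 3)) for i in range(0, 12, 3)]  # 4 partitions of 3 records
print(len(dependency_edges(parts, coalesce_toy(parts, 2))))     # 4
print(len(dependency_edges(parts, repartition_toy(parts, 2))))  # 8
```

Take 4 input partitions of 3 records each. Coalescing to 2 gives one edge per parent (4 in total). The round-robin repartition gives 8 edges, because every parent is split across both outputs.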

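Summary: when repartitioning an RDD, use `coalesce()` instead of `repartition()` if the target number of partitions is smaller than the current number. For more details on partition-related optimizations, see Chapter 4 of Learning Spark.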
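The rule in the summary can be wrapped in a small helper. `resize_partitions` is a hypothetical name. It only calls `getNumPartitions()`, `coalesce()`, and `repartition()`, which exist on PySpark RDDs. Because it needs nothing else, it can be exercised without a cluster by passing any stand-in object that has those three methods.

```python
def resize_partitions(rdd, target):
    """Hypothetical helper: coalesce when shrinking, repartition when growing."""
    current = rdd.getNumPartitions()
    if target < current:
        # Fewer partitions: merge existing ones without a full shuffle.
        return rdd.coalesce(target)
    if target > current:
        # More partitions: a shuffle is unavoidable, so repartition() is the right call.
        return rdd.repartition(target)
    return rdd  # already the requested size
```

For example, take an RDD created with `sc.parallelize(range(1000), 8)`. Calling `resize_partitions(rdd, 2)` on it takes the `coalesce()` branch.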

2. Passing Functions to Spark

In Python, we have three options for passing functions into Spark.

Lambda expressions:

    word = rdd.filter(lambda s: "error" in s)

Top-level functions:

    import my_personal_lib

    word = rdd.filter(my_personal_lib.containsError)

Locally defined functions:

    def containsError(s):
        return "error" in s

    word = rdd.filter(containsError)
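The sketch below is a quick local check that the three styles are interchangeable. It uses Python's built-in `filter()` on a list as a stand-in for `rdd.filter()`. The sample log lines are made up. `contains_error` stands in for the hypothetical `my_personal_lib.containsError`. In PySpark, each predicate would additionally be serialized and shipped to the workers.

```python
# Stand-in for an RDD: built-in filter() applies the predicate element by element,
# like rdd.filter(), but locally and without any serialization.
lines = ["error: disk full", "ok", "error: timeout", "warning"]

# 1. Lambda expression
lambda_hits = list(filter(lambda s: "error" in s, lines))

# 2. Top-level (module-level) function; stands in for my_personal_lib.containsError
def contains_error(s):
    return "error" in s

top_level_hits = list(filter(contains_error, lines))

# 3. Locally defined function, created inside another function
def run_local():
    def containsError(s):
        return "error" in s
    return list(filter(containsError, lines))

local_hits = run_local()
print(lambda_hits == top_level_hits == local_hits)  # True
```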

One issue to watch out for when passing functions is inadvertently serializing the object containing the function. When you pass a function that is the member of an object, or contains references to fields in an object (e.g., self.field), Spark sends the entire object to worker nodes, which can be much larger than the bit of information you need. Sometimes this can also cause your program to fail, if your class contains objects that Python can’t figure out how to pickle.
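The serialization cost can be observed without a cluster. The sketch below uses the standard `pickle` module as a stand-in for the serializer PySpark uses for functions. PySpark relies on a bundled copy of cloudpickle, which differs in details. The `index` field and the lock are hypothetical additions: the field shows a large payload, and the lock shows an unpicklable member.

```python
import pickle
import threading


class SearchFunctions(object):
    def __init__(self, query):
        self.query = query
        # Hypothetical large field that the predicate never uses.
        self.index = list(range(100_000))

    def isMatch(self, s):
        return self.query in s


sf = SearchFunctions("error")

# Pickling a bound method pickles the object it is bound to, index and all.
method_bytes = len(pickle.dumps(sf.isMatch))
# Pickling just the field the predicate needs is tiny by comparison.
field_bytes = len(pickle.dumps(sf.query))
print(method_bytes > 100 * field_bytes)  # True

# An unpicklable member (here a lock) makes the whole bound method unpicklable.
sf.lock = threading.Lock()
try:
    pickle.dumps(sf.isMatch)
    lock_failed = False
except TypeError as exc:
    lock_failed = True
    print("cannot serialize:", exc)
```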


### wrong way

    class SearchFunctions(object):
        def __init__(self, query):
            self.query = query

        def isMatch(self, s):
            return self.query in s

        def getMatchesFunctionReference(self, rdd):
            # Problem: references all of "self" in "self.isMatch"
            return rdd.filter(self.isMatch)

        def getMatchesMemberReference(self, rdd):
            # Problem: references all of "self" in "self.query"
            return rdd.filter(lambda x: self.query in x)

### the right way

    class WordFunctions(object):
        ...

        def getMatchesNoReference(self, rdd):
            # Safe: extract only the field we need into a local variable
            query = self.query
            return rdd.filter(lambda x: query in x)
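One way to see which of the methods above drag `self` along is to inspect what each predicate closes over. The sketch assumes each predicate is built exactly as in the examples but returned instead of being passed to `rdd.filter()`. The method names `function_reference`, `member_reference`, and `no_reference`, and the helper `captured_values`, are hypothetical. The helper reads a bound method's `__self__` and a lambda's closure cells, which is roughly what a closure serializer has to ship to workers.

```python
class SearchFunctions(object):
    def __init__(self, query):
        self.query = query

    def isMatch(self, s):
        return self.query in s

    # Each method returns the predicate the matching example passes to rdd.filter().
    def function_reference(self):
        return self.isMatch

    def member_reference(self):
        return lambda x: self.query in x

    def no_reference(self):
        query = self.query
        return lambda x: query in x


def captured_values(fn):
    """Return the values a predicate carries with it when it is serialized."""
    if hasattr(fn, "__self__"):  # bound method: the whole instance travels with it
        return {"self": fn.__self__}
    cells = fn.__closure__ or ()
    return {name: cell.cell_contents for name, cell in zip(fn.__code__.co_freevars, cells)}


sf = SearchFunctions("error")
print(list(captured_values(sf.function_reference())))  # ['self']
print(list(captured_values(sf.member_reference())))    # ['self']
print(captured_values(sf.no_reference()))              # {'query': 'error'}
```

Only the last predicate carries nothing but the query string.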
