Notes on PySpark usage.
Configuring environment variables
Copy the following files into the Lib\site-packages directory under your Python installation:
```
{SPARK_HOME}\python\lib\py4j-0.10.7-src.zip
{SPARK_HOME}\python\lib\pyspark.zip
```
Then extract both archives into that same directory.
pyspark shell
```
cd {SPARK_HOME}/bin
```
Counting example
```
Welcome to
```
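After launching the shell with the pyspark script in {SPARK_HOME}/bin, a rough sketch of a simple count at the >>> prompt (the sample file, its contents, and therefore the numbers shown are hypothetical):
```
>>> lines = sc.textFile("file:///opt/bigdata/hello.txt")   # hypothetical sample file
>>> lines.count()                                          # number of lines in the file
3
>>> words = lines.flatMap(lambda line: line.split())
>>> sorted(words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b).collect())
[('hadoop', 1), ('hello', 3), ('spark', 2)]                # depends entirely on the file contents
```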
Reading paths in PySpark
Reading from an HDFS path
```
>>> path = "hdfs://ip:port/bigdata/hello.txt"
```
Reading from a local path
```
>>> path = "file:///opt/bigdata/hello.txt"
```
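Whichever prefix is used, the file is then read the same way; a minimal sketch (output depends on the file contents):
```
>>> path = "file:///opt/bigdata/hello.txt"
>>> rdd = sc.textFile(path)
>>> rdd.collect()   # returns the file's lines as a list of strings
```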
Inspecting partitions
Default partition formula for sc.textFile
```
math.min(defaultParallelism, 2)
math.max(totalCoreCount.get(), 2)
```
(The first line is the default minimum number of partitions; the second is how defaultParallelism itself is computed.)
Default partition formula for sc.parallelize
```
math.max(totalCoreCount.get(), 2)
```
Checking the number of RDD partitions
Method 1
```
>>> rdd = sc.parallelize(range(10))
>>> rdd.getNumPartitions()
4
```
Method 2
```
>>> rdd = sc.parallelize(range(10))
>>> rdd.glom().collect()  # print the data grouped by partition
[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
```
Common operators
map
```
>>> rdd = sc.parallelize(range(5))
```
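A minimal sketch of a typical map call on such an RDD; the doubling lambda is only an illustration:
```
>>> rdd = sc.parallelize(range(5))
>>> rdd.map(lambda x: x * 2).collect()   # apply the function to every element
[0, 2, 4, 6, 8]
```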
flatMap
```
>>> rdd = sc.parallelize(range(5))
```
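A sketch showing how flatMap differs from map: each element may expand into several, and the results are flattened (the lambda here is an assumption, not the original one):
```
>>> rdd = sc.parallelize(range(5))
>>> rdd.flatMap(lambda x: [x, x * 10]).collect()   # each element yields a list; the lists are flattened
[0, 0, 1, 10, 2, 20, 3, 30, 4, 40]
```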
reduce
```
>>> rdd = sc.parallelize(range(1, 6))
```
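A minimal sketch of reduce, summing the elements pairwise:
```
>>> rdd = sc.parallelize(range(1, 6))
>>> rdd.reduce(lambda x, y: x + y)   # 1 + 2 + 3 + 4 + 5
15
```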
fold
```
>>> rdd = sc.parallelize(range(1, 6))
```
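A sketch of fold, which works like reduce but takes a zero value that is applied within each partition and once more when partitions are merged:
```
>>> rdd = sc.parallelize(range(1, 6))
>>> rdd.fold(0, lambda x, y: x + y)   # with zero value 0 the result equals the plain sum
15
```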
aggregate
```
>>> rdd = sc.parallelize([2, 5, 3, 1], 2)
```
Note: in lambda x, y: x + y, x is the running aggregate value and y is the current element (see the sketch below).
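A sketch of how the aggregate call might look, using that lambda both as the within-partition function and as the cross-partition merge function:
```
>>> rdd = sc.parallelize([2, 5, 3, 1], 2)
>>> # aggregate(zeroValue, seqOp, combOp): seqOp folds elements inside a partition,
>>> # combOp merges the per-partition results
>>> rdd.aggregate(0, lambda x, y: x + y, lambda x, y: x + y)
11
```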
filter
```
>>> rdd = sc.parallelize(range(1, 10))
```
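A minimal sketch of filter; the even-number predicate is an assumption for illustration:
```
>>> rdd = sc.parallelize(range(1, 10))
>>> rdd.filter(lambda x: x % 2 == 0).collect()   # keep only elements for which the predicate is True
[2, 4, 6, 8]
```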
distinct
```
>>> lst = random.choices(range(1, 6), k=10)
```
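A sketch of distinct on the random list; because random.choices is involved, the exact values vary from run to run:
```
>>> import random
>>> lst = random.choices(range(1, 6), k=10)        # 10 random values from 1..5, so duplicates are likely
>>> sorted(sc.parallelize(lst).distinct().collect())
[1, 2, 3, 4, 5]                                    # illustrative; only values actually drawn appear
```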
intersection、union
```
>>> rdd1 = sc.parallelize(["C", "A", "B", "B"])
```
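A sketch comparing the two operators; rdd2 below is a hypothetical second RDD, not the original one:
```
>>> rdd1 = sc.parallelize(["C", "A", "B", "B"])
>>> rdd2 = sc.parallelize(["A", "A", "D"])           # hypothetical second RDD
>>> rdd1.intersection(rdd2).collect()                # common elements, deduplicated
['A']
>>> sorted(rdd1.union(rdd2).collect())               # union keeps duplicates
['A', 'A', 'A', 'B', 'B', 'C', 'D']
```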
sortBy
```
>>> lst = random.choices(range(20), k=6)
```
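A sketch of sortBy on the random list; outputs are omitted because they depend on the random draw:
```
>>> import random
>>> lst = random.choices(range(20), k=6)
>>> rdd = sc.parallelize(lst)
>>> rdd.sortBy(lambda x: x).collect()                     # ascending by the key function
>>> rdd.sortBy(lambda x: x, ascending=False).collect()    # descending
```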
groupByKey
```
>>> rdd = sc.parallelize("abccda")
```
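A sketch assuming the characters are first mapped to (key, value) pairs, since groupByKey operates on pair RDDs:
```
>>> rdd = sc.parallelize("abccda")                 # an RDD of single characters
>>> pairs = rdd.map(lambda c: (c, 1))              # groupByKey needs (key, value) pairs
>>> sorted((k, list(v)) for k, v in pairs.groupByKey().collect())
[('a', [1, 1]), ('b', [1]), ('c', [1, 1]), ('d', [1])]
```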
reduceByKey
```
>>> rdd = sc.parallelize("abccda")
```
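A sketch of the classic character count with reduceByKey, again assuming the characters are mapped to pairs first:
```
>>> rdd = sc.parallelize("abccda")
>>> sorted(rdd.map(lambda c: (c, 1)).reduceByKey(lambda x, y: x + y).collect())
[('a', 2), ('b', 1), ('c', 2), ('d', 1)]
```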
reduceByKeyLocally
```
>>> rdd = sc.parallelize("abccda")
```
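A sketch of reduceByKeyLocally; unlike reduceByKey it is an action and returns a plain dict to the driver:
```
>>> rdd = sc.parallelize("abccda")
>>> rdd.map(lambda c: (c, 1)).reduceByKeyLocally(lambda x, y: x + y)
{'a': 2, 'b': 1, 'c': 2, 'd': 1}                   # a regular dict on the driver; key order may vary
```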
aggregateByKey
```
>>> rdd = sc.parallelize("abccda")
```
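A sketch of aggregateByKey producing the same per-key counts; the zero value and both functions are assumptions for illustration:
```
>>> rdd = sc.parallelize("abccda")
>>> pairs = rdd.map(lambda c: (c, 1))
>>> # aggregateByKey(zeroValue, seqFunc, combFunc): seqFunc folds a key's values within a partition,
>>> # combFunc merges the per-partition results for that key
>>> sorted(pairs.aggregateByKey(0, lambda acc, v: acc + v, lambda a, b: a + b).collect())
[('a', 2), ('b', 1), ('c', 2), ('d', 1)]
```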
Exception handling
Exception: It appears that you are attempting to reference SparkContext
Calling a class method inside a PySpark transformation raises an error, as follows:
```
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

conf = SparkConf()
conf.setMaster("local").setAppName("my app")
sc = SparkContext(conf=conf)


class Test:
    def add(self, x):
        return [x + 5]

    def test(self):
        rdd = sc.parallelize(range(5)).flatMap(self.add).collect()


Test().test()
```
Error message:
```
During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "E:/Python37/spark_test.py", line 17, in <module>
    Test().test()
  File "E:/Python37/spark_test.py", line 14, in test
    rdd = sc.parallelize(range(5)).map(self.add).collect()
  File "E:\Python37\lib\site-packages\pyspark\rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "E:\Python37\lib\site-packages\pyspark\rdd.py", line 2532, in _jrdd
    self._jrdd_deserializer, profiler)
  File "E:\Python37\lib\site-packages\pyspark\rdd.py", line 2434, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "E:\Python37\lib\site-packages\pyspark\rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "E:\Python37\lib\site-packages\pyspark\serializers.py", line 600, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
```
Explanation
Spark does not allow the SparkContext to be accessed inside an action or transformation. If your action or transformation references self, Spark serializes the entire object and ships it to the worker nodes, and that object carries the SparkContext with it. Even though the SparkContext is never accessed explicitly, it is still captured in the closure, hence the error.
Fix
Define the class method being called as a static method with @staticmethod.
```
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

conf = SparkConf()
conf.setMaster("local").setAppName("my app")
sc = SparkContext(conf=conf)


class Test:
    @staticmethod
    def add(x):
        return [x + 5]

    def test(self):
        rdd = sc.parallelize(range(5)).flatMap(Test.add)
        print(rdd.collect())


Test().test()
# [5, 6, 7, 8, 9]
```
Further reading
Spark性能优化指南——高级篇 (Spark performance tuning guide, advanced part): several approaches for dealing with data skew, which usually shows up in distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup, and repartition.
aokoInychyi/spark-streaming-kafka-example: example code in Scala.