PySpark Study Notes

Notes on PySpark usage.

Configuring the environment

  • Copy the following files into the Lib\site-packages directory under the Python installation directory

    {SPARK_HOME}\python\lib\py4j-0.10.7-src.zip
    {SPARK_HOME}\python\lib\pyspark.zip
  • Extract the files above into that directory (an alternative that avoids copying is sketched below)
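
Alternatively (not from the original notes), the zips can stay where they are and be put on sys.path at runtime. A minimal sketch, assuming SPARK_HOME points at a Spark 2.4.x install that ships py4j-0.10.7:

import os
import sys

spark_home = os.environ["SPARK_HOME"]  # assumed to be set in the environment
sys.path.insert(0, os.path.join(spark_home, "python"))
sys.path.insert(0, os.path.join(spark_home, "python", "lib", "py4j-0.10.7-src.zip"))

from pyspark import SparkContext  # should now import cleanly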

pyspark shell

cd {SPARK_HOME}/bin
./pyspark

Counting example

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Python version 3.7.1 (v3.7.1:260ec2c36a, Oct 20 2018 14:57:15)
SparkSession available as 'spark'.
>>> import random
>>> lst = random.choices(range(5), k=10)
>>> lst
[1, 4, 3, 1, 4, 1, 2, 4, 1, 2]
>>> rdd = sc.parallelize(lst)
>>> rdd1 = rdd.map(lambda x: (x, 1))
>>> rdd1.reduceByKey(lambda x, y: x + y).collect()
[(4, 3), (1, 4), (2, 2), (3, 1)]
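
For a quick tally, countByValue produces the same counts without the explicit map/reduceByKey pipeline, returning a plain dict to the driver (a small sketch reusing lst from the session above; the exact counts depend on the random sample):

>>> counts = sc.parallelize(lst).countByValue()
>>> dict(counts)  # e.g. {1: 4, 4: 3, 3: 1, 2: 2}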

PySpark path handling when reading files

Reading from an HDFS path

>>> path = "hdfs://ip:port/bigdata/hello.txt"
>>> rdd = sc.textFile(path)

Reading from a local path

>>> path = "file:///opt/bigdata/hello.txt"
>>> rdd = sc.textFile(path)
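
In both cases textFile is lazy, so a bad path only surfaces when an action runs; a quick sanity check (a sketch, assuming the hypothetical hello.txt above actually exists):

>>> rdd.count()  # number of lines; fails here if the path cannot be resolved
>>> rdd.take(3)  # peek at the first few lines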

Inspecting partitions

  • Default partition formula for sc.textFile

    math.min(defaultParallelism, 2)    // defaultMinPartitions used by textFile
    math.max(totalCoreCount.get(), 2)  // defaultParallelism (when spark.default.parallelism is not set)
  • Default partition formula for sc.parallelize

    math.max(totalCoreCount.get(), 2)
  • Checking an RDD's partition count (a sketch on setting the count explicitly follows this list)

    • Option 1

      >>> rdd = sc.parallelize(range(10))
      >>> rdd.getNumPartitions()
      4
    • Option 2

      >>> rdd = sc.parallelize(range(10))
      >>> rdd.glom().collect()  # print the data grouped by partition
      [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
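
Both entry points also accept an explicit partition count instead of the defaults above; a small sketch (the file path is the same hypothetical one used earlier):

>>> rdd = sc.parallelize(range(10), numSlices=3)
>>> rdd.getNumPartitions()
3
>>> lines = sc.textFile("file:///opt/bigdata/hello.txt", minPartitions=4)
>>> lines.getNumPartitions()  # at least 4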

Common operators

map

>>> rdd = sc.parallelize(range(5))
>>> rdd.map(lambda i: range(i)).collect()
[range(0, 0), range(0, 1), range(0, 2), range(0, 3), range(0, 4)]

flatMap

>>> rdd = sc.parallelize(range(5))
>>> rdd.flatMap(lambda i: range(i)).collect()
[0, 0, 1, 0, 1, 2, 0, 1, 2, 3]
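
The difference is easiest to see when splitting lines into words: map keeps one list per input element, while flatMap concatenates the lists (a small sketch with made-up data):

>>> lines = sc.parallelize(["hello world", "hello spark"])
>>> lines.map(lambda s: s.split()).collect()
[['hello', 'world'], ['hello', 'spark']]
>>> lines.flatMap(lambda s: s.split()).collect()
['hello', 'world', 'hello', 'spark']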

reduce

>>> rdd = sc.parallelize(range(1, 6))
>>> rdd.reduce(lambda x, y: x + y)
15

fold

>>> rdd = sc.parallelize(range(1, 6))
>>> rdd.fold(0, lambda x, y: x + y)
15
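
Unlike reduce, fold takes a zero value that is applied once per partition and once more when merging the partition results, so a non-neutral zero changes the answer (a sketch assuming 2 partitions):

>>> rdd = sc.parallelize(range(1, 6), 2)
>>> rdd.fold(0, lambda x, y: x + y)
15
>>> rdd.fold(10, lambda x, y: x + y)  # 15 + 10 * (2 partitions + 1 merge)
45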

aggregate

>>> rdd = sc.parallelize([2, 5, 3, 1], 2)
>>> rdd.glom().collect()
[[2, 5], [3, 1]]

>>> rdd.aggregate(0, lambda x, y: x + y, lambda x, y: x + y)
11

Note: in lambda x, y: x + y, x is the accumulated value so far and y is the current element.
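
aggregate is mainly useful when the accumulator type differs from the element type, e.g. accumulating a (sum, count) pair to compute a mean (a sketch reusing the rdd above):

>>> total, count = rdd.aggregate(
...     (0, 0),
...     lambda acc, x: (acc[0] + x, acc[1] + 1),  # seqOp: fold an element into a partition accumulator
...     lambda a, b: (a[0] + b[0], a[1] + b[1]))  # combOp: merge two partition accumulators
>>> total / count
2.75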

filter

>>> rdd = sc.parallelize(range(1, 10))
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4, 6, 8]

distinct

>>> lst = random.choices(range(1, 6), k=10)
>>> rdd = sc.parallelize(lst)
>>> rdd.collect()
[2, 1, 4, 5, 4, 3, 3, 1, 2, 5]
>>> rdd.distinct().collect()
[4, 1, 5, 2, 3]

intersection, union

>>> rdd1 = sc.parallelize(["C", "A", "B", "B"])
>>> rdd2 = sc.parallelize(["A", "A", "B", "B", "D"])

# intersection
>>> rdd1.intersection(rdd2).collect()
['B', 'A']

# union
>>> rdd1.union(rdd2).collect()
['C', 'A', 'B', 'B', 'A', 'A', 'B', 'B', 'D']

sortBy

>>> lst = random.choices(range(20), k=6)
>>> lst
[19, 6, 13, 19, 2, 3]

>>> rdd = sc.parallelize(lst)
# ascending order
>>> rdd.sortBy(lambda x: x).collect()
[2, 3, 6, 13, 19, 19]

# descending order
>>> rdd.sortBy(lambda x: x, ascending=False).collect()
[19, 19, 13, 6, 3, 2]
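
The first argument is an arbitrary key function, so the same operator can sort key/value pairs by value (a small sketch with made-up pairs):

>>> pairs = sc.parallelize([("a", 3), ("b", 1), ("c", 2)])
>>> pairs.sortBy(lambda kv: kv[1], ascending=False).collect()
[('a', 3), ('c', 2), ('b', 1)]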

groupByKey

>>> rdd = sc.parallelize("abccda")
>>> rdd.collect()
['a', 'b', 'c', 'c', 'd', 'a']
>>> rdd1 = rdd.map(lambda x: (x, 1))
>>> rdd1.collect()
[('a', 1), ('b', 1), ('c', 1), ('c', 1), ('d', 1), ('a', 1)]

>>> rdd1.groupByKey().collect()
[('b', <pyspark.resultiterable.ResultIterable object at 0x0000015412DE3EF0>), ('c', <pyspark.resultiterable.ResultIterable object at 0x0000015412DE3F98>), ('a', <pyspark.resultiterable.ResultIterable object at 0x0000015412DE8080>), ('d', <pyspark.resultiterable.ResultIterable object at 0x0000015412DE80B8>)]

Note: each <pyspark.resultiterable.ResultIterable object at 0x0000015412DE3EF0> in the output above exposes the attributes 'data', 'index' and 'maxindex'.
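
To make the grouped values readable, the usual trick is to materialize each ResultIterable with mapValues(list) (a sketch reusing rdd1 from above; the key order may differ):

>>> rdd1.groupByKey().mapValues(list).collect()
[('b', [1]), ('c', [1, 1]), ('a', [1, 1]), ('d', [1])]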

reduceByKey

>>> rdd = sc.parallelize("abccda")
>>> rdd1 = rdd.map(lambda x: (x, 1))
>>> rdd1.reduceByKey(lambda x, y: x + y).collect()
[('b', 1), ('c', 2), ('a', 2), ('d', 1)]

reduceByKeyLocally

>>> rdd = sc.parallelize("abccda")
>>> rdd1 = rdd.map(lambda x: (x, 1))
>>> rdd1.reduceByKeyLocally(lambda x, y: x + y)
{'a': 2, 'b': 1, 'c': 2, 'd': 1}

Note: reduceByKeyLocally does not return an RDD; it returns a dict to the driver.

aggregateByKey

>>> rdd = sc.parallelize("abccda")
>>> rdd1 = rdd.map(lambda x: (x, 1))

>>> zeroValue = 0
>>> seqFunc = lambda x, y: x + y
>>> combFunc = lambda x, y: x + y
>>> rdd1.aggregateByKey(zeroValue, seqFunc, combFunc).collect()
[('b', 1), ('c', 2), ('a', 2), ('d', 1)]
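
As with aggregate, the per-key accumulator may have a different type from the values, which makes per-key averages straightforward (a sketch with made-up pairs; the key order may differ):

>>> scores = sc.parallelize([("a", 2), ("b", 4), ("a", 6), ("b", 8)])
>>> sums = scores.aggregateByKey(
...     (0, 0),
...     lambda acc, v: (acc[0] + v, acc[1] + 1),  # fold a value into (sum, count)
...     lambda p, q: (p[0] + q[0], p[1] + q[1]))  # merge two (sum, count) pairs
>>> sums.mapValues(lambda p: p[0] / p[1]).collect()
[('a', 4.0), ('b', 6.0)]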

Exception handling

Exception: It appears that you are attempting to reference

  • Calling an instance method inside a PySpark transformation fails with an error, as shown below:

    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext

    conf = SparkConf()
    conf.setMaster("local").setAppName("my app")
    sc = SparkContext(conf=conf)

    class Test:
        def add(self, x):
            return [x + 5]

        def test(self):
            rdd = sc.parallelize(range(5)).flatMap(self.add).collect()

    Test().test()

    Error message:

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "E:/Python37/spark_test.py", line 17, in <module>
        Test().test()
      File "E:/Python37/spark_test.py", line 14, in test
        rdd = sc.parallelize(range(5)).map(self.add).collect()
      File "E:\Python37\lib\site-packages\pyspark\rdd.py", line 816, in collect
        sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "E:\Python37\lib\site-packages\pyspark\rdd.py", line 2532, in _jrdd
        self._jrdd_deserializer, profiler)
      File "E:\Python37\lib\site-packages\pyspark\rdd.py", line 2434, in _wrap_function
        pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
      File "E:\Python37\lib\site-packages\pyspark\rdd.py", line 2420, in _prepare_for_python_RDD
        pickled_command = ser.dumps(command)
      File "E:\Python37\lib\site-packages\pyspark\serializers.py", line 600, in dumps
        raise pickle.PicklingError(msg)
    _pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
  • Explanation

    Spark does not allow SparkContext to be referenced inside an action or transformation. If the action or transformation references self, Spark serializes the whole object and ships it to the workers, and that object carries the SparkContext: even though it is never accessed explicitly, it is captured in the closure, hence the error.

  • Fix

    Define the method being called as a static method with @staticmethod (an alternative using a module-level function follows the code block below):

    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext

    conf = SparkConf()
    conf.setMaster("local").setAppName("my app")
    sc = SparkContext(conf=conf)


    class Test:
        @staticmethod
        def add(x):
            return [x + 5]

        def test(self):
            rdd = sc.parallelize(range(5)).flatMap(Test.add)
            print(rdd.collect())

    Test().test()
    # [5, 6, 7, 8, 9]
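
    Alternatively (not from the original notes), the method can be replaced by a module-level function so that the closure never captures self, and therefore never drags in sc; a minimal sketch:

    def add_five(x):
        return [x + 5]

    class Test2:
        def test(self):
            # add_five is a plain function, so pickling it does not pull in self or sc
            print(sc.parallelize(range(5)).flatMap(add_five).collect())  # [5, 6, 7, 8, 9]

    Test2().test()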

Further reading

Spark & PySpark 使用手册 (Spark & PySpark usage handbook)

Spark 2.2.x 中文官方参考文档 (Chinese translation of the official Spark 2.2.x reference documentation)

子雨大数据之Spark入门教程(Python版) (Spark tutorial for beginners, Python edition)

Spark性能优化指南——基础篇 (Spark performance tuning guide, part 1: basics)

Spark性能优化指南——高级篇 (Spark performance tuning guide, part 2: advanced): several approaches to data skew, which typically shows up with distinct, groupByKey, reduceByKey, aggregateByKey, join, cogroup and repartition.

aokoInychyi/spark-streaming-kafka-example: example code in Scala
