原文:
towardsdatascience.com/make-your-way-from-pandas-to-pyspark-c50d5928f6c3
简介
我在 LinkedIn 和其他地方的一些数据科学社区中,经常看到人们质疑 PySpark。
让我们面对现实:数据科学是一个过于广泛的领域,任何人都不可能了解所有内容。所以,当我加入关于统计学的课程/社区时,有时人们会问PySpark 是什么,如何在 PySpark 中计算一些统计数据,以及许多其他类型的问题。
通常,那些已经使用 Pandas 的人特别感兴趣 Spark。我相信这有几个原因:
Pandas 当然非常著名,被数据科学家使用,但肯定不是最快的包。随着数据量的增加,速度成比例地下降。
对于已经精通 Pandas 的人来说,想要学习一个新的整理数据选项是自然的。随着数据的可用性和体积的增加,了解 Spark 是处理大数据的一个很好的选择。
Databricks 非常著名,PySpark 可能是平台上使用最广泛的语言,与 SQL 一起。
在这篇文章中,我们将学习和比较 Pandas 和 PySpark 的主要整理函数的代码片段:
总结
切片
过滤
分组
替换
安排
让我们深入探讨。
准备环境
为了简化,我们将在这个练习中使用 Google Colab。
一旦你在工具中打开一个新的笔记本,安装pyspark和py4j,你应该就可以开始了。
!pip install pyspark py4j这些是我们需要导入的模块。我们将导入SparkSession和functions模块,这样我们就可以使用mean、sum、max、when等方法。对于 Python 部分,只需pandas和numpy。
# import spark and functionsfrompyspark.sqlimportSparkSessionfrompyspark.sqlimportfunctionsasFfrompyspark.sql.functionsimportcol,mean,count,when# Imports from Pythonimportpandasaspdimportnumpyasnp由于我们正在使用 Jupyter Notebook 进行工作,在使用之前需要初始化 Spark 会话。请注意,如果你使用 Databricks 进行编码(社区版 – 免费),spark 会话已经与集群一起初始化,你不需要这样做。但在 Colab 中,我们需要这一行代码。
# Create a spark sessionspark=SparkSession.builder.appName("tests").getOrCreate()这是将要使用的这个数据集:加利福尼亚住房,一个在创意共享许可下的样本数据集。
Pandas 与 PySpark
现在的练习将是展示每个代码片段,并评论两种语法之间的差异。
加载数据和查看数据
加载数据时,这两种语法都很相似。在 Pandas 中,我们使用pd别名,而在 Spark 中,我们使用 Spark Session 的名称,在我们的例子中,我们将其命名为spark。我建议你使用这个名称作为默认值,因为许多教程会给你带有 spark 作为 Spark Session 名称的代码片段。
Pandas 使用带下划线的snake_case,而 Spark 将使用点.。
# Load data with Pandasdfp=pd.read_csv(pth)# View datadfp.head()https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/43f449f18fcaf215eedb8f9ca5dd3457.png
Pandas 显示的数据集。图片由作者提供。
此外,在 Spark 中,(模式)变量类型推断并不总是自动的,所以使用参数inferSchema=True。现在,让我们使用 Pyspark 加载并查看数据。.limit(n)方法将结果限制为我们想要显示的行数n。
# Load data to sessiondf=spark.read.csv(pth,header=True,inferSchema=True)# Visualizing the Datadf.limit(5).show()https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/e1afaecfe49a515360c64cc8ea1abc97.png
Spark 显示的数据集。图片由作者提供。
汇总
下一个比较是汇总。通常,使用.describe()方法总结数据集的统计信息是一种简单的方法。在 Spark 中,方法相同。
这是 Pandas。
# Summarizing Data in Pandasdfp.describe()https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/32b45400429145c2cc669c3a6ae13c28.png
从 Pandas 描述。图片由作者提供。
然而,请注意,Spark 提供的信息较少。百分位数没有显示。这可能是因为它旨在处理大量数据,因此简化输出意味着更少的计算,这应该使其更快。
# Summarizing Data with Sparkdf.describe().show()https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/40149b27e59525ecf12ef3ffa734c41f.png
从 Spark 描述。图片由作者提供。
下一个片段将显示数据的百分位数,以防你想查看。
# Percentiles Spark(df.agg(*[F.percentile(col,[.25,.5,.75])forcolindf.columns]).show())切片
切片是切割数据集以查看其特定部分。它与过滤不同,因为它不携带条件。
在 Pandas 中,代码如下。使用.loc[row,col]我们可以确定要显示的行号和列。
# Slicing (Selecting) Data in Pandasdfp.loc[10:20,['households','housing_median_age','median_house_value']]https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/e5caa8748549066707c629356ae805a5.png
Pandas 的切片数据。图片由作者提供。
注意,在 Spark 中,你不能直接通过行切片数据框,因为它不像 Pandas 那样索引。
Spark DataFrame 不像 Pandas 那样按行索引。
当你开始使用 Spark 时,你会注意到它的语法非常受 SQL 的影响。因此,为了切片数据,我们将从数据中选择我们想要看到的列select()。
# Slicing in Spark(df# dataset.select('households','housing_median_age','median_house_value')#select columns.limit(10)# limit how many rows to display.show()#show data)https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/9f55a3547ad30701c7298f34eb477f62.png
Spark 的切片数据。图片由作者提供。
下一个代码添加了行号列和按行切片的能力。然而,这是一个 PySpark 的中级代码,使用Window函数。我将在这里留下它,你可以通过参考文献部分的链接了解更多。
frompyspark.sql.windowimportWindowfrompyspark.sql.functionsimportrow_number# Slicing by columns and rows in Spark(df# dataset.select('households','housing_median_age','median_house_value')#select columns.withColumn('row_n',row_number().over(Window.orderBy('households')))#create row number column.filter(col('row_n').between(10,20))#slice by row number.show()#show data)过滤
过滤器允许我们只显示符合某些条件的行和列。
在 Pandas 中,我更喜欢使用.[query](https://medium.com/gustavorsantos/pandas-query-the-easiest-way-to-filter-data-39e0163ef35a)()函数。它更加直观且易于使用。但,当然,你也可以使用传统的切片表示法。
# Filtering data Pandasdfp.query('housing_median_age < 20').head(10)代码片段正在过滤自建设以来不到 20 年的房屋。
https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/5bb60796ad16f5dfb866fc089ebc6a7b.png
Pandas 的过滤数据。图片由作者提供。
在 Spark 中,过滤的方法可以是.filter()或.where()(类似于 SQL)。在过滤函数中,你可以添加col('col_name')和条件,如示例中的< 20。要添加其他条件,只需使用&表示 AND 和|表示 OR。
注意:在 Spark 中,每次我们对列应用转换或函数时,都必须使用
col()函数来使其成为一个对象。例如,在这里我们正在对列应用条件,因此需要使用col。
# Filtering in Spark(df#dataset.filter(col('housing_median_age')<20)# Filter.show()#display data)https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/c74e169260804be17c88f56fa21ea401.png
来自 Spark 的过滤数据。图片由作者提供。
分组
现在,让我们来看看最常用的数据整理函数之一,即group by,它允许我们对数据进行聚合。
在 Pandas 中,我们可以使用切片表示法来编写,也可以使用类似于 Spark 语法的字典风格。
# Grouping in Pandas(dfp.groupby('housing_median_age')['median_house_value'].mean().reset_index().sort_values('housing_median_age').head(10))# Get different aggregation values for different variables(dfp.groupby('housing_median_age').agg({'median_house_value':'mean','population':'max','median_income':'median'}).reset_index().sort_values('housing_median_age').head(10))https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/6b0a347df0296ae985106c212c17f252.png
使用 Pandas 分组的数据。图片由作者提供。
接下来是 Spark 代码。注意第二个片段与 Pandas 语法中的字典非常相似。
另一个观察结果是,对于mean()函数,我们不是使用F.mean,而是使用它来处理F.max和F.median。原因是所有的数据整理函数都在pyspark.sql.functions模块中。
在我们的代码开始时,我们单独导入了mean(from pyspark.sql.function import mean),而其他则作为from pyspark.sql import functions as F导入。因此,我们必须调用F.来调用那些没有单独导入的。这个规则就像 Python 代码中的常规模块导入规则一样。
# Grouping in Spark(df#dataset.groupBy('housing_median_age')#grouping.agg(mean('median_house_value').alias('median_house_value'))#aggregation func.sort('housing_median_age')#sort.show()#display)# Grouping different variables in Spark(df#dataset.groupBy('housing_median_age')#grouping.agg(mean('median_house_value').alias('median_house_value'),#aggregation funcsF.max('population').alias('population'),F.median('median_income').alias('median_income')).sort('housing_median_age')#sort.show()#display)https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/04d7ffcc8a0180fdd8ac01fdd09fc9eb.png
来自 Spark 的分组数据。图片由作者提供。
另一个需要说明的是,PySpark 中的聚合函数会改变输出中的变量名。例如,如果我们没有使用.alias()函数重命名输出列,输出名称将是mean('median_house_value')。
替换
要替换值,有 使用 Pandas 的不同方法来完成此操作。在接下来的代码中,我们使用了.assign和.where的组合。
# Replacing values in Pandas(dfp#dataset.assign(housing_median_age=dfp['housing_median_age'].where(dfp.housing_median_age>15,other="potential buy"))#assign replaced values to variable)https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/d4276f09d617bf916971497d6ce757ac.png
使用 Pandas 替换的值。图片由作者提供。
使用 Spark,一个简单的方法是通过重写列(或添加一个新列)来使用 Spark 的.when函数中的条件。下面的代码使用.when函数重写了_housing_median*age*列,将 15 岁以下的房屋替换为“潜在购买”,否则就重复当前值。
# Replace values in Spark(df#dataset.withColumn('housing_median_age',when(col('housing_median_age')<=15,'potential buy').otherwise(col('housing_median_age')))#new column.show()#display)https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/90ce8ad18b97988ced638833436675b1.png
使用 Spark 替换的值。图片由作者提供。
排列
最后,整理数据只是对其进行排序。在 Pandas 中,我们可以使用sort_values()方法来实现这一点。
# Arrange values in Pandas(dfp.sort_values('median_house_value').head(10))https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/75ca397047489f22c781e806f411d6da.png
按 Pandas 中的价格排序。图片由作者提供。
使用 Spark,唯一的不同是orderBy()函数替换了sort_values。如果我们想按降序排序,可以添加列指示函数和降序函数,例如col('col_name').desc()。
# Arrange values in Spark(df#dataset.orderBy('median_house_value')#order data.show()#display)# Arrange values in descending order(df.orderBy(col('median_house_value').desc()).show())https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/1e8abd044a37227f68cb05c9f98ec57f.png
按 Pandas 中的价格排序。图片由作者提供。
在离开之前
就到这里吧。
在这篇文章中,我们比较了 Pandas 和 PySpark 的主要处理函数,并对其语法差异进行了评论。
这对于刚开始学习 PySpark 并处理大数据的人来说很有用。记住,Spark 并不难。如果您已经使用 Python 进行工作,那么语法很容易掌握。
知道 SQL 代码和 Python 的人会在 PySpark 方法中看到很多 SQL 语言的影响。对于那些已经喜欢 Polars 的人来说,知识迁移将带来最大的好处,因为 Polars 和 PySpark 在语法上有很多相似之处。
学习更多
想了解更多关于 PySpark 的信息吗?
好吧,幸运的是,我有一个完整的在线课程在 Udemy 上,并且我正在应用一个优惠券代码,通过这个链接提供最佳价格 通过此链接。
联系方式
如果您喜欢这个内容,请关注我以获取更多信息。
古斯塔沃·桑托斯 – Medium
此外,让我们在 LinkedIn 上建立联系。
代码
您可以从这个练习的 GitHub 仓库中找到代码:
github.com/gurezende/Studying/blob/master/PySpark/PySpark_in_Colab.ipynb
参考文献
如何在 Pandas 中替换值
函数 – PySpark 3.5.3 文档
DataFrame – pandas 2.2.3 文档
PySpark 窗口函数
Pandas 查询:过滤数据的简单方法