What is Pandas?
Arrow organizes columnar data as RecordBatches grouped into a Table; an Arrow Table converts into a Pandas DataFrame (backed by Pandas' BlockManager) easily.
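A minimal sketch of that conversion, assuming only that pyarrow is installed (the column names and values here are made up):

```python
import pyarrow as pa

# A couple of RecordBatches grouped into an Arrow Table...
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array([10.0, 20.0, 30.0])],
    names=["id", "spent"],
)
table = pa.Table.from_batches([batch, batch])

# ...and the Table handed over to Pandas as a DataFrame.
df = table.to_pandas()
print(df.head())
```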
## __R__esilient __D__istributed __D__ataset
# Py4J gateway setup (excerpt from PySpark's java_gateway.py):
# the Python driver talks to the JVM through this gateway.
gateway = JavaGateway(
    gateway_parameters=GatewayParameters(
        port=gateway_port,
        auth_token=gateway_secret,
        auto_convert=True))
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
...
return gateway
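Once a gateway like this is up, the driver can reach JVM classes directly through Py4J. A hedged usage sketch (the settings are illustrative, not anything PySpark itself does at this point):

```python
# Instantiate a SparkConf on the JVM side through the gateway's jvm view
# and set a couple of illustrative options.
conf = gateway.jvm.org.apache.spark.SparkConf()
conf.set("spark.app.name", "arrow-demo")
conf.set("spark.master", "local[2]")
print(conf.toDebugString())
```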
[Diagram: the Python classes RDD and PipelinedRDD(RDD) map to PythonRDD on the JVM side, whose compute method delegates to PythonRunner]
Workers act as standalone processors of streams of data
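A conceptual sketch of that idea, explicitly not PySpark's real worker protocol: pickle over in-memory streams stands in for the actual serializers and sockets.

```python
import io
import pickle

def worker_loop(in_stream, out_stream, func):
    # Read serialized batches until the stream ends, apply the user's
    # function to each, and write the serialized results straight back.
    while True:
        try:
            batch = pickle.load(in_stream)
        except EOFError:
            break
        pickle.dump(func(batch), out_stream)

# Tiny demo: in-memory streams stand in for the worker's sockets.
inbound = io.BytesIO()
for batch in ([1, 2, 3], [4, 5, 6]):
    pickle.dump(batch, inbound)
inbound.seek(0)
outbound = io.BytesIO()
worker_loop(inbound, outbound, lambda xs: [x * 2 for x in xs])
```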
…
[Diagram: on the DataFrame side, PythonUDFRunner and PythonArrowRunner specialise PythonRunner; with Arrow, data is exchanged as (Pandas) Series]
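To make the Series exchange concrete, a minimal scalar Pandas UDF sketch (Spark 2.3/2.4-style API; the tiny DataFrame, column and tax rate are made up for illustration):

```python
from pyspark.sql.functions import pandas_udf, rand, PandasUDFType

# Scalar Pandas UDF: Spark ships column batches to Python as pandas.Series
# (over Arrow) and expects a pandas.Series of the same length back.
@pandas_udf("double", PandasUDFType.SCALAR)
def with_tax(spent):
    return spent * 1.21  # illustrative 21% rate

taxed = spark.range(5).withColumn("spent", (rand() + 3) * 100) \
             .withColumn("spent_taxed", with_tax("spent"))
taxed.show()
```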
Using Arrow-backed Pandas UDFs in .groupBy transformations we can speed up PySpark code from 3x to 100x!
from pyspark.sql.functions import rand, randn, floor
from pyspark.sql.functions import pandas_udf, PandasUDFType

# 20M rows with a random group id and a random "spent" amount.
df = spark.range(20000000).toDF("row").drop("row") \
    .withColumn("id", floor(rand()*10000)).withColumn("spent", (randn()+3)*100)

# Grouped-map Pandas UDF: each group arrives in Python as a Pandas DataFrame via Arrow.
@pandas_udf("id long, spent double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    spent = pdf.spent
    return pdf.assign(spent=spent - spent.mean())

df_to_pandas_arrow = df.groupby("id").apply(subtract_mean).toPandas()
import numpy as np
from pyspark.sql.functions import collect_list

# The same computation without Arrow: collect each group's values into a list,
# pull everything to the driver, and do the work in plain Pandas/NumPy.
# df2 is assumed to have the same id/spent schema as df above.
grouped = df2.groupby("id").agg(collect_list('spent').alias("spent_list"))
as_pandas = grouped.toPandas()
as_pandas["mean"] = as_pandas["spent_list"].apply(np.mean)
as_pandas["subtracted"] = as_pandas["spent_list"].apply(np.array) - as_pandas["mean"]
df_to_pandas = as_pandas.drop(columns=["spent_list", "mean"]).explode("subtracted")
in PySpark
Get the slides from my GitHub: github.com/rberenguel/
The repository is pyspark-arrow-pandas
- Arrow’s home
- Arrow’s github
- Arrow speed benchmarks
- Arrow to Pandas conversion benchmarks
- Post: Streaming columnar data with Apache Arrow
- Post: Why Pandas users should be excited by Apache Arrow
- Code: Arrow-Pandas compatibility layer code
- Code: Arrow Table code
- PyArrow in-memory data model
- Ballista: a POC distributed compute platform (Rust)
- PyJava: POC on Java/Scala and Python data interchange with Arrow

- Code: PySpark serializers
- JIRA: First steps to using Arrow (only in the PySpark driver)
- Post: Speeding up PySpark with Apache Arrow
- Original JIRA issue: Vectorized UDFs in Spark
- Initial doc draft
- Post by Bryan Cutler (leader for the Vec UDFs PR)
- Post: Introducing Pandas UDF for PySpark
- Code: org.apache.spark.sql.vectorized
- Post by Bryan Cutler: Spark toPandas() with Arrow, a Detailed Look
toPandas
| 2^x rows | Direct (s) | With Arrow (s) | Factor |
|---|---|---|---|
| 17 | 1.08 | 0.18 | 5.97 |
| 18 | 1.69 | 0.26 | 6.45 |
| 19 | 4.16 | 0.30 | 13.87 |
| 20 | 5.76 | 0.61 | 9.44 |
| 21 | 9.73 | 0.96 | 10.14 |
| 22 | 17.90 | 1.64 | 10.91 |
| 23 | (OOM) | 3.42 | |
| 24 | (OOM) | 11.40 | |
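For reference, switching the Arrow path on for toPandas() is a one-line configuration change (the key below is the Spark 3.x name; Spark 2.3/2.4 used spark.sql.execution.arrow.enabled):

```python
# Enable Arrow-based columnar transfer for toPandas(); with fallback enabled,
# Spark reverts to the row-by-row path if Arrow cannot be used.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf = df.toPandas()  # rows now travel as Arrow RecordBatches
```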
EOF