How many of you here are working with Spark? Raise your hands. Yeah, that's nice. So I will start with what pandas is, but if you are working with Python you probably already know that one. Then, what is Arrow; this may be new to you, or maybe it's not. Then a bit of how Spark works. I mean, we are at Spark Summit, so you probably all know this, but it's always good to remind ourselves how the pieces fit together. Then I'll explain how PySpark works, because maybe you don't know that one, and it's useful. And finally I'll cover how Arrow is making PySpark way, way faster.

I'm Ruben, a mathematician. I work as a consultant for an advertising company, and I think I've been writing Python for something like 15 years; I mean, NumPy didn't even exist back then. Then I moved to other languages, but I still write a lot of Python day to day, Scala and Python now. So the PySpark code base is kind of a sweet spot for me, because it has a bit of Python and a bit of Java and Scala; it's a cool place to be.
So what is pandas? You may know it already: it's a Python data analysis library. If you see a job offer that says Python and data, pandas is involved in the code base somewhere. And it's efficient. Why is it efficient? Because it's columnar and it has a C and NumPy backend, so the machinery isn't really written in Python; the high-level APIs are, but the rest is a very efficient implementation. Columnar shows up a lot in modern databases and modern storage systems.

And what's the point of something being columnar? Imagine we have a table with a lot of columns and we want to sum one of them. If you look at the hardware, you have a CPU and you have a cache line inside the CPU. If you're storing things in rows and you want to sum one column, you need to go row by row, loading the data bit by bit until you get to the point where you can actually sum it. When something is stored column-wise, you don't need to do it slowly: you can just take everything from that column, bring it into the cache and sum it. That was explained before, if you were in the previous talk by the speaker whose name I find really hard to pronounce, who was doing the integration of SparkR with Arrow; he also explained how SIMD instructions make this fast.
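To get a feel for the difference on your own machine, here is a small sketch of mine (not from the talk) comparing a column-wise sum, which runs over one contiguous NumPy array, with a row-by-row loop; exact timings will depend on your hardware.

```python
import numpy as np
import pandas as pd

# A wide table: ten columns, one million rows; we only care about one column.
df = pd.DataFrame(np.random.rand(1_000_000, 10),
                  columns=[f"c{i}" for i in range(10)])

# Column-wise: the column is one contiguous array that streams through the cache.
fast = df["c3"].to_numpy().sum()

# Row-wise (illustration only): walk every row, pull the whole row, keep one value.
slow = sum(row.c3 for row in df.itertuples())
```

Timing both (with time.perf_counter or %timeit) shows the column-wise version winning by a wide margin.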
So pandas is columnar. How does it work internally, at a high level? I don't want to get into the details, because they are kind of complicated. You have a DataFrame; if you use pandas you know what it is, it's basically a table with some names for the columns, and you have different types. Internally, pandas is going to group these types in memory by column: all the ints go to one place, then you bring together all the floats, put them sideways and store them together in memory, and the same for strings. Anything that is not an internal data type for pandas is stored as an object block, and integers and the several other pandas data types are each stored in their own kind of block. All together, the blocks are handled by a block manager, and when you request something to be done to a column, you are telling the block manager: give me this column and run this operation.
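If you want to peek at those blocks yourself, pandas exposes the block manager through a private attribute; this is internal API (called _data in older releases, _mgr in newer ones) and can change between versions, so treat it only as a way to see the grouping by dtype:

```python
import pandas as pd

df = pd.DataFrame({
    "a": [1, 2, 3],          # int64
    "b": [4, 5, 6],          # int64 -> consolidated into the same block as "a"
    "c": [0.1, 0.2, 0.3],    # float64
    "d": ["x", "y", "z"],    # object
})

# Private, version-dependent: the BlockManager holds one block per dtype group.
for block in df._mgr.blocks:
    print(block.dtype, block.values.shape)
```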
Now, quickly, on to Arrow. What is Arrow? It's a library, so you are unlikely to be using it directly unless you are writing your own library; I mean, you are just a client of it through pandas or through Spark. It's cross-language, it's columnar, it's in-memory. The key point is that it's cross-language, so you can use the same internal structure in different languages and pass data from one to the other. And it's heavily optimized; that's the whole point of it. The core is written in C++, and there are implementations in Rust, Go, C, Java and, obviously, R.

There are not a lot of projects using it internally yet, but pandas is one of them. Spark and PySpark use it, which is what we are here for. Dask, which is a distributed computation framework kind of similar to Spark but written entirely in Python, uses it; Ray, which is basically something similar to Dask, does too. There is a proof of concept of data processing written in Rust using only Arrow, and there's a connector from Java and Scala to Python that is still experimental and is using Arrow as well. And the integration between Arrow and pandas is seamless by design: Arrow was created by the same developers that started pandas, so the internal memory layouts are similar enough that the operations between the two are really fast.
What does it look like in Arrow? Again, you have a table; this time I didn't bother with the types. Each set of rows is called a record batch. I mean, you can always call a row a record; in database lingo rows are also records. You take all the columns, put them sideways, store the data, and consider that a record batch. That's done for a certain number of rows, and it can be done in a streaming fashion: the metadata basically says up to this point it's this column, up to this point it's that other column, so you can essentially skip the first one if you only want the last one. So you can do it in streaming, just sending blocks of rows one after the other. At a high level you can think of an Arrow table as a set of record batches. It's not exactly that internally, it's more complicated than this, but from a high level this is good enough to understand what Arrow looks like.
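As a concrete illustration (my own sketch, not a slide from the talk), this is roughly what record batches and a table built from them look like in pyarrow:

```python
import pyarrow as pa

# One record batch: a slice of rows, stored column by column.
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
    names=["id", "label"],
)

# At a high level, an Arrow table is a sequence of such batches.
table = pa.Table.from_batches([batch, batch])
print(table.num_rows, table.schema)
```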
Now, Arrow and pandas: if you think about the abstractions, we have record batches on one side, so blocks of rows, and on the other side we have a block manager that is handling vertical blocks. They don't look exactly the same, but in the end you can think of them as the same. You can convert an Arrow table into a pandas DataFrame really easily; there's a method called to_pandas. If you think of it that way it's super easy, although internally, if you look at the implementation, it's more like you get something from one side and chop it around. The idea is that if you squint hard enough, these are just vertical things with similar colors; that's more or less it. You can convert record batches to block managers, and you can convert block managers to record batches.
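The round trip looks something like this in code: Table.from_pandas reads the block manager into record batches, and to_pandas goes the other way.

```python
import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

table = pa.Table.from_pandas(pdf)   # pandas blocks -> Arrow record batches
roundtrip = table.to_pandas()       # Arrow record batches -> pandas blocks

print(table.schema)
print(roundtrip.dtypes)
```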
Now, what is Spark? We are at Spark Summit, you know this, so I'll go through it quicker than the previous parts. It's a distributed computation framework, open source, somewhat easy to use, and it scales horizontally and vertically. How does it work? You have a cluster manager that is going to give you computational resources, and you have some kind of distributed storage, which can be Hadoop, HDFS in general. Your driver is going to request resources from the cluster manager to compute some tasks: you have tasks, you request resources, and then some executors appear on the resources you have been given. But this doesn't really tell us much about Spark. The important part, the thing that made Spark Spark, is the RDD: the ability to recompute things when they fail, which before, in MapReduce, wasn't as easy. So here we have a happy RDD.
When you have an RDD, you have basically five properties. If you check the class, at the class level there are five methods, but only three required ones that you need to provide for an RDD to not be abstract. One is the partitions: this is where the data is, or how it is split up. One is optional, the preferred locations: when you have this data, it lives somewhere; maybe you loaded a data file or you created something in memory, and it got partitioned across executors. Preferred locations come in when you have some kind of sort or some other operation, and they basically say where you can find the data closest to where it was loaded or processed, because when you want to compute something on a group, it's better to do the computation on the machine that already holds that group. You need a compute method, which turns partitions into other partitions: after the computation has run for each partition, the result of compute is a new RDD that has the computation applied to the previous RDD. You have the dependencies, and this is the key point of the RDD: when one of the partitions disappears, because the machine that was holding it disappears (networks are big and machines fail), you can recompute only that partition, because through compute and the dependencies you can trace back where this data was coming from. Eventually you can go all the way back to the original data source and recompute just that piece, until you make it back to the lost machine. Dependencies are basically RDDs tracing back in time, via compute, until you reach some data source or data provider. And finally you have a partitioner; Spark is going to do something sensible if you don't provide one. It's basically a method that decides how the data is distributed across the different partitions and machines.
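As a rough sketch of that contract (illustrative only, mirroring the Scala RDD interface rather than any actual PySpark class; the names here are mine):

```python
from abc import ABC, abstractmethod

class SketchRDD(ABC):
    """Illustrative sketch of the five RDD properties, not Spark's real class."""

    @abstractmethod
    def partitions(self):
        """Required: the list of partitions the data is split into."""

    @abstractmethod
    def compute(self, partition):
        """Required: produce the records of one partition (usually from a parent)."""

    @abstractmethod
    def dependencies(self):
        """Required: parent RDDs, so a lost partition can be recomputed."""

    def partitioner(self):
        """Optional: how keys are assigned to partitions."""
        return None

    def preferred_locations(self, partition):
        """Optional: hosts where this partition's data already lives."""
        return []
```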
So, going back to PySpark. I'll refer to PySpark as the Python API on top of Spark; otherwise I will say just Spark, or Scala Spark. PySpark is a Python API for the core of Spark, which is written in Scala, and you probably already know how it works inside. On one side you are running Python, and Python does not have a JVM; it runs in the Python virtual machine. It uses a library called Py4J which, as the J can tell you, has something to do with Java: every time you create some object in PySpark, it gets associated with an object in the JVM. So when you start the Python driver in PySpark, this happens at some point. I wanted to show the code because it's a super short thing that is basically telling the JVM: start Spark and keep listening, and that's it. It's surprisingly short, because it is doing a lot of work: it's starting Spark, it's connecting back to your Python driver, and it's keeping a reference in memory to that JVM so you can keep track of whatever you create down there.
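You can see that bridge from a Python shell. These are private attributes, shown here only to illustrate where the Py4J gateway and the JVM-side context live (a sketch, assuming a local Spark installation):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

print(sc._gateway)   # the Py4J JavaGateway connected to the JVM running Spark
print(sc._jsc)       # the Py4J proxy for the JVM-side JavaSparkContext
```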
Now we have a happy RDD in Python. Just like we have RDDs as classes in Scala, we have a class called RDD in Python. You can check this yourself: open a notebook, create an RDD and inspect it. It is going to have a private attribute, in Python style with an underscore, _jrdd. This is a reference to the object in the JVM, so you can always trace it back: the RDD that you created in Python wasn't really created in Python, you created it in the JVM and you hold a reference to it. These J's appear everywhere, in a lot of private properties and methods in the Python API, and they stand for Java. I'm not a huge fan of JVM languages; I like Scala and I like Python, and that's it, and Python is not in the JVM, of course. The connection between the two worlds is through the gateway: this gateway object is created at the beginning and it's passed along every time you create a new RDD. It's the main connection that gets you back from Python to whatever lives in Spark.
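Reusing the sc from the snippet above, the reference is easy to see (again, private internals, so the exact output depends on your Spark version):

```python
rdd = sc.parallelize(range(10))

print(type(rdd))   # <class 'pyspark.rdd.RDD'>
print(rdd._jrdd)   # Py4J proxy for the RDD object living in the JVM
```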
In PySpark there are two entry points for creating these. You create an RDD, that's it, you just type sc.parallelize or whatever. And then you have PipelinedRDD. This one is kind of internal: every time you want to map the partitions of an RDD in Python, a PipelinedRDD is created, and the PipelinedRDD is the one that is actually doing the work. It's a bit like one of those matryoshka animated GIFs, because you are creating things in Python and they are inside other things that are inside other things inside the JVM; it gets a bit confusing. In the end you have PythonRDDs on the Scala side and PipelinedRDDs on the Python side. This is transparent for the PySpark user: you just write rdd.map(f) and it works.
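For example (still poking at private internals, so attribute names may differ across versions):

```python
mapped = sc.parallelize(range(10)).map(lambda x: x * 2)

print(type(mapped))        # pyspark.rdd.PipelinedRDD, not a plain RDD
print(mapped.func)         # the wrapped Python function that will run on workers
print(mapped._prev_jrdd)   # reference to the parent RDD inside the JVM
```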
But it's important to know what is inside this matryoshka, I think, because when Arrow gets into the picture it will explain why it's faster. So, we have an RDD in Python and we want to apply a map with some F; F is a Python function, whatever. This creates a PipelinedRDD, which is internal: the F is pulled into an internal property called func, we get a pointer to the previous RDD, and we have a _jrdd. Of course this _jrdd is where the magic is happening: it's a PythonRDD in the JVM, and this one is written in Scala. Obviously it has some dependencies, because all RDDs need to have a starting point, and this dependency goes back to the RDD we had originally, the one we are going to map with F. Even though F is Python, whatever it is, this is still an RDD in Scala, so it needs to have dependencies, and it has a compute method, because that needs to be there. The compute method needs to do something with F: even if F is in Python, you need to compute it, wherever the magic happens. In the end, all the complication is: how do you compute something in the JVM that is actually in Python?
Well, you want to run something in the executors, and that's going to be done by something called PythonRunner. PythonRunner is going to start a worker: it basically creates a Python process, connects to it via a socket, sends a lot of data through that socket and makes the worker load all the libraries that you need. I had drawings for this one, because otherwise it makes no sense. We are in Scala this time: PythonRunner is called from compute, so as soon as you want to materialize the computation, this creates a Python worker in the executor and data goes through a socket. The F is applied in the Python worker and the data is sent back to the JVM. As you can imagine, there's a lot of serialization going both ways, and as you can imagine, serializing is always expensive: basically you are sending pickles down and pickles up, and pickling and unpickling in Python is slow, very slow. So the worker processes the streams of data: it connects back to the JVM, loads all the libraries (the libraries need to match the driver's), deserializes the pickled things, applies the function, and sends everything back up.
That covers RDDs, but you are probably not using RDDs directly anymore; you use DataFrames. I use DataFrames because then I can optimize my... well, I don't need to optimize: Catalyst is going to optimize the code for me. I define a DAG and the DAG gets optimized. If you're using DataFrames, that's when you get the advantages of Spark after 2.0: basically, a plan is generated. You write all the transformations you want, and eventually you do a write, or a count, or you otherwise trigger an action. This plan is converted into a logical plan, an optimized logical plan, and a physical plan; when you get to the physical plan, something gets executed and all the compute methods start running, because underlying the DataFrames there are still RDDs. The plan is optimized by Catalyst, which is basically something that removes branches from trees when possible: it takes the logical plan and, when you have a filter at the end, it tries to push it down as far as possible, because the less data you move around, the less data you're computing.
Now, depending on what you did in Python, Catalyst is going to choose either the PythonUDFRunner or the ArrowPythonRunner. That's the good one, the one that uses Arrow; the other one is the old version that uses all the pickling machinery. So you have the UDFRunner: serialize, deserialize and send back, which is basically the same as the PythonRunner from before; the only difference is that this is the DataFrame implementation. When you're using the Arrow one, the picture looks almost the same, we basically just change the names, but now we are sending record batches.
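To make that concrete, here is a sketch of the two flavours side by side, assuming a SparkSession called spark; the plain UDF goes through the pickling path, the pandas UDF goes through Arrow (the decorator style shown is the Spark 2.3/2.4 one):

```python
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

df = spark.range(1_000_000).withColumn("x", F.rand())

# Classic Python UDF: values are pickled and shipped to the worker row by row.
plain_double = F.udf(lambda x: x * 2.0, DoubleType())

# Scalar pandas UDF: whole Arrow record batches arrive as pandas Series.
@pandas_udf(DoubleType())
def pandas_double(x):
    return x * 2.0

df.select(plain_double("x").alias("y")).agg(F.sum("y")).show()   # pickling path
df.select(pandas_double("x").alias("y")).agg(F.sum("y")).show()  # Arrow path
```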
So what happens internally in the worker? We convert to pandas; remember, this is super fast because Arrow and pandas look the same internally. We apply F to the columns, which is fast because it's columnar, so everything is kind of optimized for speed, and we send the data back, which again is kind of cheap because Arrow has a serialization format. All this movement, and the computation on the Python side, are already faster. According to the kind of official/unofficial benchmarks, you can get from 3x to 100x improvements just by changing a setting, which is enabling Arrow, and by using UDFs defined as pandas UDFs, either scalar or grouped map or grouped aggregate and so on. You basically get to take advantage of what you can do in pandas, by converting through Arrow.
I want to show a couple of examples so you see what you can do and what speedups you get. The basic example is just toPandas. What I'm doing here is creating a range dataset with a power-of-two number of rows, here 2^20, with a random value column, and just converting it to pandas. Converting to pandas brings all the data to the driver, so it's always kind of a bad idea, but sometimes you need to do something in pandas, in the driver, and that's what you get. I'm doing this first with Arrow disabled, and I want to compare how much faster it is when I just flip that setting to enabled. That's the only thing you need to do if you want to take advantage of Arrow, aside from having a recent enough version of Spark.
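A sketch of the same experiment (not the exact notebook from the talk); the config key shown is the Spark 2.x name, and in Spark 3.x it becomes spark.sql.execution.arrow.pyspark.enabled:

```python
import time
from pyspark.sql import functions as F

df = spark.range(2 ** 20).withColumn("value", F.rand())

for enabled in ("false", "true"):
    spark.conf.set("spark.sql.execution.arrow.enabled", enabled)
    start = time.time()
    pdf = df.toPandas()    # pulls everything to the driver
    print(f"arrow enabled={enabled}: {time.time() - start:.2f}s")
```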
I timed this for different powers of two, on my local machine with local runners, the simplest benchmark possible, and it gets to 18 seconds; I couldn't do 2^23 or 2^24 because the driver was dying. Then I ran the Arrow version, and the Arrow version is consistently about 10 times faster, just on this local setup, super easy, just from changing one setting. So if you ever do toPandas, just do this: you get an awesome speedup, and it works for larger datasets too because of how compactly Arrow handles things.
Now, a slightly more complicated example: a grouped map. You may know that doing group-bys is, well, not always a great idea. If you're using the Scala API, group-by got a lot of improvements in 2.2 to 2.4: aggregations are computed incrementally, so you never hold the whole group in memory. One problem the pandas version still has is that it does need to hold the whole group in memory, so it's a bit problematic. Anyway, what we want to do here is this: I have created some fake transactions with spent amounts, and I want to do, I don't know, some fraud analysis, comparing each transaction with the mean for the same user, because if you are spending a lot more than your mean, it's like: hey, you've been hacked, or you are doing something you shouldn't. This is very easy with the new way: with Arrow enabled, you can write these pandas aggregations by defining a pandas UDF. It's a grouped map, because for each group you want to map, and the map is basically a pandas method that assigns something; in this case it's just the mean.
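The UDF itself is only a few lines. This is a sketch in the Spark 2.3/2.4 grouped-map style, assuming a DataFrame called transactions with user and amount columns (the names are mine, not necessarily the ones on the slide):

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Each group arrives as a pandas DataFrame; add the per-user mean and return it.
@pandas_udf("user string, amount double, user_mean double", PandasUDFType.GROUPED_MAP)
def with_user_mean(pdf):
    return pdf.assign(user_mean=pdf["amount"].mean())

flagged = transactions.groupBy("user").apply(with_user_mean)
```

In Spark 3.x the same thing is usually written with groupBy(...).applyInPandas(func, schema).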
What is this doing? Here we have some executors and a driver, and we are grouping and aggregating. We have some data that started in the driver, because it was created there, and it gets distributed. The catch is that when you do a group-by operation like this, each whole group ends up in memory together, so each of the groups needs to fit in an executor; there is kind of ongoing work, or there should be ongoing work soon, about fixing this so it becomes incremental and doesn't need to hold the whole group in memory. The data going to the workers travels as Arrow record batches, so the transfer is done by Arrow, the computation is done in pandas, and the result is sent back. And unlike the previous example, where the last step was a toPandas to the driver, what you send to the driver here is already computed, one row per group; you're finished.
Then I wrote a similar version without using a pandas UDF. It's kind of bad; it could be done a bit better, but the problem is that if you want to do it well it gets really, really messy. This one just groups everything with a collect_list, so for each group, for each user, you have all the transactions for that user; then you convert everything to pandas, pass it through NumPy and get the mean. The problem is that the toPandas is applied to the groups coming out of the collect_list, and collect_list, when it works, is slow, and when a group is too large it blows up the executors.
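Roughly the shape of that alternative, using the same hypothetical transactions DataFrame:

```python
import numpy as np
from pyspark.sql import functions as F

# Collect each user's amounts into an array, pull every group to the driver,
# then finish the work in pandas/NumPy on the driver.
grouped = (transactions
           .groupBy("user")
           .agg(F.collect_list("amount").alias("amounts")))

pdf = grouped.toPandas()    # all groups land on the driver
pdf["user_mean"] = pdf["amounts"].map(lambda xs: float(np.mean(xs)))
```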
Well, in this case we are still grouping, so we are shuffling the data just like before and sending the computation around. The group-by on each machine is incremental this time, so you don't need to fit everything in memory, which is fine. But then you are sending all the data to the driver, and there is still a lot of work left to do in the driver, because you have only grouped; you still need to compute the mean for each group. So all of that has to go through pandas on the driver, and it actually blows up. I did a quick benchmark of this one beforehand instead of running it live, because it's kind of slow. This implementation fails for 2^25; it just can't do it. For twenty million records, which is what's written here, it takes around three minutes. The other implementation, with the pandas UDF, was taking a couple of minutes for 2^25 and just one minute for the twenty million. It's not a huge speedup, it's like 3x, but code-wise it's much better, and it actually works, because you can do 2^25, which you couldn't do in this case.
So the too-long-didn't-read, or too-long-didn't-watch, is: just use Arrow. Write pandas UDFs. The catch is usually that you need to rewrite your UDFs as pandas UDFs, and that may not be straightforward, that may be a problem; but if you are doing toPandas, just enable Arrow. That's an easy win, you can get 10x improvements just for that part. It depends on your use case: if you have been using PySpark for long enough, you can kind of figure out how to do this; otherwise there are other solutions, but I think PySpark is getting better and better really, really fast. When I started with Spark, which was three and something years ago, I used Scala, because back then Python was just too slow, even for a seasoned Python developer. If I had to choose now, I would use PySpark, because why not? It's almost as fast as Scala. You can find some resources in the presentation, and I recommend that you become a contributor to Spark if you are not one already; you learn a lot about the internals, about what's nice and what's not, and I think it's a good way to be part of the community. And I think it's time for questions.
Thank you so much for this session, I had a question. When using pandas UDFs, Spark uses Arrow for the serialization and deserialization of the data, but in the actual execution plan of a Spark job, is there any way to validate that it is actually using Apache Arrow? Because when you, for example, do toPandas or create a Spark DataFrame from a pandas DataFrame, it actually states in the execution plan that it's using ArrowEvalPython, but not in the pandas UDFs that I've seen. I don't remember for the scalar one, but for the grouped map there is FlatMapGroupsInPandas; that's the physical execution node. Okay, and does FlatMapGroupsInPandas basically indicate that it is using Apache Arrow? I think so; maybe it has Arrow or something in the name. Because when I turned the configuration for Arrow execution off, it still shows the same node in the execution plan, the FlatMapGroupsInPandas. Yeah, that's a good question; I didn't check that. I didn't try to check the plan for that case; I only checked the plan when I was using Arrow, and checked what it was doing. This was something I was working on at the time, but I don't know if there's a way to confirm from the plan alone whether you are using it. There should be, but maybe there isn't; I'm not sure. All right, let's check later; maybe we can open a pull request or something.
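For reference, one way to look at this is simply to print the physical plan and check the node name; the node names below are the ones I would expect, reusing the hypothetical UDFs from the earlier sketches, but they do vary between Spark versions:

```python
# Grouped-map pandas UDF from the earlier sketch: look for FlatMapGroupsInPandas.
transactions.groupBy("user").apply(with_user_mean).explain()

# Scalar UDFs: ArrowEvalPython for pandas UDFs, BatchEvalPython for pickling UDFs.
df.select(pandas_double("x")).explain()
df.select(plain_double("x")).explain()
```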
Hi. If I'm using Spark and Spark DataFrames and not pandas, or maybe working with Koalas, is there any advantage to Arrow, or is Arrow only useful if you are trying to get something into pandas? Koalas is going to use Arrow if you enable it, and it may even enable it by default; I don't know if it's on by default, but internally the Koalas code has the Arrow settings, at least for the tests, so there is going to be an advantage there, because it's doing toPandas at some point. Any other questions?
I think the walkthrough of the basics of PySpark was very useful. One of the things we found using pandas UDFs, even with Arrow enabled, is the problem of supporting complex data types as you move between Scala and Python and back. I know the current workaround is to put everything in JSON and serialize as fast as possible; do you have any updates, or any clue about where this is moving? I've seen some JIRA tickets for Spark 3 that may be working around that, but I haven't checked in a few months, so I'm not sure. But yeah, data types: complex data types have had a lot of JIRA tickets on their own, so that's a problematic one, definitely. And if you're not using pandas, what improvement can you get there? I mean, if you are not using the Python part of Spark, it shouldn't affect you much, because it never gets into Python; it all stays in the JVM.