Internals of Speeding up PySpark with Arrow - Ruben Berenguel (Consultant)

Created with glancer

How many people here are working with Spark? Raise your hands. Yeah, that's nice. So, I will start with what pandas is, although if you are working with Python you probably already know that one. Then what Arrow is. Then a bit about how Spark works; we are at a Spark summit, so you probably all know this, but it is always good to remind ourselves how the pieces fit together. Then I will explain how PySpark works, because maybe you don't know that one and it is useful. And finally I will cover how Arrow is making PySpark way, way faster.

I'm Ruben, a mathematician. I work as a consultant for an advertising company, and I have been writing Python for something like 15 years; numpy didn't even exist back then. Then I moved to other languages, but I still write a lot of Python day to day, Scala and Python now.
And now I'm in the PySpark code base, which is kind of a sweet spot, because it has a bit of Python and a bit of Java and Scala; it's a cool place to be.

So, what is pandas? You may know it already: it is a Python data analysis library. If you see a job offer that says Python and data, pandas is involved in the code base somewhere. And it is efficient. Why is it efficient? Because it is columnar and it has a C backend with SIMD behind it; almost nothing in it is written in Python itself. I mean, the high-level API is, but the rest is a very efficient implementation. Columnar appears a lot in modern databases and modern storage systems.

What is the point of something being columnar? Imagine we have a table with a lot of columns and we want to sum one of the columns. Think of the hardware: you have a CPU, you have rows, and you have a cache line inside the CPU. If your data is stored in rows and you want to sum one column, you need to go row by row, loading the data and extracting the bit you want until you can actually sum it. When the data is stored column-wise you don't need to do it slowly: you can just take everything from that column, bring it into the cache and sum it, and that is a lot faster. If you were in the previous talk by Hyukjin (that one is really hard to pronounce), he was doing the integration of SparkR with Arrow, and he also explained how SIMD instructions make this fast.

So pandas is columnar. How does it work internally, at a high level? I don't want to get into the details, because they are kind of complicated. You have a DataFrame; if you use pandas you know what it is, it is basically a table with some names for the columns, and the columns have different types. Internally, pandas is going to group these types in memory by column: all the integer columns go to one place, then you bring together all the floats, put them sideways and store them the same way, and the same for strings; objects, and anything that is not an internal data type for pandas, are stored in an object block, and there are several pandas data types that are stored differently in memory altogether. The blocks are handled by a block manager, and when you request something to be done on a column, under the hood you are telling the block manager: give me this column and run this operation.
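To make the block layout concrete, here is a small sketch that peeks at those blocks. Note that _mgr is a private pandas attribute (older versions call it _data), so this is only for poking around and may change between versions:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "a": np.arange(3, dtype="int64"),
    "b": np.arange(3, dtype="int64"),
    "x": np.random.rand(3),
    "s": ["foo", "bar", "baz"],
})

# Typically prints one int64 block holding both 'a' and 'b', one float64 block
# for 'x', and one object block for the strings in 's'.
for block in df._mgr.blocks:
    print(block.dtype, block.values.shape)
```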
Now let's move quickly to Arrow. What is Arrow? It is a library, so you are unlikely to be using it directly unless you are writing your own library; otherwise you are just a client, through pandas or through Spark or through some other library. It is cross-language, it is columnar, and it is in-memory. The key point is that it is cross-language: you can use the same internal structure in different languages and pass data from one to the other. And it is optimized; that is the whole point of it. The core is written in C++, and there are implementations in Rust, Go, C, Java and others. There are not a lot of projects using it internally yet, but pandas is one of them. Spark uses it, and what it uses it for is what this talk is about. There is also Dask, which is a distributed computation framework kind of similar to Spark but written entirely in Python; Ray, which is basically something similar to Dask; a proof of concept of data processing written in Rust using only Arrow; and a connector from Java and Scala to Python that is still experimental and is using Arrow as well.

The integration between Arrow and pandas is seamless, and that is by design: Arrow was started by the same developer that started pandas, so the internal memory layouts are similar enough that the conversions are really fast.

How does it look inside Arrow? Again you have a table, and this time I didn't bother with the types. Each set of rows is called a record batch. You take all the columns, put them sideways, and that is the data for that record batch, for a certain number of rows. And this can be done in a streaming way: the metadata says that up to this point it is this column, and from here to there it is that other column, so you can skip the first column if you only want the last one, and you can stream by sending blocks of rows one after the other. So at a high level you can think of an Arrow table as a set of record batches. It is not exactly that, internally it is more complicated, but from a high level it is good enough to understand what Arrow looks like.
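As a concrete illustration (not from the talk), this is what record batches and tables look like through the pyarrow API, matching the idea that a table is, at a high level, a sequence of record batches:

```python
import pyarrow as pa

batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
    names=["id", "label"],
)

# A Table behaves, at a high level, like a sequence of record batches.
table = pa.Table.from_batches([batch, batch])
print(table.num_rows)            # 6
print(len(table.to_batches()))   # back to a list of record batches
```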
If you think about the structure, we have record batches, blocks of rows, on one side, and on the other side we have a block manager that is handling vertical, column-wise blocks. They don't look exactly the same, but in the end you can think of them as the same thing. You can convert an Arrow table into a pandas DataFrame really easily: there is a method called to_pandas. If you think of it that way it is super easy, although internally, if you look at the implementation, you take something from one side and chop it around. The idea is that if you squint hard enough, these are just vertical things with similar colours; that is the idea. You can convert record batches to block managers, and you can convert block managers to record batches.
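A minimal round trip with pyarrow, just to show the two conversions being described:

```python
import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

table = pa.Table.from_pandas(pdf)   # pandas blocks -> Arrow columns
pdf_again = table.to_pandas()       # Arrow columns -> pandas blocks
print(pdf_again.dtypes)
```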
Now, what is Spark? We are at a Spark summit, so you know this one; I will go through it quicker than the previous ones. It is a distributed computation framework, open source, somewhat easy to use, and it scales horizontally and vertically. How does it work? You have a cluster manager that is going to give you computational resources, and you have some kind of distributed storage, Hadoop or HDFS in general. Your driver is going to request resources from the cluster manager to compute some tasks: you have tasks, you request resources, and then some executors appear in the resources you have been given. But this does not really tell us much about Spark. The important part, the thing that made Spark Spark, is the RDD: the ability to recompute things when they fail. Before, in MapReduce, that was not as easy.

So here we have a happy RDD. An RDD basically has five properties; if you check the class, there are five methods you can provide, but only three are required for an RDD not to be abstract. One is partitions: where the data is, or how it is split. You may have loaded a data file or created something in memory, and it gets partitioned across executors. One is optional, the preferred locations: when you have done some sort or some operation, it is better to stay close to the loaded or processed data, and preferred locations basically says where each partition can be found, because when you want to compute something on a group, it is better to do the computation on the machine that holds that group. You need a compute method: it turns partitions into other partitions, and the result of applying compute is a new RDD that has the computation applied on top of the previous one. You have the dependencies, and this is the key point of the RDD: when one of the partitions disappears, because the machine that was holding it disappears, you can recompute only that partition, since through compute and the dependencies you can trace back where it was coming from, go all the way to the original data source, and recompute just what is needed until you get back to what the lost machine was holding. Dependencies are basically RDDs tracing back in time through compute until you reach some data source or data provider. And finally you have a partitioner; Spark is going to do something sensible if you don't provide it. It is the piece that decides how the partitions are distributed across the different machines.
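For reference, a conceptual sketch of that contract. The real RDD class lives on the Scala side; this Python rendering is only illustrative and the names are made up:

```python
from abc import ABC, abstractmethod

class ConceptualRDD(ABC):
    """Illustrative only: mirrors the five properties described above."""

    @abstractmethod
    def partitions(self):
        """How the data is split into chunks."""

    @abstractmethod
    def compute(self, partition):
        """Produce the data of one partition, applying this RDD's transformation."""

    @abstractmethod
    def dependencies(self):
        """Parent RDDs, so a lost partition can be recomputed by tracing back."""

    def preferred_locations(self, partition):
        """Optional: hosts where this partition's data already lives."""
        return []

    def partitioner(self):
        """Optional: how records map to partitions; Spark picks something sensible otherwise."""
        return None
```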
So, back to PySpark. I will refer to PySpark as the Python API on top of Spark; otherwise I will just say Spark, or Scala Spark. PySpark is a Python API for the core of Spark, which is written in Scala, so you may wonder how it works inside. On one side you are writing Python, and Python does not run in the JVM: it runs in the Python virtual machine, and the bridge is a library called Py4J, which, as the J can tell you, has something to do with Java. Every time you create some object in PySpark, it gets an associated object in the JVM. When you start the Python driver in PySpark, this happens at some point. I wanted to show the code because it is a super short thing: it is basically telling the JVM to start Spark and keep listening, and that is it. And I say super short in quotes, because it is doing a lot of work: it is starting Spark, it is connecting back to your Python driver, and it is keeping a reference in memory to that JVM gateway, so you can keep track of whatever you create down there.
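A small way to see that gateway from a PySpark session; _gateway and _jvm are private attributes, so treat this as poking around rather than a public API:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

print(type(sc._gateway))   # the Py4J JavaGateway holding the connection to the JVM
# Attribute access on _jvm is resolved and executed on the Java side.
print(sc._jvm.java.lang.System.getProperty("java.version"))
```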
Now we have a happy RDD in Python: just like we have RDD classes in Scala, we have a class called RDD in Python. You can check this yourself: open a notebook, create an RDD and inspect it, and it is going to have a private attribute, in the usual Python style, called underscore jrdd. This is a reference to the object in the JVM, so you can always trace back the RDD you created in Python; you didn't really create it in Python, you created it in the JVM and kept a reference to it. These j's appear everywhere, in a lot of private properties and methods in the Python API, and they stand for Java. I'm not a huge fan of JVM languages, I like Scala and I like Python and that's it, and Python is of course not in the JVM, so the connection between the two is through this gateway object: it is created at the beginning and it is passed along every time you create a new RDD. It is the main connection that gets you from Python back to whatever is in Spark.

In PySpark there are two entry points to create these. You create an RDD, that's it, you just type sc.parallelize(whatever); and then you have PipelinedRDD. This one is kind of internal: every time you map the partitions of an RDD in Python, a PipelinedRDD is going to be created, and the PipelinedRDD is the one that is actually doing the work. It is a bit like those matryoshka animated GIFs, because you are creating things in Python that are inside other things that are inside other things inside the JVM, and it gets a bit confusing. In the end you have PythonRDDs on the Scala side and PipelinedRDDs on the Python side. This is transparent for the PySpark user: you just write rdd.map(f) and it works. But it is important to know what is inside this matryoshka.
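You can see those layers from a shell. Again, these are private attributes whose names may shift between versions; this assumes the sc from the earlier sketch:

```python
rdd = sc.parallelize(range(10))
print(rdd._jrdd)               # Py4J handle to the JVM-side RDD backing this Python RDD

mapped = rdd.map(lambda x: x * 2)
print(type(mapped).__name__)   # PipelinedRDD: the Python-side wrapper created by map
print(mapped._prev_jrdd)       # it keeps a pointer back to the original JVM RDD
```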
Why does it matter? Because when Arrow gets into the picture, this is what explains why it is faster. So, we have a happy RDD that lives in Python and we want to apply map(f), where f is a Python function, whatever it is. This creates a PipelinedRDD. It is internal: f is stored in an internal property called func, we keep a pointer to the previous RDD, and we have a jrdd. This jrdd, of course, is where the magic happens. It is a PythonRDD in the JVM, written in Scala, obviously. It has some dependencies, because all RDDs need to have a starting point, and this dependency goes back to the RDD we had originally: even though we are going to map it with an f that lives in Python, it is still an RDD in Scala, so it needs to have dependencies. And it has a compute method, because it needs to be there; the compute method has to do something with f, even if f is in Python, because you still need to compute it. In the end, all the complication is: how do you compute something in the JVM that actually lives in Python?

Well, you want to run something in the executors, and that is done by something called PythonRunner. PythonRunner is going to start a worker: it basically creates a Python process, connects to it via a socket, sends a lot of data through that socket, and makes the worker load all the libraries that you need. I had drawings for this one, because otherwise it makes no sense. We are in Scala this time: PythonRunner lives inside compute, so as soon as you want to materialise the computation, a Python worker is created in the executor and the data goes through a socket. The f is applied in the Python worker and the data is sent back to the JVM. As you can imagine, there is a lot of serialization going both ways, and serializing is always expensive: basically you are sending pickles down and pickles up, and pickling and unpickling is slow, very slow. So the worker processes the stream of data: it connects back to the JVM, loads all the libraries (the libraries need to match the driver), deserializes the pickled data, applies the function, and sends the pickles back.
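To give a feel for the row-versus-column difference, here is a toy comparison that is not the actual Spark wire protocol: pickling a list of Python rows versus shipping the same data as an Arrow record batch through the IPC stream format (method names have moved around a bit between pyarrow versions):

```python
import pickle
import time

import pyarrow as pa

n = 1_000_000
ids = list(range(n))
values = [float(i) for i in range(n)]

# Row-oriented: pickle a list of tuples, roughly how plain Python UDF data travels.
rows = list(zip(ids, values))
start = time.time()
pickle.loads(pickle.dumps(rows))
print("pickle round trip:", time.time() - start)

# Column-oriented: the same data as one Arrow record batch over the IPC stream format.
batch = pa.RecordBatch.from_arrays([pa.array(ids), pa.array(values)], names=["id", "value"])
start = time.time()
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, batch.schema)
writer.write_batch(batch)
writer.close()
pa.ipc.open_stream(sink.getvalue()).read_all()
print("arrow round trip: ", time.time() - start)
```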
That was covering RDDs, but you are not using RDDs directly anymore: you use DataFrames, because then you can optimize your... well, you don't even need to optimize it, Catalyst is going to optimize the code for you. You define a DAG and the DAG gets optimized. So if you are using DataFrames, which is where you get the advantages of Spark after 2.0, basically a plan is generated: you write all the transformations you want, and eventually you do a write or a count, you trigger an action, and that plan is converted into a logical plan, an optimized logical plan and a physical plan. When you get to the physical plan, something gets executed and all the compute methods start running, because underneath DataFrames there are still RDDs. The planner and optimizer is Catalyst, which is basically something that removes branches from trees whenever possible: it takes the logical plan and, when you have a filter at the end, it pushes it down as far as possible, because the less data you move around, the less data you have to compute.
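You can watch Catalyst do this from a PySpark session: explain(True) prints the parsed, analyzed and optimized logical plans plus the physical plan (this assumes a SparkSession called spark):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.range(1000).withColumn("value", F.rand())
# In the optimized plan the filter on id has been pushed below the projection.
df.select("id", "value").filter(F.col("id") < 10).explain(True)
```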
Now, depending on what you did in Python, Catalyst is going to choose either the PythonUDFRunner or the ArrowPythonRunner. The Arrow one is the good one; the other is the old version, the one still doing all this pickling. With the UDFRunner you serialize, deserialize and send back; it is basically the same as the PythonRunner from before, only now it is the DataFrame implementation. When you are using the Arrow one, I mean, we just change the names: we are sending record batches instead. Internally, in the worker, we convert them to pandas, and remember this is super fast because Arrow and pandas look the same internally; we apply f to the data, and that is fast because it is columnar, so everything is optimized for speed; and we send the data back, which again is cheap because Arrow has a serialization format. So all the data movement and the computation on the Python side are already faster.

According to the kind-of-official, kind-of-unofficial benchmarks, you can get from 3x to 100x just by changing a setting, which is enabling Arrow, and using UDFs defined as pandas UDFs, either scalar, grouped map, grouped aggregate and so forth. You basically get to take advantage of what you can do in pandas, by converting through Arrow.
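To show what changing that setting and a pandas UDF look like, here is a minimal sketch in the Spark 2.3/2.4 style that was current at the time of the talk (in Spark 3.x the setting is spark.sql.execution.arrow.pyspark.enabled and pandas UDFs are usually written with type hints):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# A scalar pandas UDF: it receives and returns pandas Series, and the data travels
# between the JVM and the Python worker as Arrow record batches.
@pandas_udf("long", PandasUDFType.SCALAR)
def plus_one(v):
    return v + 1

spark.range(1000).withColumn("id_plus_one", plus_one("id")).show(3)
```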
I want to show a couple of examples so you can see what you can do and what speedups you get. The basic example is just toPandas. What I am doing here is creating a dataset with a power-of-two number of rows, here 2^20, with a random column, and just converting it to pandas. Converting to pandas brings all the data to the driver, so it is always kind of a bad idea, but sometimes you need to do something in pandas, in the driver, and that is what you get. I run this with Arrow disabled, and then I compare how much faster it is when I just flip that setting to enabled, which is the only thing you need to do to take advantage of Arrow, aside from having a recent enough Spark, 2.3 or later. I timed it for different powers of two, on my local machine with a local runner, the easiest benchmark possible, and it gets to 18 seconds; I could not do 2^23 or 2^24 because the driver was dying. Then I ran the Arrow version, and the Arrow version is consistently 10 times faster, just on this local setup, just by changing one setting. So if you ever do toPandas, just do this: you get an awesome speedup, and it works for larger datasets as well, because of the compression and the way Arrow handles things.
Now a slightly more complicated one, which is a grouped map. You may know that doing group-bys is not always a good idea. If you are using the Scala API, grouping got a lot of improvements in 2.2 to 2.4: the aggregations are computed incrementally, so you never store the whole group in memory. One problem this pandas version still has is that it needs to hold the whole group in memory, so it is a bit problematic. Anyway, what we want to do here: I have created some fake transactions with spent amounts, and I want to do some fraud analysis, comparing each transaction with the mean for the same user, because if you are spending a lot more than your mean, it's like: hey, you've been hacked, or you are doing something you shouldn't. This is very easy the new way: with Arrow enabled you can write these pandas aggregations by defining a pandas UDF. It is a grouped map, because for each group you want to map, and the map is basically a pandas function that assigns something, in this case just the mean.

What is this doing? We have some executors and a driver, and we are grouping and aggregating. The data started in the driver, because it was created in the driver, and it gets distributed; when you do a group-by operation, everything is grouped in memory, so each of the groups needs to fit in an executor. There is kind of ongoing work, or there should be ongoing work soon, about fixing this so it is incremental and does not need to hold the whole group in memory. The thing is, in the workers these rows arrive as Arrow record batches, the computation is done in pandas, and the result is sent back. In this example the last step was a toPandas to the driver, and what you send to the driver is already computed, one row per group, so you are finished.
already computed it's on per group you're finished so I wrote a similar version without using a pandas EDF I wrote this it's kind of bad it could be done a bit better but if you the problem is that if you want to do it well it gets really really messy so this is just group everything correct with a collect list so you have for each group or so
list so you have for each group or so for each user all the transactions for that user and then you convert everything to pandas pass-through numpy and get the mean the problem is that the group dot two pandas it's applied to the groups that are coming with a collect list and collect list if it works it is lo one when it's too large it blows executors well yeah in this case we are
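Roughly, the shape of that version; again the names are illustrative, reusing the transactions DataFrame from the previous sketch:

```python
import numpy as np
from pyspark.sql import functions as F

grouped = transactions.groupBy("user_id").agg(F.collect_list("spent").alias("spent_list"))

pdf = grouped.toPandas()                              # all the grouped data lands in the driver
pdf["mean_spent"] = pdf["spent_list"].apply(np.mean)  # the real work also happens in the driver
```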
The problem is that the toPandas is applied to groups built with collect_list, and collect_list, when it works, is slow, and when a group is too large it blows up the executors. In this case we are still grouping, so at least we distribute the data as before and we send the computation out, and this time the grouping in each machine is incremental, so you don't need to fit everything in memory; that part is fine. But then you are sending all the data to the driver, and there is still a lot of work to be done in the driver, because you have only grouped: you still need to compute the mean for each group. So all of that has to go through pandas in the driver, and it actually blows up.

I have a quick benchmark of this one; I put it in a graph because it is kind of slow. This implementation fails for 2^25, it cannot do it, and for twenty million records, which is what is written here, it takes around three minutes. The other implementation, with the pandas UDF, was taking a couple of minutes for 2^25 and just one minute for twenty million. It is not a huge speedup, something like 3x, but code-wise it is much better, and it actually works, because you can do 2^25, which you could not do in this case.
So the too-long-didn't-read, or too-long-didn't-watch, is: just use Arrow and write pandas UDFs. Usually the catch is that you need to rewrite your UDFs as pandas UDFs, and that may not be straightforward; that may be a problem. But if you are doing toPandas, just enabling Arrow is an easy win: you can get 10x improvements just for that part. It depends on your use case; if you have been using PySpark for long enough you can kind of figure out how to do this, and otherwise there are other solutions, but I think PySpark is getting better and better, really fast. When I started with Spark, which was three-and-something years ago, I used Scala, because back then Python was just too slow, even though I was a seasoned Python developer. If I had to choose now, I would use PySpark, because why not: it is almost as fast as Scala. You can find some resources in the presentation, and I recommend that you become a contributor to Spark if you are not one already: you learn a lot about the internals, what is nice and what is not, and I think it is a good way to be part of the community. And I think it is time for questions.
Q: Thank you so much for this session. I had a question: when using a pandas UDF, it will use Arrow for the serialization and deserialization of the data, but in the actual execution plan of a Spark job, is there any way to validate that it is actually using Apache Arrow? Because when you, for example, do a toPandas, or create a Spark DataFrame from a pandas DataFrame, the execution plan actually states that it is using ArrowEvalPython, but not with the pandas UDFs that I have seen. I don't remember for the scalar one, but for the grouped map it is FlatMapGroupsInPandas, that is the physical execution node, and that node only suggests that it is using Apache Arrow, because when I turn the Arrow execution enabled configuration off, the execution plan still shows the same FlatMapGroupsInPandas node.

A: Yeah, that is a good question. I didn't check that; I didn't try to check the plan, I just checked what it was doing with Arrow enabled and with it disabled, since that was what I was working on back then. I don't know if there is a way to confirm from the plan alone whether you are using it. There should be, but maybe there is not, I'm not sure. Let's check later; maybe we can open a pull request or something.
Q: If I am using PySpark and Spark DataFrames, not pandas, or maybe Koalas, is there any advantage to Arrow, or does Arrow only matter if you are trying to put things into pandas?

A: Koalas is going to use Arrow if you enable it, and it may even enable it by default; I don't know if it is on by default, but internally the Koalas code has the Arrow settings, at least for the tests. So there would be an advantage there, because it is doing a toPandas at some point. Any other questions?

Q: First, I think the walkthrough of the basics of PySpark was very useful. One of the things we found using pandas UDFs, even with Arrow enabled, is the problem of supporting complex data types as you move between Scala and Python. I know the current workaround is to put everything in JSON and serialize as fast as possible. Do you have any updates, or any clue about where Arrow is moving on that?

A: I have seen some JIRA tickets for Spark 3 that may work around that, but I haven't checked in a few months, so I am not sure. But yeah, data types: Arrow's data types have had a lot of JIRA tickets on their own, so that is a problematic one, definitely. And if you are not using pandas, what improvement can you get there? I mean, if you are not using the Python part of Spark it should not affect you so much, because the data never gets into Python, it just stays in