Accelerating Data Processing in Spark SQL with Pandas UDFs


I'm an engineer at Quantcast, and this talk is about accelerating data processing in Spark SQL with Pandas UDFs. I'll start with some development tips, then spend a few minutes on how we use Spark SQL and the modeling problems we work on at Quantcast.

After that we'll discuss how to make this data processing fast with Pandas UDFs: aggressively minimizing the intermediate rows Spark SQL generates, keeping as much work in memory as possible, and aggregating keys in batches. Our specific data problem is very sparse, so some of these tricks work especially well with sparse data for this example problem. Finally, once you have written a Pandas UDF, I'll talk about Python libraries that made our code faster, in our case by a factor of about three. Your feedback is very important.
Please take a moment to review these sessions. Let's start with our first section: what is a Pandas UDF? A Pandas UDF is a user defined function for working with structured data, and it's a great way of writing logic that isn't supported natively in Spark SQL. There are several types of Pandas UDFs, and the grouped variants are a great way to do all of this: they work with struct types in Spark SQL, Spark handles the grouping so you don't have to sort or shuffle your data yourself, and they can return a variable number of rows with complicated return types, because anything you can write in Python can run inside the UDF.
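As a concrete illustration (my own minimal sketch, not code from the talk), here is what a grouped-map Pandas UDF looks like with applyInPandas: Spark hands each group to the function as a pandas DataFrame, and the function can return any number of rows with whatever schema you declare.

    # Minimal sketch of a grouped-map Pandas UDF (illustrative, not from the talk).
    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([(1, 10), (1, 20), (2, 30)], ["id", "feature_id"])

    def summarize(pdf: pd.DataFrame) -> pd.DataFrame:
        # Arbitrary Python/pandas logic can run here, including things that are
        # awkward to express directly in Spark SQL.
        return pd.DataFrame({"id": [int(pdf["id"].iloc[0])],
                             "n_features": [int(pdf["feature_id"].nunique())]})

    # Spark handles the grouping and shuffle; the UDF may return a variable number of rows.
    df.groupBy("id").applyInPandas(summarize, schema="id long, n_features long").show()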
Next, some tips and tricks for the PySpark framework and your development environment. While developing code, one thing I typically use is the timeit command for quick performance checks; for something more heavyweight there are dedicated profiling tools.
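One hedged example of that workflow (mine, with made-up sample data): time the UDF body as a plain Python function on a small pandas DataFrame before shipping it to the cluster.

    # Quick local benchmark of a UDF body before running it on Spark (illustrative).
    import timeit
    import pandas as pd

    sample = pd.DataFrame({"id": [1, 1, 2], "feature_id": [10, 20, 30]})

    def udf_body(pdf: pd.DataFrame) -> pd.Series:
        return pdf.groupby("id")["feature_id"].nunique()

    # In a notebook, `%timeit udf_body(sample)` does the same thing interactively.
    print(timeit.timeit(lambda: udf_body(sample), number=1000))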
Now, our modeling problem at Quantcast: scale, and even more scale. We have about 10,000 models that refresh on different schedules depending on the type of model, trained primarily on first party data. Roughly 400 terabytes of raw data arrive each day; we clean and compress that down to about five terabytes a day on disk, and we typically train off somewhere in the neighborhood of several days of data.

The example problem is intuitive: say we want to train a model on everybody in the US except San Francisco. For every model, we need to know how many unique IDs satisfy its geographic constraints.
This is because we eventually want models to behave in a specific way. Each model has a set of inclusion regions; for example, these might be the US, and each ID must be found in at least one of them to be considered part of the model. Each model also has a set of exclusion regions, and an ID must be found in none of those regions. Only IDs that satisfy both geo constraints count: if you were in the US and moved to Canada, you are no longer a valid user for a US model. If, like me, you prefer looking at tables, here is what the data looks like.
The first is the feature store table, which records the feature IDs we have observed for each ID; geographic feature IDs correspond to regions, so a feature ID like 1 or 101 at the bottom left means we have seen you in the US. The second is the model table: for every model it lists the inclusion and exclusion feature IDs, which is how a model expresses something like "you're in the US and not SF", and it tells you, for every row, which models an ID could be associated with. I'll stay on this for a moment.
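To make the two tables concrete, here is a hypothetical toy version with made-up IDs and feature IDs (the real schemas at Quantcast will differ); the feature_store and models DataFrames are reused in the later sketches.

    # Hypothetical toy versions of the two tables described on the slides.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    feature_store = spark.createDataFrame(
        [("id_a", 1),      # feature 1: seen in the US (made-up ID)
         ("id_a", 100),    # feature 100: seen in San Francisco (made-up ID)
         ("id_b", 1)],     # id_b: seen in the US only
        schema="id string, feature_id int",
    )

    models = spark.createDataFrame(
        [("us_model",        [1], []),       # include: the US; exclude: nothing
         ("us_not_sf_model", [1], [100])],   # include: the US; exclude: San Francisco
        schema="modelId string, inclusion_ids array<int>, exclusion_ids array<int>",
    )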
Now that we've looked at the example problem, let's go to the optimization question: how do we actually solve it? The first question we need to answer is whether we can do this in plain Spark SQL, and in fact we can; Spark SQL is expressive enough to do everything we need. We generate all ID and model pairs using a cross join, apply the inclusion and exclusion logic, and group to get all of the counts. Writing this is really simple, and it runs on my local cluster: join every ID against all of the model data, keep the IDs that match at least one inclusion feature and no exclusion features, then group by modelId and count.
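Here is a rough sketch of that naive query against the toy tables above; the production query isn't in this transcript, so this is my reconstruction of the idea.

    # Naive approach: collect each ID's features, cross join against every model,
    # apply the inclusion/exclusion logic, then count distinct IDs per model.
    from pyspark.sql import functions as F

    id_features = feature_store.groupBy("id").agg(
        F.collect_set("feature_id").alias("feature_ids"))

    naive_counts = (
        id_features.crossJoin(models)
        # at least one inclusion feature, and no exclusion features
        .where(F.arrays_overlap("feature_ids", "inclusion_ids")
               & ~F.arrays_overlap("feature_ids", "exclusion_ids"))
        .groupBy("modelId")
        .agg(F.countDistinct("id").alias("audience_size"))
    )
    naive_counts.show()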
The problem is that the performance is terrible, and it's not totally obvious why it's terrible, so I'll go into it. The throughput came out to only about 25 rows per machine, which by Quantcast standards is awful. The first thing to do is to look at the query graph: 100,000 input rows generate about 700 times as many intermediate rows, because so much advertising targets the US that most of our rows, in general, pair up with hundreds of models in the cross join.
There has to be a better way, and in order to solve this we'll use Pandas UDFs. The first version is essentially a nested for loop: we group the table so that all the data from a single ID is passed to the UDF at once, loop over the filters in a nested for loop, and return the matches. Grouping by ID also gives us the ID deduplication here for free.

What we're going to exploit, shown in the table on the right, is that there are some very commonly used filters: we have a lot of models that train off of events in the US, and they all ask the same question, "are you in the US?". So instead of evaluating every model, we're going to count matches against the unique filters, of which there are far fewer than the 10,000 models. The Pandas function only has to iterate over the unique filters instead of the unique models, which is really, really powerful: this alone is about 10 times faster. The code in a nutshell: we write a UDF that goes over each unique filter for an ID and emits the filter IDs that match; then, to get back to the actual model IDs for all 10,000 unique models, we join the emitted filter IDs to the model table with a broadcast hash join.
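In code, a hedged sketch of that approach might look like the following; names like unique_filters and filter_id are my own stand-ins for the deduplicated filter table the talk describes, and it builds on the toy feature_store and models tables from earlier.

    # Sketch of the per-ID Pandas UDF over deduplicated filters (illustrative names).
    import pandas as pd
    from pyspark.sql import functions as F

    # Deduplicate the (inclusion, exclusion) pairs and give each one a filter_id.
    unique_filters = (models.select("inclusion_ids", "exclusion_ids").distinct()
                      .withColumn("filter_id", F.monotonically_increasing_id()))
    model_filters = models.join(unique_filters, ["inclusion_ids", "exclusion_ids"])

    filters = unique_filters.collect()   # ~500 filters in the talk: small enough to ship

    def matching_filters(pdf: pd.DataFrame) -> pd.DataFrame:
        features = set(pdf["feature_id"])
        matched = [row.filter_id for row in filters
                   if features & set(row.inclusion_ids)         # >= 1 inclusion hit
                   and not features & set(row.exclusion_ids)]   # zero exclusion hits
        return pd.DataFrame({"id": [pdf["id"].iloc[0]] * len(matched),
                             "filter_id": matched})

    per_id = feature_store.groupBy("id").applyInPandas(
        matching_filters, schema="id string, filter_id long")

    counts = (per_id
              .join(F.broadcast(model_filters.select("modelId", "filter_id")), "filter_id")
              .groupBy("modelId")
              .agg(F.countDistinct("id").alias("audience_size")))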
Deduplicating the keys this way already saves a lot of work, but we can go better. The next trick we're going to use is to aggregate keys in batches.
We don't actually have to group by individual IDs: if we grouped by something coarser, we would create far fewer groups and far fewer intermediate rows, and the user defined function can return partial results that a final aggregation combines to get the final answer. This is also a really powerful idea; it makes the solution about three times faster again, which, for those of you keeping score, is on the order of 30 times faster than the naive solution. What it looks like in a nutshell, on the table on the right: instead of 100,000 groups of one ID each, we get only about as many groups as the batch size, and that's pretty good. We group by the hash of an ID modulo the batch size, for example functions.hash(id) % 100. Inside each batch the UDF dedupes the IDs, counts the IDs that satisfy each filter, and returns a partial count per filter; the only change downstream is that the final group-by operation, which does the aggregating and counting, is now aggregating over partial counts.
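A sketch of that change, reusing the illustrative filters list and model_filters table from the previous sketch; the batch size of 100 mirrors the hash-modulo example in the talk.

    # Batch trick: group by hash(id) % 100 instead of by individual id, and let the
    # UDF return partial counts per filter that a final group-by sums up.
    import pandas as pd
    from pyspark.sql import functions as F

    batched = feature_store.withColumn("batch", F.hash("id") % 100)

    def partial_counts(pdf: pd.DataFrame) -> pd.DataFrame:
        out = {}
        for _, id_rows in pdf.groupby("id"):          # dedupe: one pass per unique ID
            features = set(id_rows["feature_id"])
            for row in filters:
                if features & set(row.inclusion_ids) and not features & set(row.exclusion_ids):
                    out[row.filter_id] = out.get(row.filter_id, 0) + 1
        return pd.DataFrame({"filter_id": list(out.keys()), "n_ids": list(out.values())})

    batched_counts = (
        batched.groupBy("batch")
        .applyInPandas(partial_counts, schema="filter_id long, n_ids long")
        .join(F.broadcast(model_filters.select("modelId", "filter_id")), "filter_id")
        .groupBy("modelId")
        .agg(F.sum("n_ids").alias("audience_size"))   # aggregate the partial counts
    )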
The next trick may or may not apply as strongly to your data; we got such positive results largely due to how sparse our data is. Even after deduplication there are about 500 unique filters, and most IDs only match a handful of them, so it would help if we had some way of iterating only over the filters that an ID's features could possibly satisfy. The answer is to use an inverted index, which I'll go into in more detail on the next slide. It's really powerful for our data set and made the code another few times faster, though the code is fairly complex compared with what I did for the Spark SQL version.
The high level idea is to precompute dictionaries, outside of the UDF, that make your actual UDF run faster. We use this trick to create maps from each feature ID to the filters it's associated with: one map from a feature ID to the set of inclusion filter IDs it appears in, and one from a feature ID to the set of exclusion filter IDs, covering every feature you might care about. The construction of the inclusion maps is included in the slide.

Inside the UDF, for each ID we take all of its feature IDs and look up the sets of inclusion and exclusion filter IDs those features belong to. The key constraint is that an ID must be hit by at least one inclusion filter and zero exclusion filters, so we union all inclusion filter IDs, union all exclusion filter IDs, and take that difference to get the set of filter IDs the ID satisfies. You could imagine writing this many ways, but the set operations are what make this really, really fast, along with the counting logic I do at the end: the final outer for loop groups all my data for each ID, computes all of its satisfied filter IDs, and increments the counts. I'll pause here for a little bit.
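Here is my reconstruction of that idea, using the same illustrative names as before (the talk's actual code lives on the slides, not in this transcript); partial_counts_indexed is a drop-in replacement for the earlier batch UDF.

    # Inverted index: precompute feature_id -> filter ID maps once, outside the UDF.
    from collections import defaultdict
    import pandas as pd

    include_index = defaultdict(set)   # feature_id -> inclusion filters mentioning it
    exclude_index = defaultdict(set)   # feature_id -> exclusion filters mentioning it
    for row in filters:                # `filters` is the collected unique-filter list
        for fid in row.inclusion_ids:
            include_index[fid].add(row.filter_id)
        for fid in row.exclusion_ids:
            exclude_index[fid].add(row.filter_id)

    def filters_for_id(feature_ids):
        included, excluded = set(), set()
        for fid in feature_ids:
            included |= include_index.get(fid, set())
            excluded |= exclude_index.get(fid, set())
        # must hit at least one inclusion filter and zero exclusion filters
        return included - excluded

    def partial_counts_indexed(pdf: pd.DataFrame) -> pd.DataFrame:
        out = defaultdict(int)
        for _, id_rows in pdf.groupby("id"):           # the final outer loop over IDs
            for filter_id in filters_for_id(id_rows["feature_id"]):
                out[filter_id] += 1
        return pd.DataFrame({"filter_id": list(out), "n_ids": list(out.values())})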
The last section is about Python libraries that make your Python code run a lot faster, in some cases using reduce and NumPy. You can drop them into the code really quickly, and the resulting code is very, very similar; it's just a couple of changes away from the default Python libraries. Instead of iterating over a pandas Series, pull out the raw arrays with the Pandas DataFrame .values attribute; and at the top, where the comments mark the inclusion and exclusion filter IDs, replace the plain Python for loop with itertools.chain.from_iterable to flatten the sets. Together these small changes make our example code run about three times faster.
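A sketch of those micro-optimizations applied to the lookup function above; the names are still my illustrative ones, and the exact speedup depends on your data.

    # Same logic, but flattening with itertools instead of repeated set unions, and
    # feeding it raw NumPy arrays pulled out of the DataFrame with .values.
    from itertools import chain
    from functools import reduce

    def filters_for_id_fast(feature_ids):
        # feature_ids is a plain NumPy array, e.g. id_rows["feature_id"].values
        inc_hits = [include_index[f] for f in feature_ids if f in include_index]
        exc_hits = [exclude_index[f] for f in feature_ids if f in exclude_index]
        if not inc_hits:
            return set()
        included = set(chain.from_iterable(inc_hits))   # one flatten, no Python |= loop
        excluded = reduce(set.union, exc_hits, set())   # reduce works just as well here
        return included - excluded

    # Inside the UDF, call it on the raw array:
    #     filters_for_id_fast(id_rows["feature_id"].values)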
To wrap up: there are a lot of ways to speed up a Spark SQL job, and on our example problem these techniques combined into roughly a 1000 times speed up. I hope you can take some of the tricks we use at Quantcast, apply them to your own data, and watch things accelerate the way they did in this example problem.