Running SparkSQL on Databricks via Airflow's JDBC operator
4 minutes read | 682 words, by Ruben Berenguel
The one where Airflow messes with you.
This will be a short one. I want to move some SQL from AWS Redshift to Databricks for performance and cost reasons. Currently, all the SQL is running in a pretty dense Airflow DAG (Directed Acyclic Graph), and my cunning plan was:
- Swap the current uses of PostgresOperator with JdbcOperator, connecting to a Databricks cluster via JDBC,
- Enjoy the savings in execution time and cost.
The best part of this project is that the cluster does not need to be running, just be valid: Databricks has autostart and autoterminate for its clusters (at least on AWS and Azure). This means you don't need to take care of starting or stopping the cluster; just send queries to it and it will spin up on demand.
Airflow has this tendency to ruin the best plans just by its own presence in the stack.
The first step is getting the Databricks JDBC driver from here. On the same page you can find all the details you need to connect from BI tools, although I guess nobody had tried to use Airflow for this before.
Then, make sure your Airflow installation has the jdbc extra (this is for 1.10.X; for 2.0 the contributed operators change locations) and make sure you install OpenJDK, since the JDBC driver requires Java (the first J should be a hint).
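If you want to sanity-check the setup before wiring anything into a DAG, something like this works (a small sketch; it assumes the jdbc extra pulled in jaydebeapi, which is what it installs on 1.10):

import shutil

import jaydebeapi  # noqa: F401  -- comes with the apache-airflow[jdbc] extra

# The driver itself needs a Java runtime on the PATH.
assert shutil.which("java") is not None, "No java found: install OpenJDK"
print("JDBC prerequisites look fine")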
Then, create a very small DAG:
from airflow.operators.jdbc_operator import JdbcOperator
from airflow.models import DAG
from datetime import datetime

default_args = {
    "start_date": datetime(2020, 10, 5),
    "email_on_failure": False,
    "retries": 0,
}

dag = DAG(
    "databricks_test",
    catchup=False,
    max_active_runs=1,
    concurrency=1,
    description="Run some SQL on Databricks via JDBC",
    default_args=default_args,
)

sql = """
select count(distinct a_heavy_one) from somewhere
join something_very_expensive
"""

foo = JdbcOperator(
    sql=sql,
    autocommit=False,
    jdbc_conn_id="databricks",
    dag=dag,
    task_id="databricks_task",
)
In the Connections menu (under Admin) you can configure your JDBC connection according to the Databricks documentation. It is easy as long as you realise that the UID is the literal word token (not your actual token, which goes in the password field), and you add the correct driver class name, com.simba.spark.jdbc.Driver. Your connection string should look like this (I have added new lines for readability):
jdbc:spark://some_code.cloud.databricks.com:443/default; ↵
transportMode=http;ssl=1; ↵
httpPath=sql/protocolv1/o/0/some_path; ↵
AuthMech=3;UID=token;PWD=your_api_token_or_password
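If you would rather script the connection than click through the UI, something along these lines does it on 1.10 (a sketch: the jar path is a placeholder for wherever you put the downloaded driver, and the extra keys are the ones the JDBC hook reads for the driver jar and class name):

from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="databricks",
    conn_type="jdbc",
    # The full JDBC URL goes into the host field; login/password mirror UID/PWD.
    host=(
        "jdbc:spark://some_code.cloud.databricks.com:443/default;"
        "transportMode=http;ssl=1;httpPath=sql/protocolv1/o/0/some_path;"
        "AuthMech=3;UID=token;PWD=your_api_token_or_password"
    ),
    login="token",
    password="your_api_token_or_password",
    extra=(
        '{"extra__jdbc__drv_path": "/path/to/SparkJDBC42.jar", '
        '"extra__jdbc__drv_clsname": "com.simba.spark.jdbc.Driver"}'
    ),
)

session = settings.Session()
session.add(conn)
session.commit()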
Then you trigger the DAG and all you get is:
SQLFeatureNotSupportedException
And not much more. After 5 minutes of wondering what the issue was (and some mild Googling), I realised:
- The stack trace from Airflow was actually telling me where the problem was, but I was not paying attention,
- It was related to this issue in another JDBC driver, that time Hive's.
The issue is with the autocommit feature in databases. Databricks (and, I think, Hive and some others) doesn't support autocommit. But of course Airflow doesn't care:
- JdbcOperator uses JdbcHook (which extends the generic database hook) to execute SQL,
- JdbcHook declares that all JDBC-based databases support autocommit (you can reproduce the failure with the hook alone, see below).
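You don't even need the DAG to see it: running the hook by hand against the connection from before should fail the same way (a quick reproduction sketch, assuming the databricks connection already exists):

from airflow.hooks.jdbc_hook import JdbcHook

hook = JdbcHook(jdbc_conn_id="databricks")
# run() sees supports_autocommit = True and tries to toggle autocommit on the
# connection, which the Databricks/Simba driver rejects.
hook.run("select 1", autocommit=False)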
You may be wondering how the Hive operator works, then. More often than not you connect to a Hive cluster via JDBC and run queries there, right? In the case of Airflow, it shells out to the hive or beeline CLI and defers the work to them, so it's not a JDBC connection. Go figure why.
There are two ways to address this issue:
- Patch the hook,
- Write our own operator.
Since the Airflow execution model is pretty weird (it loads the Python code from the DAG bag and interprets it internally), patching might not even work, so writing your own operator is probably the way to go. You will need a way of deploying custom packages to your Airflow installation, though.
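The patch route would be a one-line monkeypatch, loaded somewhere both the scheduler and the workers import it, roughly like this (and, as said, it might simply not stick given how Airflow loads DAGs):

from airflow.hooks.jdbc_hook import JdbcHook

# Tell Airflow that the JDBC hook cannot autocommit. Whether this survives
# Airflow's DAG-loading model is another matter.
JdbcHook.supports_autocommit = False

The custom operator, with its constructor written out (same arguments as JdbcOperator), looks like this: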
from airflow.hooks.jdbc_hook import JdbcHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class JdbcDatabricksHook(JdbcHook):
    # Databricks does not support autocommit, unlike what JdbcHook assumes
    supports_autocommit = False

class JdbcDatabricksOperator(BaseOperator):
    """Just override where we use the hook"""
    @apply_defaults
    def __init__(self, sql, jdbc_conn_id="jdbc_default", autocommit=False, parameters=None, *args, **kwargs):
        super(JdbcDatabricksOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.jdbc_conn_id = jdbc_conn_id
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        self.hook = JdbcDatabricksHook(jdbc_conn_id=self.jdbc_conn_id)
        self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
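Deploy that module somewhere your DAGs can import it (I will call it jdbc_databricks here, a made-up name) and the task from the first DAG only needs its operator swapped:

from jdbc_databricks import JdbcDatabricksOperator  # hypothetical module name

foo = JdbcDatabricksOperator(
    sql=sql,
    autocommit=False,
    jdbc_conn_id="databricks",
    dag=dag,
    task_id="databricks_task",
)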
And now the query worked, taking 1.1 hours, which also proved that long-running queries don't hit timeouts or connection drops (this was also part of what I wanted to check).
As you have seen, if you use Airflow you eventually get to know its internals. Probably too much.