Unity Catalog Apache Spark™ Integration¶
This page explains how to create Unity Catalog tables with Apache Spark™.
Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.
Integrating Apache Spark with Unity Catalog offers significant advantages over traditional catalog solutions. Unity Catalog provides unified governance across both data and AI assets, fine-grained access control down to the column level, automated data lineage tracking, and seamless interoperability with various lakehouse formats and compute engines. It enables centralized metadata management, simplified data discovery, and enhanced security. The credential vending capability of Unity Catalog is particularly noteworthy as it allows Apache Spark to securely access data stored in Unity Catalog through a controlled mechanism.
- Neatly organizing data in tables and volumes in the Unity Catalog hierarchy makes it much easier to write Spark code.
- Makes it easier to decouple business logic from file paths.
- Provides easy access to different file formats without end users needing to know how the data is stored.
Prerequisites¶
For Apache Spark and Delta Lake to work together with Unity Catalog, you need at least Apache Spark 3.5.3 and Delta Lake 3.2.1.
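If you already have a Spark distribution on hand, you can confirm its version with the bundled spark-submit script before proceeding:
# Prints the version banner of the unpacked Spark distribution
bin/spark-submit --version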
Download and Configure Unity Catalog for Apache Spark¶
The following steps are required to download and configure Unity Catalog for Apache Spark.
Download Apache Spark¶
Download the latest version of Apache Spark (>= 3.5.3), or use the following commands:
curl -O https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
tar xzf spark-3.5.3-bin-hadoop3.tgz
[Optional] Configure server properties for cloud storage¶
To have Unity Catalog work with cloud object storage as the storage location for tables, add the following configuration to etc/conf/server.properties:
## S3 Storage Config (Multiple configs can be added by incrementing the index)
s3.bucketPath.0=<S3_BUCKET>
s3.region.0=<S3_REGION>
s3.awsRoleArn.0=<S3_ROLE>
# Optional (If blank, it will use DefaultCredentialsProviderChain)
s3.accessKey.0=<SECRET>
s3.secretKey.0=<SECRET>
## ADLS Storage Config (Multiple configs can be added by incrementing the index)
adls.storageAccountName.0=<ADLS_STORAGEACCOUNTNAME>
adls.tenantId.0=<ADLS_TENANTID>
adls.clientId.0=<ADLS_CLIENTID>
adls.clientSecret.0=<SECRET>
## GCS Storage Config (Multiple configs can be added by incrementing the index)
gcs.bucketPath.0=<GCS_BUCKET>
# Optional (If blank, it will use Default Application chain to find credentials)
gcs.jsonKeyFilePath.0=/path/to/<SECRET>/gcp-key-uc-testing.json
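For illustration, a minimal S3-only configuration might look like the following; the bucket, region, and role ARN are placeholder values, and the access/secret keys are omitted so the DefaultCredentialsProviderChain is used:
## S3 Storage Config (example with placeholder values)
s3.bucketPath.0=s3://my-uc-demo-bucket
s3.region.0=us-east-1
s3.awsRoleArn.0=arn:aws:iam::123456789012:role/uc-demo-role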
[Optional] Restart Unity Catalog Server¶
If the UC server is already running, restart it so that it picks up the cloud storage server properties.
cd unitycatalog/
bin/start-uc-server
Working with Unity Catalog Tables with Apache Spark and Delta Lake Locally¶
Let’s start running some Spark SQL queries in the Spark SQL shell (bin/spark-sql) or PySpark shell (bin/pyspark) within the terminal of your Apache Spark 3.5.3 folder against your local UC.
bin/spark-sql --name "local-uc-test" \
--master "local[*]" \
--packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
--conf "spark.sql.catalog.unity.token=" \
--conf "spark.sql.defaultCatalog=unity"
bin/pyspark --name "local-uc-test" \
--master "local[*]" \
--packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
--conf "spark.sql.catalog.unity.token=" \
--conf "spark.sql.defaultCatalog=unity"
Tip
The first run may take a few minutes while the necessary dependencies are downloaded. Afterwards, you can run some quick commands to see your UC assets within the Spark SQL shell.
Notice the following packages (--packages) and configurations (--conf):
- --packages points to the delta-spark and unitycatalog-spark packages; update the version numbers to match the versions you are using.
- spark.sql.catalog.unity.uri points to your local development UC instance.
- spark.sql.catalog.unity.token is empty, indicating there is no authentication; refer to auth for more information.
- spark.sql.defaultCatalog=unity must be filled out to indicate the default catalog.
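If you prefer to set the same options in Python code rather than shell flags, a minimal sketch (assuming the same local UC server and package versions as above) looks like this. Note that spark.jars.packages only takes effect when it is set before the JVM starts, so run this as a standalone script rather than inside an already-running shell.
from pyspark.sql import SparkSession

# Build a session wired to the local Unity Catalog server (same settings as the shell flags above)
spark = (
    SparkSession.builder.appName("local-uc-test")
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.unity", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.unity.uri", "http://localhost:8080")
    .config("spark.sql.catalog.unity.token", "")
    .config("spark.sql.defaultCatalog", "unity")
    .getOrCreate()
)

# Quick sanity check against the local UC instance
spark.sql("SHOW SCHEMAS").show()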
Three-part and two-part naming conventions
As noted in Unity Catalog 101, UC has a three-part naming convention of [catalog].[schema].[asset]. In the following examples, you can use the three-part notation such as SELECT * FROM unity.default.marksheet; or the two-part notation SELECT * FROM default.marksheet;, since the defaultCatalog is already configured.
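For example, once the shell above is running, both of the following queries resolve to the same table:
-- Three-part notation (explicit catalog)
SELECT * FROM unity.default.marksheet LIMIT 5;
-- Two-part notation (relies on spark.sql.defaultCatalog=unity)
SELECT * FROM default.marksheet LIMIT 5;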
[Optional] Running Spark SQL for Cloud Object Stores¶
If you would like to run this against cloud object storage, use the following variants of the bin/spark-sql shell command.
bin/spark-sql --name "s3-uc-test" \
--master "local[*]" \
--packages "org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
--conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
--conf "spark.sql.catalog.unity.token=" \
--conf "spark.sql.defaultCatalog=unity"
bin/spark-sql --name "azure-uc-test" \
--master "local[*]" \
--packages "org.apache.hadoop:hadoop-azure:3.3.6,io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
--conf "spark.sql.catalog.unity.token=" \
--conf "spark.sql.defaultCatalog=unity"
bin/spark-sql --name "gcs-uc-test" \
--master "local[*]" \
--jars "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/3.0.2/gcs-connector-3.0.2-shaded.jar" \
--packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" \
--conf "spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS" \
--conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
--conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
--conf "spark.sql.catalog.unity.token=" \
--conf "spark.sql.defaultCatalog=unity"
Using Spark SQL to query Unity Catalog schemas and tables¶
Let’s start by running some quick commands from the Spark SQL and pyspark shells.
The following SHOW SCHEMAS command lists the default schema that is included in the initial UC configuration.
-- Show schemas (output = default)
SHOW SCHEMAS;
-- Show tables
SHOW TABLES IN default;
# Show schemas (output = default)
sql("SHOW SCHEMAS").show()
# Show tables
sql("SHOW TABLES IN default").show()
with the output similar to:
+---------+-----------------+-----------+
|namespace| tableName|isTemporary|
+---------+-----------------+-----------+
| default| marksheet| false|
| default|marksheet_uniform| false|
| default| numbers| false|
| default| user_countries| false|
+---------+-----------------+-----------+
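From the PySpark shell, you can also inspect an individual table's schema directly, for example:
# Resolves default.marksheet through the configured unity catalog and prints its schema
spark.table("default.marksheet").printSchema()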
Let’s query the first five rows of the marksheet table.
SELECT * FROM default.marksheet LIMIT 5;
sql("SELECT * FROM default.marksheet LIMIT5;").show()
With the output looking similar to the following.
+---+----------+-----+
| id| name|marks|
+---+----------+-----+
| 1|nWYHawtqUw| 930|
| 2|uvOzzthsLV| 166|
| 3|WIAehuXWkv| 170|
| 4|wYCSvnJKTo| 709|
| 5|VsslXsUIDZ| 993|
+---+----------+-----+
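The same query can also be expressed with the DataFrame API in the PySpark shell:
# DataFrame API equivalent of the SQL query above
spark.table("default.marksheet").limit(5).show()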
Running CRUD Operations on a Unity Catalog table¶
Let’s extend this example by executing various CRUD operations on our UC tables.
-- Create new schema
CREATE SCHEMA demo;
-- Should now show two schemas: default and demo
SHOW SCHEMAS;
-- Create a new table
CREATE TABLE
demo.mytable (id INT, desc STRING)
USING delta
LOCATION '<LOCATION>';
-- Example location:
-- LOCATION '/tmp/tables/mytable';
-- Insert new rows
INSERT INTO demo.mytable VALUES (1, "test 1");
INSERT INTO demo.mytable VALUES (2, "test 2");
INSERT INTO demo.mytable VALUES (3, "test 3");
INSERT INTO demo.mytable VALUES (4, "test 4");
-- Read table
SELECT * FROM demo.mytable;
-- Update row in table
UPDATE demo.mytable SET id = 5 WHERE id = 4;
-- Delete rows
DELETE FROM demo.mytable WHERE id = 5;
-- Create secondary table (we will use this as the source for merge)
CREATE TABLE
demo.srctable (id INT, desc STRING)
USING delta
LOCATION '<LOCATION>';
-- Example location:
-- LOCATION '/tmp/tables/srctable';
-- Insert new rows
INSERT INTO demo.srctable VALUES (3, "updated");
INSERT INTO demo.srctable VALUES (4, "inserted");
-- Merge
MERGE INTO demo.mytable as target
USING demo.srctable as source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
;
-- Check results
SELECT * FROM demo.mytable;
3 updated
4 inserted
1 test 1
2 test 2
-- Drop tables
DROP TABLE demo.srctable;
-- Check results
SHOW TABLES IN demo;
Warning
Note that this action only drops the table from UC; it does not remove the underlying data from the file system.
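If you also need to remove the underlying data, you can look up the table's storage location before dropping it, for example:
-- Shows table metadata, including its storage location, so the files can be cleaned up separately
DESCRIBE TABLE EXTENDED demo.srctable;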