
Unity Catalog Apache Spark™ Integration

This page explains how to create Unity Catalog tables with Apache Spark™.

Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.

Integrating Apache Spark with Unity Catalog offers significant advantages over traditional catalog solutions. Unity Catalog provides unified governance across both data and AI assets, fine-grained access control down to the column level, automated data lineage tracking, and seamless interoperability with various lakehouse formats and compute engines. It enables centralized metadata management, simplified data discovery, and enhanced security. The credential vending capability of Unity Catalog is particularly noteworthy as it allows Apache Spark to securely access data stored in Unity Catalog through a controlled mechanism.

  • Neatly organizes data in tables and volumes in the Unity Catalog hierarchy, making Spark code much easier to write.
  • Decouples business logic from file paths (see the sketch after this list).
  • Provides easy access to different file formats without end users needing to know how the data is stored.
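
For example, once a table is registered in Unity Catalog, Spark code can refer to it by name instead of a hard-coded storage path. A minimal PySpark sketch (the path is illustrative only; user_countries is one of the sample tables that ships with UC):

# Path-based access: business logic is coupled to where and how the files are stored
df_by_path = spark.read.format("delta").load("/tmp/tables/user_countries")

# Name-based access through Unity Catalog: location and format stay hidden from the user
df_by_name = spark.table("unity.default.user_countries")
df_by_name.show()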

Prerequisites

For Apache Spark and Delta Lake to work together with Unity Catalog, you need at least Apache Spark 3.5.3 and Delta Lake 3.2.1.

Download and Configure Unity Catalog for Apache Spark

The following steps are required to download and configure Unity Catalog for Apache Spark.

Download Apache Spark

Download the latest version of Apache Spark (3.5.3 or above), or use the following command.

Download Apache Spark 3.5.3 using curl
curl -O https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
tar xzf spark-3.5.3-bin-hadoop3.tgz

[Optional] Configure server properties for cloud storage

To have Unity Catalog work with cloud object storage as the storage location for tables, add the following settings to etc/conf/server.properties:

## S3 Storage Config (Multiple configs can be added by incrementing the index)
s3.bucketPath.0=<S3_BUCKET>
s3.region.0=<S3_REGION>
s3.awsRoleArn.0=<S3_ROLE>
# Optional (If blank, it will use DefaultCredentialsProviderChain)
s3.accessKey.0=<SECRET>
s3.secretKey.0=<SECRET>
## ADLS Storage Config (Multiple configs can be added by incrementing the index)
adls.storageAccountName.0=<ADLS_STORAGEACCOUNTNAME>
adls.tenantId.0=<ADLS_TENANTID>
adls.clientId.0=<ADLS_CLIENTID>
adls.clientSecret.0=<SECRET>
## GCS Storage Config (Multiple configs can be added by incrementing the index)
gcs.bucketPath.0=<GCS_BUCKET>
# Optional (If blank, it will use Default Application chain to find credentials)
gcs.jsonKeyFilePath.0=/path/to/<SECRET>/gcp-key-uc-testing.json

[Optional] Restart Unity Catalog Server

If the UC server is already running, restart it so that the cloud storage server properties take effect.

cd unitycatalog/
bin/start-uc-server

Working with Unity Catalog Tables with Apache Spark and Delta Lake Locally

Let’s run some Spark SQL queries against your local UC from the Spark SQL shell (bin/spark-sql) or PySpark shell (bin/pyspark), launched from the terminal in your Apache Spark 3.5.3 folder.

Spark SQL shell
bin/spark-sql --name "local-uc-test" \
    --master "local[*]" \
    --packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
    --conf "spark.sql.catalog.unity.token=" \
    --conf "spark.sql.defaultCatalog=unity"

PySpark shell
bin/pyspark --name "local-uc-test" \
    --master "local[*]" \
    --packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
    --conf "spark.sql.catalog.unity.token=" \
    --conf "spark.sql.defaultCatalog=unity"

Tip

The first launch may take a few minutes while the necessary dependencies are downloaded. Afterwards, you can run some quick commands from the shell to see your UC assets.
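
For example, from the PySpark shell you can list the catalogs visible to the session (the Spark SQL shell equivalent is SHOW CATALOGS;); the output should include the unity catalog configured above:

# List the catalogs registered with this Spark session; `unity` should appear
spark.sql("SHOW CATALOGS").show()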

Notice the following packages (--packages) and configurations (--conf); a programmatic equivalent using SparkSession.builder follows the list.

  • --packages points to the delta-spark and unitycatalog-spark packages; update the version numbers to your current versions.
  • spark.sql.catalog.unity.uri points to your local development UC instance.
  • spark.sql.catalog.unity.token is left empty, indicating there is no authentication; refer to auth for more information.
  • spark.sql.defaultCatalog=unity must be set to make unity the default catalog for the session.
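
If you prefer to configure the session programmatically rather than via shell flags, here is a minimal sketch of an equivalent standalone PySpark session (the same local UC endpoint and package versions are assumed; adjust them to your setup):

from pyspark.sql import SparkSession

# Sketch equivalent of the shell flags above
spark = (
    SparkSession.builder.appName("local-uc-test")
    .master("local[*]")
    .config("spark.jars.packages",
            "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.unity", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.unity.uri", "http://localhost:8080")
    .config("spark.sql.catalog.unity.token", "")
    .config("spark.sql.defaultCatalog", "unity")
    .getOrCreate()
)

print(spark.catalog.currentCatalog())  # should print: unity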
Three-part and two-part naming conventions

As noted in Unity Catalog 101, UC has a three-part naming convention of [catalog].[schema].[asset]. In the following examples, you can use the three-part notation such as SELECT * FROM unity.default.marksheet; or the two-part notation SELECT * FROM default.marksheet; as the defaultCatalog is already configured.

[Optional] Running Spark SQL for Cloud Object Stores

If you would like to run this against cloud object storage, use one of the following versions of the bin/spark-sql shell command.

bin/spark-sql --name "s3-uc-test" \
    --master "local[*]" \
    --packages "org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
    --conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
    --conf "spark.sql.catalog.unity.token=" \
    --conf "spark.sql.defaultCatalog=unity"
bin/spark-sql --name "azure-uc-test" \
    --master "local[*]" \
    --packages "org.apache.hadoop:hadoop-azure:3.3.6,io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
    --conf "spark.sql.catalog.unity.token=" \
    --conf "spark.sql.defaultCatalog=unity"
bin/spark-sql --name "gcs-uc-test" \
    --master "local[*]" \
    --jars "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/3.0.2/gcs-connector-3.0.2-shaded.jar" \
    --packages "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0" \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" \
    --conf "spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS" \
    --conf "spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog" \
    --conf "spark.sql.catalog.unity.uri=http://localhost:8080" \
    --conf "spark.sql.catalog.unity.token=" \
    --conf "spark.sql.defaultCatalog=unity"

Using Spark SQL to query Unity Catalog schemas and tables

Let’s start by running some quick commands from the Spark SQL and PySpark shells.

The following SHOW SCHEMAS command shows the default schema that is included in the initial UC configuration.

Spark SQL shell
-- Show schemas (output = default)
SHOW SCHEMAS;

-- Show tables
SHOW TABLES IN default;

PySpark shell
# Show schemas (output = default)
spark.sql("SHOW SCHEMAS").show()

# Show tables
spark.sql("SHOW TABLES IN default").show()

The output should be similar to:

+---------+-----------------+-----------+
|namespace|        tableName|isTemporary|
+---------+-----------------+-----------+
|  default|        marksheet|      false|
|  default|marksheet_uniform|      false|
|  default|          numbers|      false|
|  default|   user_countries|      false|
+---------+-----------------+-----------+

Let’s query the first five rows of the marksheet table.

Spark SQL shell
SELECT * FROM default.marksheet LIMIT 5;

PySpark shell
spark.sql("SELECT * FROM default.marksheet LIMIT 5").show()

With the output looking similar to the following.

+---+----------+-----+
| id|      name|marks|
+---+----------+-----+
|  1|nWYHawtqUw|  930|
|  2|uvOzzthsLV|  166|
|  3|WIAehuXWkv|  170|
|  4|wYCSvnJKTo|  709|
|  5|VsslXsUIDZ|  993|
+---+----------+-----+

Running CRUD Operations on a Unity Catalog table

Let’s extend this example by executing various CRUD operations on our UC tables.

Create new schema
-- Create new schema
CREATE SCHEMA demo;

-- Should now show two schemas: default and demo
SHOW SCHEMAS;
Create new table
-- Create a new table
CREATE TABLE
demo.mytable (id INT, desc STRING) 
USING delta 
LOCATION '<LOCATION>';
-- Example location:
-- LOCATION '/tmp/tables/mytable';
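From the PySpark shell, a roughly equivalent sketch using the DataFrame writer; the /tmp/tables/mytable path is only an example location:

# Create an empty Delta table registered in UC at an explicit external location
(spark.createDataFrame([], "id INT, desc STRING")
    .write.format("delta")
    .option("path", "/tmp/tables/mytable")
    .saveAsTable("demo.mytable"))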
Insert new rows into table
-- Insert new rows
INSERT INTO demo.mytable VALUES (1, "test 1");
INSERT INTO demo.mytable VALUES (2, "test 2");
INSERT INTO demo.mytable VALUES (3, "test 3");
INSERT INTO demo.mytable VALUES (4, "test 4");

-- Read table
SELECT * FROM demo.mytable;
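The same insert and read can also be done from the PySpark shell with the DataFrame API; a minimal sketch with an illustrative row:

# Append a row to the UC table, then read the table back by name
(spark.createDataFrame([(5, "test 5")], "id INT, desc STRING")
    .write.format("delta").mode("append").saveAsTable("demo.mytable"))

spark.table("demo.mytable").show()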
Update row in table
-- Update row in table
UPDATE demo.mytable SET id = 5 WHERE id = 4;
Delete row from table
-- Delete rows
DELETE FROM demo.mytable WHERE id = 5;
Merge mytable with srctable
-- Create secondary table (we will use this as the source for merge)
CREATE TABLE
demo.srctable (id INT, desc STRING) 
USING delta
LOCATION '<LOCATION>';
-- Example location:
-- LOCATION '/tmp/tables/srctable';

-- Insert new rows
INSERT INTO demo.srctable VALUES (3, "updated");
INSERT INTO demo.srctable VALUES (4, "inserted");

-- Merge
MERGE INTO demo.mytable as target
USING demo.srctable as source
   ON target.id = source.id
 WHEN MATCHED THEN
      UPDATE SET *
 WHEN NOT MATCHED THEN
      INSERT *
;

-- Check results
SELECT * FROM demo.mytable;
Merged Results
3       updated
4       inserted
1       test 1
2       test 2
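If you prefer the Delta Lake Python API to SQL MERGE, here is a sketch of the same upsert. It assumes the delta-spark Python package is available to the interpreter (for example via pip install delta-spark) so that delta.tables can be imported:

from delta.tables import DeltaTable

# Upsert rows from demo.srctable into demo.mytable, matching on id
source_df = spark.table("demo.srctable")
(DeltaTable.forName(spark, "demo.mytable").alias("target")
    .merge(source_df.alias("source"), "target.id = source.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

spark.table("demo.mytable").show()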
Drop table
-- Drop tables
DROP TABLE demo.srctable;

-- Check results
SHOW TABLES IN demo;

Warning

Note that this action only drops the table from UC; it does not remove the underlying data from the file system.
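
For example, if the dropped table used the example location from above, a quick check from PySpark shows that the Delta files are still on disk after DROP TABLE (the path is illustrative):

import os

# The table is no longer registered in UC, but its data files remain at the external location
print(os.listdir("/tmp/tables/srctable"))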