Using Apache Iceberg in AWS Glue Jobs

What is Apache Iceberg?

Apache Iceberg is an open table format for data lakes that provides features including ACID transactions and snapshot history. By default, it stores data as Parquet files, along with metadata files that the Iceberg library uses to enable those features.

Even if you don’t need the features it provides, it is still a good default choice for tables in your data lake because it is a standard format with first-class support from several AWS services and Spark.
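
As a quick illustration of the snapshot history feature, Iceberg exposes metadata tables that you can query through Spark SQL. A minimal sketch, assuming the catalog configuration and example table described later in this post:

# Inspect the snapshot history of an Iceberg table via its snapshots metadata table.
# "glue_catalog" and "example.my_table" refer to the setup shown later in this post.
spark.sql("""
    SELECT committed_at, snapshot_id, operation
    FROM glue_catalog.example.my_table.snapshots
    ORDER BY committed_at
""").show(truncate=False)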

Iceberg requires Glue 3.0 or later. Glue 4.0 is the latest version at the time of writing.

Creating Iceberg Tables in Glue

First, make sure that your table’s database has a default location. Your table does not need to live in that location, but the Iceberg library works more smoothly when the database has one set. I will use Terraform to illustrate infrastructure settings; feel free to make these changes in the AWS console if you prefer.

resource "aws_glue_catalog_database" "example" {
name = "example"
location_uri = "s3://service-bucket/example"
}

Now you can create your table.

spark.sql("""
CREATE TABLE
IF NOT EXISTS example.my_table (
id bigint,
name string,
city string,
size bigint,
categories array<string>,
date string
) USING iceberg PARTITIONED BY (date) TBLPROPERTIES (
'table_type' = 'ICEBERG',
'format-version' = '2',
'format' = 'parquet',
'write_compression' = 'snappy',
'vacuum_min_snapshots_to_keep' = '2',
'write.target-file-size-bytes' = '536870912',
'write.distribution-mode' = 'hash'
)
""")

The TBLPROPERTIES are optional, but highly recommended. I will take a deeper dive into these settings in a future post.
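
If you want to adjust these settings on an existing table later, you can change them with ALTER TABLE. A quick sketch, using the same table as above (the property value is only an example):

# Update a table property on an existing Iceberg table.
# The 256 MB target file size here is just an illustrative value.
spark.sql("""
    ALTER TABLE example.my_table SET TBLPROPERTIES (
        'write.target-file-size-bytes' = '268435456'
    )
""")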

Spark Configuration

There are two routes you can take to enable Iceberg support in your Glue jobs: use a job parameter to enable the built-in support, or add the dependencies yourself.

Pros to using a job parameter:

- There are no jars to download, upload to S3, or keep track of; Glue includes the Iceberg library for you.
- It is the route the Glue documentation covers, and the easier path if you are less comfortable with Spark.

Cons to using a job parameter:

- You are stuck with the Iceberg version that Glue bundles (1.0.0 on Glue 4.0 at the time of writing).
- You must access Iceberg tables through a separate, named catalog (glue_catalog in the examples below) and specify a warehouse location.

I have recently switched to managing the Iceberg dependencies myself. If you are less comfortable with Spark, feel free to use the job parameter.

Using a Job Parameter

Add the following parameters to your job:

resource "aws_glue_job" "job" {
  # ...
  glue_version = "4.0"

  default_arguments = {
    # ...
    "--datalake-formats"        = "iceberg"
    "--enable-glue-datacatalog" = "true"
  }
}

This will include Iceberg 1.0.0 in your job. You need to set a few Spark configuration options to enable Iceberg support.

In 1.0.0, you must access Iceberg tables using a qualified catalog name. The name does not matter. This is a little annoying, but not a big deal. The Glue docs tend to use glue_catalog as the catalog name, so I will use that in examples.

You must also specify a warehouse location. This is where Iceberg will store its metadata if the table does not already have a location set; the Glue database default location takes precedence over this setting. I recommend setting the warehouse location to the same location as the database default. If you access Iceberg tables in more than one database, you can create a separate catalog for each database, but this is not necessary if your databases have their default locations set.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://service-bucket/example") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()

Now you can access Iceberg tables using the catalog name.

my_table = spark.table("glue_catalog.example.my_table")
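
If you read Iceberg tables from a second database, registering another catalog follows the same pattern. A sketch, where other_catalog, the warehouse path, and other_db.some_table are placeholder names:

# Register a second named catalog alongside glue_catalog.
# "other_catalog", the S3 path, and the table name below are hypothetical.
builder = (
    SparkSession.builder
    # ... the glue_catalog settings shown above ...
    .config("spark.sql.catalog.other_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.other_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.other_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.other_catalog.warehouse", "s3://service-bucket/other-database")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
)
spark = builder.getOrCreate()

other_table = spark.table("other_catalog.other_db.some_table")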

Self-Managed Dependencies

If you want to use the latest version of Iceberg, you need to manage the dependencies yourself. This is not too difficult, especially in Scala.

Regardless of your language, DO NOT set datalake-formats = iceberg in your job parameters. This will cause the versions to clash.

You still need enable-glue-datacatalog set to true.

Unfortunately, Python jobs require a bit of effort to add the dependencies. You need to download the jars, upload them to S3, and then include them in your job with the --extra-jars parameter. The jars are available on Maven Central; the example below uses the latest version of Iceberg at the time of writing (1.4.3).

After uploading the jars to S3, update your job in Terraform:

resource "aws_glue_job" "job" {
# ...
glue_version = "4.0"
default_arguments = {
# ...
"--datalake-formats" = "iceberg"
"--extra-jars" = "s3://service-bucket/jars/iceberg-spark-runtime-3.3_2.12-1.4.3.jar,s3://service-bucket/jars/iceberg-aws-bundle-1.4.3.jar"
"--enable-glue-datacatalog" = "true"
}
}

Now that you have your dependencies added, set these configs in your Spark session:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()

The nice thing is that these do not require a separate, named catalog. Assuming your databases have a default location set, you do not need to provide a warehouse either. These values can be copied and pasted from job-to-job without any modifications.

You can access Iceberg tables like any other table in the Glue catalog now.

spark.table("example.my_table")

It is a minor thing, but it is nice to treat all tables the same. This way you don’t need to know which tables are Iceberg and which are not.

Writing Data in Spark

Iceberg uses Spark’s DataSource V2 API. If you are used to writing data directly to an S3 location, you will need to make some changes.
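
For example, a path-based write that you might be replacing looks something like this (the S3 path mirrors the example table, but is a placeholder):

# The old pattern: writing Parquet straight to an S3 path and managing the
# location and partition columns yourself. The path here is hypothetical.
df.write \
    .mode("overwrite") \
    .partitionBy("date") \
    .parquet("s3://service-bucket/example/my_table/")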

When you write to an existing table, you do not need to know the location or partitioning scheme. You can just write to the table by name:

df.writeTo("example.my_table").overwritePartitions()

overwritePartitions() overwrites all the data in the partitions of the table that appear in the dataframe. For example, if the dataframe contains rows where date = '2023-12-14', all existing data within that partition will be deleted and replaced by the dataframe's rows.

I find this to be my most used method. It also works fine if you are writing to a partition that does not exist yet (e.g. running a daily job that adds a new date partition value). If the table is not partitioned at all, it will overwrite the entire table.
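
A minimal sketch of that daily pattern, where run_date and load_daily_data are placeholders for your own job logic:

# Hypothetical daily job: build today's rows, then overwrite only that date's partition.
run_date = "2023-12-14"
daily_df = load_daily_data(spark, run_date)  # dataframe containing only rows where date == run_date
daily_df.writeTo("example.my_table").overwritePartitions()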

Other methods include:

- append(), which adds the dataframe's rows without touching existing data.
- overwrite(condition), which replaces only the rows matching an explicit condition.
- create() and createOrReplace(), which create the table (or replace its contents) from the dataframe.
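
A couple of quick sketches of these, using the same example table (the filter condition is only illustrative):

from pyspark.sql import functions as F

# Append new rows without touching existing data.
df.writeTo("example.my_table").append()

# Replace only the rows that match an explicit condition.
df.writeTo("example.my_table").overwrite(F.col("date") == "2023-12-14")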

What’s Next?

In my next post, I will cover how to create Iceberg tables that don’t explode in file count.