Using Apache Iceberg in AWS Glue Jobs
What is Apache Iceberg?
Apache Iceberg is an open table format for data lakes that provides features including ACID transactions and snapshot history. By default, it stores data as Parquet files, along with metadata files that the Iceberg library uses to enable its features.
Even if you don’t need the features it provides, it is still a good default choice for tables in your data lake because it is a standard format with first-class support from several AWS services and Spark.
Iceberg requires Glue 3.0 or later. Glue 4.0 is the latest version at the time of writing.
Creating Iceberg Tables in Glue
First, make sure that your table’s database has a default location. Your table does not need to live in the default location, but the Iceberg library works more smoothly when the database has one. I will use Terraform to illustrate infrastructure settings. Feel free to make these settings in the AWS console if you prefer.
resource "aws_glue_catalog_database" "example" {
  name         = "example"
  location_uri = "s3://service-bucket/example"
}
Now you can create your table.
spark.sql("""
CREATE TABLE IF NOT EXISTS example.my_table (
    id bigint,
    name string,
    city string,
    size bigint,
    categories array<string>,
    date string
)
USING iceberg
PARTITIONED BY (date)
TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'format-version' = '2',
    'format' = 'parquet',
    'write_compression' = 'snappy',
    'vacuum_min_snapshots_to_keep' = '2',
    'write.target-file-size-bytes' = '536870912',
    'write.distribution-mode' = 'hash'
)
""")
The TBLPROPERTIES are optional, but highly recommended. I will take a deeper dive into these settings in a future post.
Spark Configuration
There are two ways to enable Iceberg support in your Glue jobs. You can use a job parameter, or you can add the dependencies yourself.
Pros to using a job parameter:
- AWS docs assume this option. The docs will always reference the same Iceberg version.
- Less room for human error. You don’t need to worry about choosing compatible jars for your Glue and Spark versions.
Cons to using a job parameter:
- You can’t use the latest version of Iceberg.
- Spark configuration is a little more complicated.
- Querying Iceberg tables is a little less ergonomic.
I have recently switched to managing the Iceberg dependencies myself. If you are less comfortable with Spark, feel free to use the job parameter.
Using a Job Parameter
Add the following parameter to your job:
resource "aws_glue_job" "job" {
  # ...
  glue_version = "4.0"

  default_arguments = {
    # ...
    "--datalake-formats"        = "iceberg"
    "--enable-glue-datacatalog" = true
  }
}
This will include Iceberg 1.0.0 in your job. You need to set a few Spark configuration options to enable Iceberg support.
In 1.0.0, you must access Iceberg tables using a qualified catalog name. The name does not matter. This is a little annoying, but not a big deal. The Glue docs tend to use glue_catalog as the catalog name, so I will use that in examples.
You must also specify a warehouse location. This is where Iceberg will store its metadata if the table does not already have a location set. The Glue database default location takes precedence over this setting. I recommend setting the warehouse location to the same location as the database default location. If you are accessing Iceberg tables from more than one database, you can create multiple catalogs, one per database. However, this is not necessary if your databases have their default locations set.
from pyspark.sql import SparkSession

# Register a catalog named glue_catalog that is backed by the Glue Data Catalog.
spark = SparkSession.builder \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://service-bucket/example") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()
Now you can access Iceberg tables using the catalog name.
my_table = spark.table("glue_catalog.example.my_table")
The equivalent session in Scala:
import org.apache.spark.sql.SparkSession

// Register a catalog named glue_catalog that is backed by the Glue Data Catalog.
val spark = SparkSession.builder
  .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .config("spark.sql.catalog.glue_catalog.warehouse", "s3://service-bucket/example")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()
Now you can access Iceberg tables using the catalog name.
val myTable = spark.table("glue_catalog.example.my_table")
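If you do register one catalog per database, the settings differ only in the catalog name and the warehouse, so they can be generated rather than typed out. The following is a minimal sketch that reuses the implementation classes shown above. The helper name iceberg_catalog_conf, the second catalog other_catalog, and its S3 path are hypothetical and only there for illustration.

```python
def iceberg_catalog_conf(catalog_name, warehouse):
    """Return the Spark settings for one Glue-backed Iceberg catalog."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.warehouse": warehouse,
    }


# Hypothetical mapping of catalog name -> warehouse. Each warehouse mirrors
# the default location of the Glue database that the catalog is used for.
catalogs = {
    "glue_catalog": "s3://service-bucket/example",
    "other_catalog": "s3://service-bucket/other",
}

# The SQL extensions are set once, independent of how many catalogs exist.
conf = {
    "spark.sql.extensions":
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
}
for name, warehouse in catalogs.items():
    conf.update(iceberg_catalog_conf(name, warehouse))

for key in sorted(conf):
    print(f"{key} = {conf[key]}")
```

To use the result in a job, start from SparkSession.builder, call .config(key, value) for each pair, and then call getOrCreate().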
Self-Managed Dependencies
If you want to use the latest version of Iceberg, you need to manage the dependencies yourself. This is not too difficult, especially in Scala.
Regardless of your language, DO NOT set datalake-formats = iceberg in your job parameters. This will cause the versions to clash.
You still need enable-glue-datacatalog set to true.
Unfortunately, Python jobs require a bit of effort to add the dependencies. You need to download the jars and upload them to S3. Then you can include them in your job with the --extra-jars parameter. The jars are available on Maven Central. I have included links to the latest version of Iceberg at the time of writing.
After uploading to S3, update your job in Terraform:
resource "aws_glue_job" "job" {
  # ...
  glue_version = "4.0"

  default_arguments = {
    # ...
    # Note: no "--datalake-formats" here; it would clash with the jars below.
    "--extra-jars"              = "s3://service-bucket/jars/iceberg-spark-runtime-3.3_2.12-1.4.3.jar,s3://service-bucket/jars/iceberg-aws-bundle-1.4.3.jar"
    "--enable-glue-datacatalog" = "true"
  }
}
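The --extra-jars value is a comma-separated list of S3 paths, and it is easy to mistype when you bump the Iceberg version. As a rough sketch, the string can be derived from the version numbers. The helper name extra_jars and its parameters are hypothetical; the jar file names follow the pattern of the two jars used in this post.

```python
def extra_jars(jar_prefix, iceberg_version, spark_version="3.3", scala_version="2.12"):
    """Build the comma-separated value for the --extra-jars job parameter."""
    jars = [
        # Spark runtime jar: tied to both the Spark and the Scala version.
        f"iceberg-spark-runtime-{spark_version}_{scala_version}-{iceberg_version}.jar",
        # AWS bundle jar: tied to the Iceberg version only.
        f"iceberg-aws-bundle-{iceberg_version}.jar",
    ]
    prefix = jar_prefix.rstrip("/")
    return ",".join(f"{prefix}/{jar}" for jar in jars)


value = extra_jars("s3://service-bucket/jars", "1.4.3")
print(value)
```

The defaults assume Glue 4.0, which runs Spark 3.3 on Scala 2.12. Adjust them if your Glue version differs.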
Now that you have your dependencies added, set these configs in your Spark session:
from pyspark.sql import SparkSession

# Replace Spark's built-in session catalog with one that understands Iceberg.
spark = SparkSession.builder \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()
If you are packaging your job logic into a JAR, just add the following to your libraryDependencies in build.sbt:
"org.apache.iceberg" % "iceberg-aws-bundle" % "<ICEBERG_VERSION>",
"org.apache.iceberg" %% "iceberg-spark-runtime-3.3" % "<ICEBERG_VERSION>",
Now that you have your dependencies added, set these configs in your Spark session:
import org.apache.spark.sql.SparkSession

// Replace Spark's built-in session catalog with one that understands Iceberg.
val spark = SparkSession.builder
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()
The nice thing is that these settings do not require a separate, named catalog. Assuming your databases have a default location set, you do not need to provide a warehouse either. These values can be copied and pasted from job to job without any modifications.
You can access Iceberg tables like any other table in the Glue catalog now.
spark.table("example.my_table")
It is a minor thing, but it is nice to treat all tables the same. This way you don’t need to know which tables are Iceberg and which are not.
Writing Data in Spark
Iceberg uses Spark’s DataSource V2 API. If you are used to writing data directly to an S3 location, you will need to make some changes.
When you write to an existing table, you do not need to know the location or partitioning scheme. You can just write to the table by name:
df.writeTo("example.my_table").overwritePartitions()
overwritePartitions() overwrites every partition of the table that has at least one row in the dataframe. For example, if the dataframe contains rows where date = '2023-12-14', all existing data within that partition will be deleted and replaced by the dataframe's rows. Partitions that do not appear in the dataframe are left untouched.
I find this to be my most used method. It also works fine if you are writing to a partition that does not exist yet (e.g. running a daily job that adds a new date partition value). If the table is not partitioned at all, it will overwrite the entire table.
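To make the semantics concrete, here is a small pure-Python model of the behaviour described above. It is only an illustration on lists of dicts, not how Iceberg or Spark implement the operation, and the function name overwrite_partitions is made up for this sketch.

```python
def overwrite_partitions(table, incoming, key="date"):
    """Model overwritePartitions() on lists of row dicts.

    Any partition value present in `incoming` is dropped from `table`
    and replaced by the incoming rows. Other partitions are untouched.
    With key=None (an unpartitioned table) everything is replaced.
    """
    if key is None:
        return list(incoming)
    touched = {row[key] for row in incoming}
    kept = [row for row in table if row[key] not in touched]
    return kept + list(incoming)


table = [
    {"id": 1, "date": "2023-12-13"},
    {"id": 2, "date": "2023-12-14"},
    {"id": 3, "date": "2023-12-14"},
]
incoming = [
    {"id": 4, "date": "2023-12-14"},  # replaces ids 2 and 3
    {"id": 5, "date": "2023-12-15"},  # creates a new partition
]

result = overwrite_partitions(table, incoming)
print(sorted(row["id"] for row in result))  # [1, 4, 5]
```

The row for 2023-12-13 survives because the incoming data contains no rows for that date.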
Other methods include:
- overwrite(condition): Overwrite all data in the table matching a condition.
- replace(): Replace the entire table with the contents of the dataframe. The table's schema and partition spec are replaced as well if the dataframe's differ.
- append(): Append data to the table.
What’s Next?
In my next post, I will cover how to create Iceberg tables that don’t explode in file count.