Keep Your File Count Under Control with AWS Glue Iceberg Tables

What’s the Problem?

AWS is pushing Iceberg as the default format for data lake tables. It is a great format, but it is not without its quirks. One issue is that Iceberg tables can quickly grow to hundreds of thousands of files, which can cause performance problems or job failures due to Amazon S3 throttling ("Slow Down") errors.

Why Does This Happen?

File count can grow for a number of reasons, but the most common is that the table is not configured properly. The default settings are not optimal for ACID workloads: every transaction commits new data and metadata files, so even relatively small jobs can let the file count grow out of control.

The AWS documentation on Iceberg is not great: it uses out-of-date library versions, it does not cover all of the available options, and it does not acknowledge that the tool you use to create the table affects the table's properties.

Here are the properties I have found to help control file count when working with Iceberg tables on AWS Glue.

Table Properties

These are the TBLPROPERTIES I have found that limit the number of files created:

'table_type' = 'ICEBERG',
'format-version' = '2',
'format' = 'parquet',
'write_compression' = 'snappy',
'vacuum_min_snapshots_to_keep' = '2',
'write.target-file-size-bytes' = '536870912',
'write.distribution-mode' = 'hash'

Note: vacuum_min_snapshots_to_keep depends on your use case. This is discussed in more detail below.
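
If the table already exists, you can apply these properties after the fact with ALTER TABLE ... SET TBLPROPERTIES. A minimal sketch in Spark SQL, assuming a table named example.my_table (the property values shown are the same as above and purely illustrative):

spark.sql("""
ALTER TABLE example.my_table SET TBLPROPERTIES (
    'write.target-file-size-bytes' = '536870912',
    'write.distribution-mode' = 'hash'
)
""")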

Picking a Tool to Create the Table

If you create an Iceberg table in the Athena console, it will add a bunch of extra prefixes to the table's S3 location to optimize Athena queries. If that fits your use case, feel free to go ahead and use Athena.

This strategy hurts Spark performance. If your table will regularly be used in Spark jobs, stick to creating the table in Spark. I have my table DDLs saved in git, and I run them in a Jupyter notebook.

In general, I recommend creating the table in Spark. It is more flexible, and Athena still works fine with tables created in Spark.

spark.sql("""
CREATE TABLE
IF NOT EXISTS example.my_table (
id bigint,
name string,
city string,
size bigint,
categories array<string>,
date string
) USING iceberg PARTITIONED BY (date) TBLPROPERTIES (
'table_type' = 'ICEBERG',
'format-version' = '2',
'format' = 'parquet',
'write_compression' = 'snappy',
'vacuum_min_snapshots_to_keep' = '2',
'write.target-file-size-bytes' = '536870912',
'write.distribution-mode' = 'hash'
)
""")

This is the most important takeaway. Without appropriate table properties, no amount of maintenance will keep your table performant.

Regardless, maintenance is still vital to keep your table’s file count in check and your jobs running smoothly.

Maintenance

Iceberg provides procedures to perform regular maintenance on your tables.

Rewrite Data Files

Your job may output files in a way that makes downstream processes less performant. To optimize the files, you can run the rewrite_data_files procedure. Here are some options I’ve found to work well:

spark.sql("""
CALL system.rewrite_data_files(
table => 'example.my_table',
options => map(
'max-concurrent-file-group-rewrites', '100',
'partial-progress.enabled', 'true',
# This option is good for merge operations, otherwise leave it off
'rewrite-all', 'true'
)
)
""")

Note: These examples assume you’re managing the Iceberg dependencies yourself. If you are using the Glue job parameter instead, you need to qualify the procedure with the catalog name (e.g. glue_catalog.system.rewrite_data_files). See my previous post on configuring your Glue jobs for Iceberg.
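
For reference, a self-managed setup usually configures a Glue-backed Iceberg catalog on the Spark session along the lines of the sketch below. The catalog name glue_catalog and the warehouse bucket are placeholders; use whatever you configured in your own job.

from pyspark.sql import SparkSession

# Minimal sketch of a Glue-backed Iceberg catalog configuration.
# "glue_catalog" and the warehouse path are placeholders.
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-bucket/warehouse/")
    .getOrCreate()
)

# With a named catalog like this, procedures are called through it:
# spark.sql("CALL glue_catalog.system.rewrite_data_files(table => 'example.my_table')")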

A good default is to rewrite data files once a day during off-peak hours. We have batch processes that run nightly, and they call the procedure in the same Spark job immediately after writing to the table.
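
A rough sketch of that pattern (df stands in for whatever DataFrame your job produces):

# Nightly batch job: append the new data, then compact in the same Spark job.
df.writeTo("example.my_table").append()

spark.sql("""
CALL system.rewrite_data_files(
    table => 'example.my_table',
    options => map(
        'max-concurrent-file-group-rewrites', '100',
        'partial-progress.enabled', 'true'
    )
)
""")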

If your table is append-only, you may not benefit from this procedure if you already manage your file count when you write data.
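
Managing file count at write time usually just means clustering the data by the partition column before the write, so each partition gets a few large files instead of many small ones. A minimal sketch (again, df is a placeholder; with 'write.distribution-mode' = 'hash' set, Iceberg requests a similar distribution from Spark automatically):

# Cluster rows by the partition column before an append-only write.
df.repartition("date").writeTo("example.my_table").append()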

Extra Options

If you know that your table will often be filtered by a column that is not a partition column, you can sort the data files to optimize for that filter:

spark.sql("""
CALL system.rewrite_data_files(
table => 'example.my_table',
options => map(
'max-concurrent-file-group-rewrites', '100',
'partial-progress.enabled', 'true',
),
strategy => 'sort',
sort_order => 'zorder(city, size)'
)
""")

You can also supply a filter to only rewrite files that may contain data that matches:

spark.sql("""
CALL system.rewrite_data_files(
table => 'example.my_table',
options => map(
'max-concurrent-file-group-rewrites', '100',
'partial-progress.enabled', 'true',
),
where => 'date > "2023-12-15"'
)
""")

Expire Snapshots

ACID operations create extra files in the table. These files do not affect query performance, but they do incur storage costs. Iceberg saves snapshots of your table and lets you time travel to previous versions. How you weigh the storage cost against the utility of snapshots will depend on your particular use case. The default behavior is to prevent deletion of snapshots younger than 7 days old.
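
As a quick illustration of what snapshots buy you, both of the queries below are standard Iceberg features in Spark (the timestamp is a placeholder, and the TIMESTAMP AS OF syntax requires reasonably recent Spark and Iceberg versions):

# Time travel to the state of the table at a point in time
spark.sql("SELECT * FROM example.my_table TIMESTAMP AS OF '2023-12-01 00:00:00'")

# List the snapshots currently retained by the table
spark.sql("SELECT committed_at, snapshot_id, operation FROM example.my_table.snapshots").show()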

Eventually you will need to expire snapshots that are no longer useful:

spark.sql("CALL system.expire_snapshots('example.my_table')")

You can expire snapshots older than a specific timestamp, but the procedure will not accept a timestamp within the last 48 hours:

spark.sql("""
CALL system.expire_snapshots(
table => 'example.my_table',
older_than => TIMESTAMP '2023-12-01 00:00:00.000'
)
""")

Unless your storage costs are significant, you don’t need to expire snapshots aggressively. A good default is to expire snapshots once a month.

Again, if your table is append-only, you may not benefit much from this procedure. It would only delete metadata files.

Orphan Files

Files that are no longer referenced by any metadata files, and are therefore useless, are called orphan files. You should occasionally delete these as well:

spark.sql("CALL system.remove_orphan_files(table => 'example.my_table')")

A good default is to remove orphan files when you expire snapshots. If you do substantial data file rewrites, you may want to do this more often.
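
Like expire_snapshots, remove_orphan_files accepts an older_than timestamp if you want to control the cutoff explicitly; by default the procedure only considers files older than a few days, which protects in-flight writes. A sketch with an explicit cutoff (the timestamp is a placeholder):

spark.sql("""
CALL system.remove_orphan_files(
    table => 'example.my_table',
    older_than => TIMESTAMP '2023-12-01 00:00:00.000'
)
""")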