In modern data-driven organizations, a Data Warehouse (DWH) has become a foundational tool for storing, managing, and analyzing vast amounts of information. A DWH provides a centralized repository where data is collected from various sources and optimized for querying and reporting. This enables programmers, data scientists, and analysts to have immediate access to structured data, making their workflows more efficient and development cycles faster.
For football clubs, adopting best practices from other industries has led to the introduction of DWHs for handling their ever-increasing volumes of match data. While football clubs excel on the pitch, many face challenges off the field when it comes to their infrastructure. These clubs often have suboptimal internet connections at their remote match and training facilities, making real-time access to the data stored in the DWH a significant challenge.
In this blog post, we’ll explore how using Iceberg, a modern table format for DWHs, in combination with PyIceberg allows for a 10x speed improvement in data access. To query Iceberg tables, tools like Spark or AWS Athena are commonly used. While these tools can handle large amounts of data, they are not particularly fast, and in exploratory data analysis or local development a slow read means longer development cycles. Below, we use the characteristics of the Iceberg format to get a 10x speedup by eliminating network requests, while still maintaining “always-fresh” data.
The data used in this blog is football data (soccer, for the American readers). It contains one season of data: 300+ matches and over 1.1 million rows in total. The table is partitioned per match, and an optimization job runs after new data is inserted, so the data is not fragmented.
The experiments are performed over a 20 Mbit/s internet connection.
When we load the data without our solution, it takes 49.3 seconds.
[pyiceberg] scan for data took: 2,58ms
[pyiceberg] plan files took: 8187,80ms
[pyiceberg] projection took: 7,15us
[pyiceberg] project_table took: 40453,05ms
[pyiceberg] to polars took: 638,91ms
load took: 49282,80ms
loaded 1.117.921 rows x 140 columns - 1,17GB
When we add our solution, the read takes 4.5 seconds, achieving a 10.9x speedup.
[pyiceberg] scan for data took: 3,03ms
[pyiceberg] plan files took: 903,44ms
[pyiceberg] projection took: 6,91us
[pyiceberg] project_table took: 3033,15ms
[pyiceberg] to polars took: 584,05ms
load took: 4524,14ms
loaded 1.117.921 rows x 140 columns - 1,17GB
History
In the early days of data management, simplicity sufficed—local CSV files were the norm. As datasets grew, this quickly became unmanageable, driving the shift toward cloud storage, distributed processing, and columnar formats optimized for large-scale querying.
Hadoop (2007) marked a turning point, enabling distributed data processing at scale. Later, Parquet (2013) became the go-to columnar storage format, offering efficient compression, encoding, and selective column reads, which drastically reduced storage costs and improved performance.
These innovations laid the groundwork for modern table formats like Iceberg, which further addressed challenges in handling evolving schemas, maintaining performance, and supporting ACID guarantees for consistent, reliable data.
The need for robust data management in environments with high concurrency and massive datasets led to the adoption of ACID principles, which are foundational for ensuring data integrity—especially in scenarios involving multiple writers and readers interacting with shared datasets.
ACID
In distributed systems, ensuring that data operations are consistent, isolated, and durable is crucial to maintaining data integrity, especially in high-concurrency environments. ACID (Atomicity, Consistency, Isolation, Durability) principles form the foundation of modern transactional guarantees.
For the purposes of this article, we’ll focus on Atomicity and Isolation, two key principles that prevent inconsistent reads and incomplete transactions:
- Atomicity ensures that each transaction is treated as an indivisible unit. Either all the steps in the transaction succeed, or none do, preventing partially completed operations from corrupting the data.
- Isolation guarantees that transactions occur independently. Even when multiple transactions are executed concurrently, the result is the same as if they were executed sequentially, ensuring no transaction is exposed to intermediate states of another.
In distributed storage formats like Iceberg, these principles are critical. Iceberg ensures that readers never see partial or uncommitted changes, and that transactions either fully succeed or fully fail, maintaining consistency across even the largest datasets.
Handling Parquet File Overwrites and Concurrent Reads in Distributed Systems
Consider a scenario where your data is stored in Parquet format on S3, and new data is ingested into your data warehouse (DWH) every hour. The DWH uses Hive partitioning for query optimization via partition pruning. To improve write performance, we use multiple writers to handle the incoming data, resulting in files that look like this:
some_partition=1/
file1-<timestamp>.parquet
file2-<timestamp>.parquet
Each writer appends a unique suffix to the file name. This approach works well and delivers good performance.
For querying, we use `duckdb` to run a `scan_parquet` directly on S3. Generally, this performs as expected, but occasionally the results seem incorrect—specifically, fewer rows than expected. Upon re-running the query, the result is accurate. After further investigation, it became clear that the issue occurs when a query reads data during a write operation: DuckDB is accessing partially written data.
A similar problem arises with overwrites. In some cases, we noticed an inflated row count due to the system reading both old and new data files before the deletion of old files completes. As a result, queries running during the write/overwrite process can yield incorrect results.
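To make the setup concrete, here is a minimal sketch of the kind of query involved; the bucket name, prefix, and partition layout are illustrative, and S3 credentials are assumed to be configured in the environment:

```python
import duckdb

# Illustrative query over Hive-partitioned Parquet files on S3.
# Bucket, prefix, and partition names are placeholders.
con = duckdb.connect()
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")
row_count = con.execute(
    """
    SELECT COUNT(*)
    FROM read_parquet('s3://my-bucket/events/some_partition=*/*.parquet',
                      hive_partitioning = true)
    """
).fetchone()[0]
```

A query like this, planned against a partition that is being rewritten at that moment, can see an incomplete set of files, which explains the fluctuating row counts.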
How Iceberg Ensures Data Integrity and Performance
Iceberg addresses two key challenges in modern data handling:
- Enhancing user experience
- Ensuring reliability and performance
On the user experience front, Iceberg offers advanced features like:
- Schema evolution, allowing modifications without rewriting existing data
- Hidden partitioning for improved query performance without the need for manual partitioning
- Evolving partition layouts over time
- Time travel, enabling access to historical table states
For our use case, schema evolution allows us to add new metrics to the DWH without reprocessing or rewriting all existing data. For more information on Iceberg’s user-centric features, refer to the documentation here.
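As a hedged sketch of what this looks like in practice with PyIceberg (the column name is purely illustrative, and `table` is a loaded Iceberg table as in the examples later in this post), adding a new metric is a metadata-only change:

```python
from pyiceberg.types import DoubleType

# Illustrative only: add a new metric column without rewriting existing data.
# Rows in files written before this change simply read as null for the column.
with table.update_schema() as update:
    update.add_column("expected_goals", DoubleType(), doc="per-event xG estimate")
```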
In terms of reliability and performance, Iceberg stands out by enforcing strict serializable isolation. This ensures that all table changes are atomic, and readers never encounter partial or uncommitted data. As a result, issues like reading incomplete data files—common in Parquet and other formats—are eliminated. Readers are guaranteed to see consistent data, even during concurrent write operations.
This means that in contrast to our previous experience with DuckDB and Parquet, Iceberg avoids reading partial or inconsistent data, solving the problem of incorrect query results due to concurrent writes.
For more information on Iceberg’s reliability and performance features, check out the official documentation here.
Understanding Inserts, Updates, and Deletes in Iceberg
To effectively work with Iceberg, it’s important to understand its specification, particularly how it handles data operations like inserts, updates, and deletes.
In Iceberg, individual data files within a table are tracked explicitly, rather than organized into directories. This allows writers to generate data files independently and only add them to the table during an explicit commit.
The table’s state is maintained through metadata files. Each change to the table (e.g., insert, update, or delete) generates a new metadata file, which replaces the previous one in an atomic swap. The metadata file contains critical information, such as the table schema, partitioning configuration, custom properties, and snapshots of the table’s contents. A snapshot represents the table’s state at a specific point in time, encompassing all data files included in that snapshot.
Manifest files track the data files within each snapshot, including partition data and associated metrics. Since manifest files are reused across snapshots, they minimize unnecessary rewrites of metadata. Each snapshot is constructed from multiple manifest files, and Iceberg uses a manifest list file to manage metadata about these manifests (e.g., partition statistics and data file counts). This structure optimizes read operations by skipping unnecessary manifests.
According to the Iceberg specification, all files—including manifest files—are immutable. Once written, they do not change, though they can be deleted during compaction or snapshot expiration.
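To make the chain concrete, here is a small sketch using PyIceberg’s public API (the catalog and table names are the same placeholders used later in this post) that walks from the current snapshot down to its manifests:

```python
# Illustrative: inspect the metadata chain of an Iceberg table.
table = catalog.load_table((database_name, table_name))

snapshot = table.current_snapshot()
print("snapshot id:", snapshot.snapshot_id)
print("manifest list:", snapshot.manifest_list)

# Each manifest tracks a set of data files plus partition-level metrics.
for manifest in snapshot.manifests(table.io):
    print(manifest.manifest_path, "added files:", manifest.added_files_count)
```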
Navigating Remote File Access with PyIceberg and S3
In this section, we’ll explore how PyIceberg handles reading data from a remote filesystem, specifically S3. We’ll walk through key code snippets, explain the flow, and point out how different components come together to load data.
We start with the basic operation of scanning a table stored in Iceberg, querying for a specific match ID, and converting the result to an Arrow table:
```python
table = catalog.load_table((database_name, table_name))
scan = table.scan("match_id = 123213")
df = scan.to_arrow()
```
The `to_arrow()` method plays a critical role in converting the scanned data into an Arrow table. Let’s dive into the details of this method:
```python
def to_arrow(self) -> pa.Table:
    from pyiceberg.io.pyarrow import project_table

    return project_table(
        self.plan_files(),
        self.table_metadata,
        self.io,
        self.row_filter,
        self.projection(),
        case_sensitive=self.case_sensitive,
        limit=self.limit,
    )
```
Here, `project_table()` is the key function that processes the data. It plans which files to read from S3 and applies any filters or projections.
Next, let’s look at how PyIceberg selects the appropriate filesystem to access data. The function below determines the scheme (e.g., S3) from the table’s metadata and initializes the corresponding filesystem:
```python
scheme, netloc, _ = PyArrowFileIO.parse_location(table_metadata.location)
if isinstance(io, PyArrowFileIO):
    fs = io.fs_by_scheme(scheme, netloc)
```
The `fs_by_scheme()` function maps the scheme (like `s3` or `s3a`) to the actual filesystem implementation. Here’s the core of that function:
```python
self.fs_by_scheme: Callable[[str, Optional[str]], FileSystem] = lru_cache(self._initialize_fs)
```
This means it caches the filesystem based on the scheme, optimizing performance for subsequent operations.
Finally, `_initialize_fs()` constructs the S3 filesystem using PyArrow’s `S3FileSystem`, setting up parameters such as access keys, session tokens, and region details:
```python
def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
    if scheme in {"s3", "s3a", "s3n"}:
        from pyarrow.fs import S3FileSystem

        client_kwargs: Dict[str, Any] = {
            "endpoint_override": self.properties.get(S3_ENDPOINT),
            "access_key": self.properties.get(S3_ACCESS_KEY_ID),
            "secret_key": self.properties.get(S3_SECRET_ACCESS_KEY),
            "session_token": self.properties.get(S3_SESSION_TOKEN),
            "region": self.properties.get(S3_REGION),
        }
        return S3FileSystem(**client_kwargs)
```
This ensures that the PyIceberg scan uses the correct S3 client, which is responsible for fetching the data from the S3 buckets.
Using FSSpec as an Alternative
While PyArrow’s `S3FileSystem` is commonly used for accessing remote data, you can also use the `fsspec` library as an alternative. `fsspec` is a versatile library that provides a unified interface to various filesystems, including cloud storage like S3 and even platforms like Dropbox.

By using `fsspec`, you gain additional flexibility and the ability to switch between different backends seamlessly. Learn more about fsspec here.
Implementing S3 Caching with FSSpec and SimpleCacheFileSystem
FSSpec offers a powerful interface for abstracting file system operations. In this case, we can easily swap a local filesystem implementation with an S3-backed one without requiring PyIceberg (or any user) to know where the data originates from. If the FSSpec implementation is correct, it will seamlessly handle requests.
Let’s explore a practical scenario: building a custom FSSpec implementation that first checks for a file in the local filesystem, and if it’s not found, delegates the read operation to S3. Once fetched from S3, the file is stored locally for faster subsequent access.
While this is doable manually, the good news is that FSSpec already includes `SimpleCacheFileSystem`, which handles exactly this kind of caching.
Here’s how it works:
- Initialize the S3 filesystem: First, create an S3 filesystem using the `s3fs` library.

```python
from fsspec.implementations.cached import SimpleCacheFileSystem
import s3fs

s3filesystem = s3fs.S3FileSystem(
    key=S3_ACCESS_KEY_ID,
    secret=S3_SECRET_ACCESS_KEY,
    token=S3_SESSION_TOKEN,
)
```
- Create the cache layer: Now we add a caching layer on top of the S3 filesystem using `SimpleCacheFileSystem`. This layer caches files locally (in `/tmp/file-cache`), so once a file is read, subsequent reads are faster.

```python
fs = SimpleCacheFileSystem(
    fs=s3filesystem,
    cache_storage="/tmp/file-cache",
    check_files=False,
    expiry_time=False,
    cache_check=10,
)
```
- Reading the file: The first read will be slower as the file gets fetched from S3. Once cached locally, future reads are faster since the file comes from the local cache.
```python
with fs.open("s3://bucket/some-file.txt", "r") as fp:
    data = fp.read()
```
This setup allows for efficient access to remote files with local caching, improving performance on repeated reads.
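A quick way to see the effect is to time the same read twice; the sketch below reuses the `fs` object from above, and the exact timings will of course vary:

```python
import time

# The first iteration downloads the object from S3 and fills /tmp/file-cache;
# the second is served from the local cache.
for attempt in (1, 2):
    start = time.perf_counter()
    with fs.open("s3://bucket/some-file.txt", "r") as fp:
        fp.read()
    print(f"read {attempt} took {(time.perf_counter() - start) * 1000:.1f} ms")
```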
Performance Impact of Caching with FSSpec
When we introduced FSSpec caching, one of the immediate questions was: How does this affect performance, especially in systems like DuckDB, which recently added FSSpec support?
There has been a lot of discussion in the community about integrating caching to improve query times. Here are some highlights:
Mim
If I may, what’s really missing today from the DuckDB + Arrow ecosystem is a disk cache, that’s the killer feature, we can worry about query distribution later
(Original Tweet: link)
Koen
With the support of fsspec, it should be possible to route all requests through a local cache or even use Nginx as a proxy. Maybe it’s too pragmatic, but it might work!
(Original Tweet: link)
Koen
Simple example of how to add a caching layer to DuckDB queries using FSSpec.
First run took: 16892.7ms
Second run took: 341.2ms
(Original Tweet: link)
If you’re interested in a working example, you can find one here.
These examples illustrate the significant performance boost that caching with FSSpec can bring, reducing query times by orders of magnitude once data has been cached locally.
Optimizing Caching for Iceberg: Handling Immutable Data
To optimize caching for Iceberg, the primary objective is to minimize network traffic. This leads us to two key simplifications:
- We don’t need to check if a file has changed.
- We don’t need to check if a file still exists.
When new data is written in Iceberg, several steps occur:
- New Parquet files are written.
- A manifest file is created, followed by a manifest list.
- A new metadata file is written.
- The Iceberg catalog is updated to point to the new metadata file.
Since files in Iceberg are immutable, once written, they are never changed. For example, if we add random Parquet files, they will be ignored unless they are listed in the manifest. Even when old data is overwritten, Iceberg adds new delete files, but the underlying files remain unchanged.
When an “optimize” operation is run, files that should no longer be read are simply excluded from the new manifest list, turning them into orphan files. These files exist, but Iceberg doesn’t reference them anymore.
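A rough way to see which files are still “live” is to look at what scan planning returns; in the sketch below (assuming `table` is a loaded Iceberg table as before), any object in the bucket that is not in this set is an orphan from the table’s point of view:

```python
# Illustrative: collect the data files referenced by the current snapshot.
referenced_files = {task.file.file_path for task in table.scan().plan_files()}
print(f"{len(referenced_files)} data files referenced by the current snapshot")
```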
Adding FSSpec support to PyIceberg was a bit of a journey, requiring deep exploration into how PyIceberg handles IO operations. Initially, we started by overwriting `table.io` to customize the IO implementation. This approach worked, but we quickly realized it was not the most elegant solution.
Here’s where the problem lies: by default, PyIceberg picks the IO handler based on the scheme of the data’s location (e.g., `s3://` for S3 or `file://` for local storage). While this works fine for general use cases, performance was still slow because of how Iceberg interacts with S3. Before each file read, the cache layer was making unnecessary `HeadObject` requests to check if the file exists, which is both slow and costly.
Since Iceberg uses a manifest file to list all the files required for a particular query, there’s no need to check if a file exists on S3—if it’s listed in the manifest, we know we need to read it. To solve this, we needed to cache the `isfile` operation. Instead of querying S3 directly, we created a custom cache filesystem that checks the local cache first before hitting S3.
Here’s how we addressed this:
- Custom cache for `isfile`: We built a custom filesystem that delegates the `isfile` check to the local filesystem first, significantly reducing unnecessary and expensive network requests to S3.
- Setting `scan.io`: Initially, we tried setting `scan.io` to force PyIceberg to use our custom FSSpec implementation. While this worked, it wasn’t the cleanest solution.
- Using `py-io-impl`: After further investigation and reviewing the documentation, we found a much cleaner approach: using the `py-io-impl` configuration option to set the IO handler. This allows us to cleanly integrate our custom FSSpec filesystem without directly overwriting `table.io`.
your_module.py:
```python
import pyiceberg.io
import s3fs
from fsspec.implementations.cached import SimpleCacheFileSystem
from pyarrow.fs import FSSpecHandler, PyFileSystem
from pyiceberg.io.fsspec import FsspecInputFile
from pyiceberg.io.pyarrow import PyArrowFileIO


class AggresiveSimpleCacheFileSystem(SimpleCacheFileSystem):
    """
    Very similar to the original SimpleCacheFileSystem, but the `isfile` check
    will first check the local cache for existence of a file. When it's there,
    don't bother checking the origin. This can cause files to exist in the local
    cache that are not available at the origin.

    Iceberg files are always immutable. An overwrite of data happens by writing
    a new manifest file, and pointing the AWS Glue Table to this new file.
    We never cache the AWS Glue Table info. This way we always have a fresh
    pointer to the correct manifest file, which may, or may not be cached.
    """

    def isfile(self, path):
        exists = self._check_file(path) is not None
        if not exists:
            return super().isfile(path)
        else:
            return True


class CachingPyArrowFileIO(PyArrowFileIO):
    _pyarrow_fs = None
    _fsspec_fs = None

    def new_input(self, location):
        return FsspecInputFile(location=location, fs=self._fsspec_fs)

    def __init__(self, properties):
        super().__init__(properties)
        cache_storage = properties["cache_storage"]
        # Keep a reference to the default (lru_cache'd) resolver so non-S3
        # schemes still work, then route lookups through our own resolver.
        self._default_fs_by_scheme = self.fs_by_scheme
        self.fs_by_scheme = self._fs_by_scheme
        fs = s3fs.S3FileSystem(
            key=properties.get(pyiceberg.io.S3_ACCESS_KEY_ID),
            secret=properties.get(pyiceberg.io.S3_SECRET_ACCESS_KEY),
            token=properties.get(pyiceberg.io.S3_SESSION_TOKEN),
        )
        self._fsspec_fs = AggresiveSimpleCacheFileSystem(
            fs=fs,
            cache_storage=cache_storage,
            check_files=False,
            expiry_time=False,
            cache_check=10,  # default, might want to change this some day.
        )
        self._pyarrow_fs = PyFileSystem(FSSpecHandler(self._fsspec_fs))

    def _fs_by_scheme(self, scheme, netloc):
        if scheme == "s3":
            return self._pyarrow_fs
        return self._default_fs_by_scheme(scheme, netloc)
```
Here’s how you can configure it:
```python
from pyiceberg.catalog import load_catalog

properties = {
    "py-io-impl": "your_module.CachingPyArrowFileIO",
    "cache_storage": "/tmp/cache-dir",
}
catalog = load_catalog(catalog_name, type=catalog_type, **properties)
```
With this approach, we reduced read times from 50 seconds to just 4 seconds by eliminating unnecessary checks and leveraging efficient caching with FSSpec.
Addressing the CPU Bottleneck After Network Optimization
Once the network bottleneck is mitigated by adding a caching layer, the next challenge in optimizing PyIceberg performance is CPU utilization.
There are three key steps PyIceberg performs when reading data:
- Determine which files need to be read.
- Apply positional delete files to the data.
- Convert Parquet files into tables.
While PyIceberg uses multithreading to parallelize these steps, Python’s Global Interpreter Lock (GIL) limits multithreading’s effectiveness for CPU-bound tasks. In this case, IO operations benefit from multithreading, but once the IO bottleneck is reduced, the next hurdle is leveraging multiple cores for CPU-bound tasks.
To achieve better CPU utilization, multiprocessing is required. Here’s how you can replace parts of the code with multiprocessing using `ProcessPoolExecutor`:
```python
from concurrent.futures import ProcessPoolExecutor

# Example of how to use ProcessPoolExecutor.
# process_data_chunk and data_chunks are placeholders for the actual work.
with ProcessPoolExecutor() as executor:
    results = executor.map(process_data_chunk, data_chunks)
```
In PyIceberg, this was applied using the `ProcessPoolExecutor` in the `ExecutorFactory`, but it broke some parts of the code due to serialization issues. These parts need to be rewritten to ensure the data structures can be shared between processes.
This multiprocessing approach could lead to sub-second read performance, and it remains a potential area for future improvements.
Conclusion: Optimizing Iceberg for High-Performance Reads
Iceberg is a powerful solution for bridging the gap between traditional databases and scalable, distributed storage systems. By adding caching with FSSpec and optimizing network traffic, we’ve shown that read times can be drastically reduced from 50 seconds to just 4 seconds.
However, after addressing the network bottleneck, the next frontier for optimization lies in CPU-bound tasks. Future work could focus on replacing key components with multiprocessing to fully leverage multi-core systems, potentially enabling sub-second read performance.
While Iceberg already provides a robust framework for scalable data, continued optimization through better handling of IO, CPU parallelism, and caching will allow for even greater efficiency in large-scale data environments.