Update: for the overview of table transfer protocols, see the following article.
The problem of OLAP data interoperability
The field of OLAP, columnar, time-series, search, and vector databases is burgeoning. There are a lot of serious projects under active development that deliver state-of-the-art performance in different use cases. These include AlloyDB, BigQuery, CedarDB, ClickHouse, CosmosDB, Databend, Apache Doris, Apache Druid, Firebolt, GreptimeDB, Hopsworks, InfluxDB, LanceDB, OpenObserve, Oxla, Apache Paimon, Apache Pinot, QuestDB, Redshift, RisingWave, SingleStore, Snowflake, StarRocks, Synapse Analytics, TimescaleDB, Quickwit, Vastdata, VictoriaMetrics, and many more.
Lots of innovation and diversification are also happening on the side of processing and ML engines. Apart from the usual suspects: Apache Spark, Apache Flink, Apache Hive, and Trino, there are Azure ML, Bodo, cuDF, Dask, Apache DataFusion, Dremio, DuckDB/MotherDuck, Timeplus Proton, Polars, Ray, SageMaker, Theseus, Velox, Vertex AI, and others.
Many OLAP databases can also act as processing engines, which is especially useful for cross-database joins. BigQuery, Redshift, Azure Synapse, Snowflake, ClickHouse, Databend, Doris, StarRocks, and others can do this.
The huge diversity on both the database and processing engine sides presents the classical M × N interoperability problem.
Established processing engines such as Spark and Trino have spent a lot of effort building efficient bespoke integrations with many popular OLAP databases: e.g., see the list of Trino connectors. But even Spark and Trino don’t cover the “long tail” of columnar databases and time-series stores. When the developers of new databases want to make their tables efficiently accessible from many different processing engines, they have to write a lot of custom integrations. Similarly, new processing engines are less appealing if they can efficiently read the data from (and write to) only a few databases.
In this post, I propose a solution to this interoperability problem: a family of table transfer protocols, consisting of the Table Read protocol for querying, the Table Write protocol for data ingestion, and a table replication method built on top of the Table Read and Write protocols.
Existing solutions are not enough
ConnectorX, ADBC, Arrow Dissociated IPC
ConnectorX and ADBC can take advantage of data partitioning and distribution on the database side, but not on the processing engine’s side, that is, when the processing engine consists of multiple nodes. So, they work well for single-process dataframe engines, such as Pandas and Polars, but not for distributed processing in Spark, Trino, etc.
Arrow Dissociated IPC is a point-to-point transfer protocol that can “accelerate” ADBC but doesn’t add client-side distribution.
Using ADBC with Arrow Flight does provide processing engine-side distribution semantics. However, Arrow Flight has other shortcomings that I discuss in the section “Arrow Flight protocol” below.
Open table formats: Iceberg, Hudi, Delta Lake
Another solution that has gained popularity recently is making the OLAP databases store data in an object storage-based table catalog, based on either Apache Iceberg, Apache Hudi, or Delta Lake table formats. Then, processing engines can read the table data from the object storage (or distributed file system) in these standardised table formats.
However, as I explained in the previous article, object storage-based table format design fundamentally limits the performance and efficiency of queries. This is especially relevant in OLAP use cases with high query volumes, the need for low query latency, or real-time data updates.
Iceberg, Hudi, and Delta Lake also restrict the data partition file formats to either Apache Parquet, ORC, or Avro. This stifles innovation in columnar data file layouts.
BigQuery Storage APIs
BigQuery Storage APIs (Read API and Write API) have been designed by Google exactly for interoperability between multiple underlying table storage formats, and thus they abstract from the storage details. Storage access efficiency and scalability were also essential design criteria1 for these APIs.
Here’s a nice picture by Google showing BigQuery (BigLake) Storage APIs’ role as the interoperability layer:
Thus, on the purely technical level, BigQuery Storage APIs already mostly fit the bill of table transfer protocols that I’m proposing.
However, there is a huge non-technical issue with BigQuery Storage APIs: Google completely controls their development. This makes it very unlikely that both databases and processing engines will adopt the BigQuery Storage APIs, especially considering that these databases and processing engines usually compete directly with BigQuery and Google’s Vertex AI engine.
Also, BigQuery Storage APIs are too specialised for BigQuery’s managed storage and cloud environments. For example, these APIs are not optimised for direct interaction with storage nodes, but only for access through proxies that most open-source databases don’t even have in their designs. This point is discussed further in the section “Table Read protocol overview and comparison with Arrow Flight” below.
Arrow Flight protocol
Arrow Flight consists of two protocol layers: lower-level Arrow Flight RPC and higher-level Arrow Flight SQL. Arrow Flight RPC layer is responsible for the distribution of data transfer among possibly both database (storage) nodes and client (processing engine) nodes.
Arrow Flight can be combined with Arrow Dissociated IPC for accelerator-aware point-to-point data transfer.
Arrow Flight is a subproject of Apache Arrow. Thus, Arrow Flight’s development and governance are open, unlike the development and governance of BigQuery Storage APIs.
However, there are also a few very unfortunate limitations in Arrow Flight.
First, Arrow Flight SQL defines distribution only on the read (query) path. The write path (data ingestion) connects a single writer node and a single database (storage) node. Write distribution is needed when the processing engine wants to write the results of a distributed job back into the database. If the storage is based on open table formats (Iceberg, Hudi, or Delta Lake), the table catalog can organise parallelised writing, bypassing Arrow Flight. However, most distributed OLAP databases don’t want to use open-format tables as their primary storage because this would limit their query performance and efficiency, as I noted above.
Second, Arrow Flight always transfers columnar data in Arrow columnar format. This inflates the network IO, CPU, and memory usage of storage nodes if all that is needed from these nodes is to read highly compressed column data from disks and send this data to the processing engine or to replication workers, without any filtering or transformation.
This network IO inflation makes Arrow Flight rather inefficient in some of the use cases of open table formats: cross-cloud backup/replication and large-scale JOINs where no pre-aggregation or row-level filter can be pushed down to the level of storage nodes. This is unfortunate because it means that even if the data team is happy with their OLAP database’s performance in user-facing queries, and even if this database supports Arrow Flight access, there are lingering use cases that are not supported well and can make the data team second-guess their choice and perhaps even switch to open table formats, sacrificing the performance and cost efficiency gains.
This adds to the gravity of open table formats like Iceberg and prevents diverse OLAP and time-series databases from rejoicing in the diverse ecosystem of processing engines without at least the very cumbersome and wasteful always-on, two-way sync between database’s native storage formats and metadata management, and open table formats.
Arrow Flight also misses some interesting possibilities for improving extensibility, resilience, data availability, and load distribution. See the section “Table Read protocol overview and comparison with Arrow Flight” below for elaboration.
Considering all these factors in aggregate, and that Arrow Flight is still implemented by very few databases2, I think there is a strong case for creating a new set of table transfer protocols, drawing the ideas and learnings from
Open table formats: Iceberg, Hudi, and Delta Lake, and the catalogs that implement authentication, authorization, and replication for these table formats,
Distributed OLAP databases and data warehouses: ClickHouse, Druid, Pinot, Doris, StarRocks, BigQuery, Snowflake, Redshift, and others,
Distributed storage systems that achieve certain properties with the help of “fat clients”, such as Facebook’s Tectonic and OK.ru’s S3-compatible storage,
BigQuery Storage API,
Arrow Flight and Arrow Dissociated IPC,
“Computational storage” in Memoria,
RSocket: a protocol providing Reactive Streams semantics,
Accelerator-aware async data transfer protocols: PyTorch RPC and Mercury,
Distributed metadata management: Oxia,
and other systems.
Table transfer protocols
First, a disclaimer: in this and the following articles, I don’t aim to describe the table transfer protocols in complete detail. If such protocols are to be developed, they should be designed openly with inputs from many people with diverse perspectives and expertise. So, the descriptions below are sometimes not very precise.
My two primary goals with these protocol descriptions are to demonstrate that:
Improvements too significant to dismiss are possible over the current design of Arrow Flight.
A single set of protocols can cover all functions of open table formats like Iceberg, for most data teams, including atomic distributed table writes and replication.
However, I describe only Table Read protocol below to reduce the size of this article (already way too long). I describe Table Write protocol and a table replication method in the following article.
Basic principles
In the protocol design that I present below, I tried to follow three main principles:
Progressive: there are simpler and more advanced versions of distribution and work splitting, columnar data encoding, transport, and other aspects that different sides of the protocol interaction can support. The sides negotiate using the most suitable and efficient methods that they both support.
Cooperative: reliability and resilience, speed and resource efficiency, trust and security can be achieved most effectively when all nodes participating in the protocol interaction on both sides act cooperatively.
Not opinionated: the protocol doesn’t specifically favour
Cloud or on-premise setups,
Disaggregated object storage or disk-based storage,
Single node, distributed, or serverless databases and processing engines (clients),
IO-bound or compute-bound processing patterns,
Any specific query language or API: SQL, Spark APIs, dataframe APIs, etc.
Cross-cutting and out-of-scope concerns
I omit the discussion of authentication, access authorization, security, and proxying aspects in the protocol descriptions below.
Authentication and encryption could be added straightforwardly with proven mechanisms already used in Arrow Flight, BigQuery Storage APIs, and many other protocols.
I think access authorization is better left to higher-level systems that build on top of table transfer protocols:
SQL querying and transaction (BEGIN..COMMIT) facades
Table replication systems
Data governance proxies/facades such as Unity Catalog
Semantic layers such as Rill, Hashboard, Cube, etc.
Feature stores, ML training and inference systems
Consequently, the primary concerns of these and other similar systems are out of scope for table transfer protocols.
Note the difference from Arrow Flight SQL, which covers SQL queries and transactions. Thus, the table transfer protocols described below are lower-level than Arrow Flight SQL.
At the same time, table transfer protocols are higher-level than Arrow Flight RPC. Arrow Flight RPC is oblivious to table-level processing logic such as projections, filtering, aggregations, table read consistency, and table data partitioning.
For other differences from Arrow Flight, see the section “Table Read protocol overview and comparison with Arrow Flight” below.
Node roles
I’ll call a Read job the entire distributed Table Read protocol interaction between all node roles.
Within this article, I use the following terms for node roles participating in the Read job:
Agent is responsible for orchestrating the Read job on the client side, i.e., the side of the processing engine or other table data consumer. This role can be played by Spark Master, for example.
Coordinator is responsible for orchestrating the Read job on the server side (which I also call the database side interchangeably), i.e., the side of a database, a storage system, or other table data producer.
In databases with query broker and data server separation, such as Apache Doris, Apache Druid, or Apache Pinot, Coordinator’s role can be played by the query broker node/process (called Frontend/FE in Doris, and Broker in Druid and Pinot) that serves the Read job. In databases without such separation, such as ClickHouse, Coordinator’s role is played by any database node.
Clients are the workers who request and consume some portion of the requested table data (this portion is determined by Agent). This role can be played by Spark’s Executors, for example. If the client side is a single process, such as a local dataframing library, Client can be implemented by a bunch of threads within the same process with Agent’s logic.
Servers (identified by locations) are the nodes or serverless data access processes that serve portions of table data to Clients and optionally do some storage-side processing of the data, such as row-level filtering or aggregation.
This role can be played by stateful database processes such as ClickHouse or Doris’s Backend node, or serverless functions that read table data from disaggregated table storage, such as LanceDB or MotherDuck.
Also, the processing engine’s workers can play the role of Servers for subsequent stages in the multi-stage processing plan, or in Table Write protocol interactions. I will discuss this in more detail in the next article.
Base RPC and session layers
All protocol steps can be layered on top of gRPC with Protobuf or FlatBuffers encoding, although this is not significant for any of the protocol design aspects, and is merely a “default” choice that is already used by Arrow Flight protocol and BigQuery Storage APIs.
The exception is accelerator-aware column streaming: see steps 11-… of Table Read protocol, described in the section “Streaming column data from Server to Client”.
Table Read protocol walkthrough
The initial request and Coordinator’s logic
1. Agent → Coordinator:
  table processing plan: Substrait, partitioning hints
2. Agent ← Coordinator:
  [ slice: (filter, processing plan, [partition,]), ]
  // partition’s type:
  (filter, [ location: (URI, serving hints), ])
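To make the shape of these two messages concrete, here is a minimal Python sketch of the step #1 request and the step #2 response. All class and field names are hypothetical (the protocol description above doesn’t fix them), and Substrait plans and filters are kept as opaque bytes rather than parsed.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Location:
    uri: str  # where a Server that can serve the partition is reachable
    serving_hints: Dict[str, object] = field(default_factory=dict)


@dataclass
class Partition:
    filter: bytes  # Substrait-encoded boolean expression (opaque here)
    locations: List[Location]


@dataclass
class Slice:
    filter: bytes
    processing_plan: bytes  # Substrait-encoded slice processing plan
    partitions: List[Partition]


@dataclass
class ReadJobRequest:  # step 1: Agent -> Coordinator
    table_processing_plan: bytes
    partitioning_hints: Dict[str, object] = field(default_factory=dict)


@dataclass
class ReadJobResponse:  # step 2: Agent <- Coordinator
    slices: List[Slice]


# Placeholder payloads: real messages would carry serialized Substrait.
request = ReadJobRequest(table_processing_plan=b"<substrait plan>")
response = ReadJobResponse(
    slices=[
        Slice(
            filter=b"<slice filter>",
            processing_plan=b"<slice plan>",
            partitions=[
                Partition(
                    filter=b"<partition filter>",
                    locations=[Location("grpc://server-a"), Location("grpc://server-b")],
                )
            ],
        )
    ]
)
```

The nesting mirrors the response above: slices contain partitions, and each partition lists the locations that can serve it.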
Table processing plan
The table processing plan (encoded in the Substrait format) is an arbitrary processing plan with filters, projections, grouping and window aggregations on a single table or relation. The relation doesn’t need to be materialised on the server side before the beginning of the Read job: in fact, it could be the output relation of a multi-table join that the server side performs concurrently with serving the output to the client. Alternatively, if Agent (such as Spark Master) specifically wants to do a heavy join on the client (Spark’s) side, it can start two Read jobs in parallel and then plan them together. So, the requirement for the processing plan to be single-table doesn’t principally constrain the applicability of the Table Read protocol, but limits its scope and simplifies implementations.
Agent doesn’t need to know beforehand whether the server side supports the aggregation function or other pieces of the submitted plan. If the server side can’t do some parts of the plan, Coordinator can push the corresponding parts of the table processing plan back to the client side, retaining only what it can perform on Servers, as described below. This permits very “thin” server sides, such as Parquet (ORC, Lance) format-aware object storage nodes to participate in Table Read protocol, assuming that the client is a processing engine whose Clients are ready to take up the table processing plan, potentially involving distributed row shuffle between Clients.
This “partial processing plan push-back” is included in the scope of the protocol, rather than, for example, Coordinator simply responding with an error to plans that the server side can’t execute because the whole plan can inform the slice and partition breakdown that Coordinator returns to Agent.
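The push-back can be pictured as splitting the plan at the first step that Servers can’t execute. Below is a minimal sketch, assuming (for illustration only) that the plan is a linear list of named operations rather than a Substrait tree. The function `split_plan` and the operation names are hypothetical.

```python
from typing import List, Set, Tuple


def split_plan(plan: List[str], server_supported: Set[str]) -> Tuple[List[str], List[str]]:
    """Split a linear plan into (server_part, client_part).

    The server part is the longest prefix of operations that Servers support;
    everything from the first unsupported operation onwards is pushed back.
    """
    for i, op in enumerate(plan):
        if op not in server_supported:
            return plan[:i], plan[i:]
    return plan, []


plan = ["filter", "project", "approx_percentile", "sort"]

# A Server that doesn't know the aggregation function keeps only the prefix.
server_part, client_part = split_plan(plan, {"filter", "project", "sort"})

# A "thin" Server that supports nothing pushes the whole plan back.
thin_server_part, thin_client_part = split_plan(plan, set())
```

Note that `sort` stays on the client side in the first example even though Servers support it, because it comes after an operation that had to be pushed back.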
Slices and partitions
Coordinator returns to Agent a set of slices. Slices are minimal (i.e., most granular) groups of partitions, such that completing the table processing plan on the client side doesn’t require any data exchange between different slices. If Agent goes on to task one Client with consuming and processing data of each slice, these Clients (each dedicated to their slice) won’t need to exchange any data with each other to compute a subset of the results of the table processing plan. However, if Agent cannot create a Client large enough to process all of the slice’s data, Agent will need to schedule intermediate shuffling or data movement stages. See the section “Agent’s logic” below for further discussion.
Slices are related to data distribution on the client side. Partitions are related to data distribution on the server side of completing the Read job.
If Coordinator hasn’t pushed any processing logic back to the client side, Coordinator must return each partition wrapped into its own slice (1-1 correspondence), because partitions will already contain the results of the table processing plan.
In the context of this protocol description, slices and partitions are logical and are defined by their filters: boolean-valued functions encoded in Substrait. Partition may not coincide with the notion of data partition that may be used within the database, such as a physical file or section containing a portion of the table data. Some databases also use the term data segment for the latter; below, I’ll also call them “data segments” to prevent confusion with “partitions”, as I use “partitions” in a different meaning here.
An informal expectation set by the protocol is that the server side only returns partitions that Servers can effectively fetch without touching (most of) the rows that don’t match the partition filter. This can be achieved by checking the partition filter against the data segment’s metadata, or by using indexes on the filter columns if Servers have them (or if the data segment’s rows are sorted by these columns, which traditional databases call a “clustered index”). However, partitions could be more granular than data segments (one segment corresponds to multiple partitions), less granular (one partition includes multiple segments), or a mixture of these: more granular along some dimensions and less granular along others.
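As an illustration of checking a partition filter against data segment metadata, here is a minimal sketch that assumes each segment keeps min/max statistics for a timestamp column and that the partition filter is a half-open range on that column. `SegmentStats` and `segments_for_partition` are hypothetical names, not part of any database’s API.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SegmentStats:
    name: str
    min_ts: int  # smallest timestamp stored in the segment
    max_ts: int  # largest timestamp stored in the segment (inclusive)


def segments_for_partition(segments: List[SegmentStats], lo: int, hi: int) -> List[str]:
    """Names of segments that may contain rows with lo <= ts < hi.

    Segments whose [min_ts, max_ts] range doesn't overlap the filter range
    are skipped without reading any of their rows.
    """
    return [s.name for s in segments if s.max_ts >= lo and s.min_ts < hi]


segments = [
    SegmentStats("seg-a", 0, 99),
    SegmentStats("seg-b", 100, 199),
    SegmentStats("seg-c", 200, 299),
]
hits = segments_for_partition(segments, lo=150, hi=250)
```

Here the partition spans parts of two segments, which is the “mixture” case: the partition is less granular than a segment in one respect and more granular in another.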
Neither Agent nor Clients have to be able to execute the partition filter in its entirety, for example, if the database uses a proprietary hash function for data partitioning, or if it uses a partition key that doesn’t end up as a result column. Still, filters have to be encoded in Substrait so that Agent can extract the remaining partitioning information from them to inform the workload distribution among Clients.
Along with each slice filter, Coordinator returns to Agent the slice processing plan that Servers could still apply to the rows in this slice on their side, before returning the results to Clients3. The slice processing plan could be a simpler part of the table processing plan, potentially all the way to simple fetch of the input table’s columns and returning them to Clients without any processing on the server side. For consistency, each slice processing plan also includes the corresponding slice filter.
The reasons why Coordinator may push processing back to Clients for the slice could be:
None of Servers that host the slice’s data can perform the table processing plan in full, e.g. if Servers don’t know how to apply the specified aggregation, or
The table processing plan has a window or a group-by aggregation that demands that rows should be moved between Servers, and the database doesn’t want to do any distributed processing for this Read job and prefers to push it back to the client side that is specialised in distributed processing.4
For example, if Agent sends to Coordinator the table processing plan that includes a group-by on a column that is not part of the database’s distribution/partitioning key for the table, Coordinator can return a single slice whose filter is taken from the table processing plan, and the slice processing plan which is a simple fetch of the input table’s columns. In other words, Coordinator has pushed all processing apart from the predicate pushdown back to the client side.
Note that the processing push-back may not be uniform across all slices. The partitioning strategy for the table in the database may differ for recent data and older data segments. This may affect the capacity of the Servers that host particular partitions to perform the table processing plan on their side.
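The rules above (each partition in its own slice when nothing is pushed back, and a single slice when a pushed-back group-by needs rows from many partitions) can be sketched as follows. This is a deliberate simplification under my own assumptions: it treats the plan as having at most one group-by and considers the group-by “local” only when the table’s distribution key is a subset of the grouping columns. The function name is hypothetical.

```python
from typing import List, Sequence


def group_into_slices(
    partitions: List[str],
    group_by: Sequence[str],
    distribution_key: Sequence[str],
) -> List[List[str]]:
    """Return slices as lists of partition names."""
    # Rows with equal grouping values are co-located on the server side
    # only if equal grouping values imply equal distribution key values.
    no_exchange_needed = (not group_by) or (
        bool(distribution_key) and set(distribution_key) <= set(group_by)
    )
    if no_exchange_needed:
        # Nothing to push back: 1-1 correspondence of partitions and slices.
        return [[p] for p in partitions]
    # Group-by is pushed back: rows must be shuffled across all partitions.
    return [list(partitions)]


partitions = ["p1", "p2", "p3"]
local = group_into_slices(partitions, group_by=["user_id"], distribution_key=["user_id"])
pushed_back = group_into_slices(partitions, group_by=["country"], distribution_key=["user_id"])
```

A real Coordinator would make this decision per slice, since the push-back may not be uniform across the table.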
Slice and partition breakdown
Filters of all returned slices must logically add up (i.e., if OR’ed together) to the table processing plan’s filter, and the filters of all partitions within a slice must logically add up to their parent slice’s filter.5
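For intuition, this “adding up” requirement can be checked mechanically when filters are simple ranges. The sketch below assumes every filter is a half-open integer interval on one column (real Substrait filters are arbitrary boolean expressions, so this is only an illustration), and it checks coverage only, not whether the child filters are disjoint.

```python
from typing import List, Tuple

Interval = Tuple[int, int]  # half-open range [lo, hi)


def covers(parent: Interval, children: List[Interval]) -> bool:
    """True if the union (logical OR) of children equals the parent range."""
    if not children:
        return False
    position = parent[0]  # everything in [parent[0], position) is covered so far
    for lo, hi in sorted(children):
        if lo < parent[0] or hi > parent[1]:
            return False  # a child selects rows outside the parent filter
        if lo > position:
            return False  # a gap: some rows of the parent are not selected
        position = max(position, hi)
    return position == parent[1]


slice_filter = (0, 300)
good = covers(slice_filter, [(0, 100), (100, 200), (200, 300)])
with_gap = covers(slice_filter, [(0, 100), (150, 300)])
```

The same check applies one level up: slice filters against the table processing plan’s filter.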
Partition filters can include conditions that determine data distribution/segmentation on the server side, as well as conditions on sort columns or columns with range access indexes within individual data segments. Since Agent may not know the database’s internal data partitioning scheme for the table, let alone the indexing strategy, Agent doesn’t know a priori how many slices and partitions Coordinator will return, and what extra conditions Coordinator will include in slice and partition filters.
The slice and partition breakdown returned by Coordinator is partially determined by the distribution of data segments on Servers6: Coordinator cannot return fewer partitions (and, therefore, slices, because every partition can belong only to a single slice) than is necessary to reflect the differences in the lists of Server locations that host different partitions (see the section “Partition locations” below). For example, if one data segment is stored only on Server A and another segment is stored only on Server B, Coordinator cannot return fewer than two partitions in total. Meanwhile, it depends on the semantics of the table processing plan whether Coordinator will put these two partitions within a single slice (if the execution of the remaining parts of the table processing plan will require moving around partition rows), or can put each partition into a separate slice.
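The lower bound on the number of partitions can be illustrated by grouping data segments by the exact set of Servers that host them: each distinct set needs at least one partition. This is a minimal sketch with hypothetical names and a made-up segment placement.

```python
from collections import defaultdict
from typing import Dict, FrozenSet, List


def group_segments_by_hosts(segment_hosts: Dict[str, List[str]]) -> Dict[FrozenSet[str], List[str]]:
    """Group segments by the set of Server locations that host them."""
    groups: Dict[FrozenSet[str], List[str]] = defaultdict(list)
    for segment, hosts in segment_hosts.items():
        groups[frozenset(hosts)].append(segment)
    return dict(groups)


segment_hosts = {
    "seg-1": ["A"],
    "seg-2": ["B"],
    "seg-3": ["A", "C"],
    "seg-4": ["C", "A"],  # same host set as seg-3, so they may share a partition
}
groups = group_segments_by_hosts(segment_hosts)
min_partition_count = len(groups)
```

Whether these partitions then end up in one slice or in separate slices is a separate question that depends on the table processing plan, as described above.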
The above consideration may still under-determine the slice and partition breakdown. Coordinator may have latitude in augmenting the slice and partition filters with different columns from the database’s partitioning key for the table, or with different columns that are indexed on the data segment level.
If Coordinator has pushed back to the client side a part of the table processing plan that includes a time-based window aggregation or a grouping aggregation, and the time or grouping columns can be included in the partition filters, Coordinator must do this in order to help the client side reduce fan-in/out factor of the ensuing shuffle or aggregation.
Further, partition sizing can depend on some client-side factors:
The maximum number of Clients that can consume data in this Read job,
Whether these Clients all consume the same data (such as, if the client executes a broadcast join with the table consumed in this Read job) or not, and
The priority and latency requirements of this Read job: such as, whether this Read job is a piece of an interactive user-facing query or a low-priority background job.
Partition sizing can also depend on server-side factors, such as the priority that the database assigns to serving this client, or a policy that permits this client to consume only a small portion of the database’s network and disk IOPS and bandwidth, so that the latency of the main query load and data ingestion traffic in the database is unaffected by this Read job.
The main tradeoff in partition sizing is the following: more IOPS and processing overhead on the server side (bad for the server side, if it minds this) could enable better saturation of the bisection bandwidth between Servers and Clients, or more fine-grained or favourable distribution of workload on the client side.
Designing a generic algorithm or protocol for negotiating all these factors between Agent and Coordinator is not the goal of this article. I’ve called partitioning hints whatever information Agent will send to Coordinator in this negotiation at step #1.
Perhaps it suffices to say here that there exists a baseline heuristic that doesn’t require any negotiation and should work at least as well as the read path with open table formats (Iceberg, Hudi, or Delta Lake): Coordinator can size the total number of partitions that it returns to Agent to approximately match the number of data segments that pass the table processing plan’s filter. These partitions could correspond to data segments one-to-one, or group together approximately N data segments, where approximately 1/Nth of each data segment’s rows are selected. As was noted above, this “alternative partitioning” could be motivated by the grouping or time-based aggregation in the table processing plan.
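Both variants of this baseline heuristic produce roughly as many partitions as there are matching data segments. Here is a minimal sketch of the two, where a partition is a list of (segment, row predicate) pairs. Selecting 1/Nth of the rows by a hash bucket of the grouping column is purely my assumption for illustration. It presumes Servers can pick out such a bucket cheaply, and all names are hypothetical.

```python
from typing import List, Tuple

SegmentSelection = Tuple[str, str]  # (data segment name, row predicate)


def one_to_one_partitions(segments: List[str]) -> List[List[SegmentSelection]]:
    """Each partition is exactly one whole data segment."""
    return [[(segment, "true")] for segment in segments]


def grouped_partitions(segments: List[str], n: int, key: str) -> List[List[SegmentSelection]]:
    """Each partition takes about 1/n of the rows from each of about n segments."""
    partitions = []
    for start in range(0, len(segments), n):
        group = segments[start:start + n]
        for bucket in range(len(group)):
            predicate = f"hash({key}) % {len(group)} == {bucket}"
            partitions.append([(segment, predicate) for segment in group])
    return partitions


segments = ["seg-1", "seg-2", "seg-3", "seg-4", "seg-5"]
baseline = one_to_one_partitions(segments)
alternative = grouped_partitions(segments, n=2, key="user_id")
```

In the grouped variant, each partition already contains all rows for its share of `user_id` values within its group of segments, which reduces the fan-in of a later aggregation on the client side.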
Partition locations
For every partition, Coordinator returns not a single Server location, but several locations that can serve that partition. This is possible if the database’s serving replication factor is greater than one.
Returning several locations enables Agent to optimise the physical placement of Clients and load distribution between Servers. Coordinator does not know how to optimally distribute the load between Servers yet because Coordinator doesn’t know the details of the overall distributed job that Agent is doing (the Read job that is the subject of Table Read protocol is a part of this overall job, but perhaps not the only one). In turn, these details may not be determined by Agent before it receives the partition breakdown and Server locations from Coordinator, so Agent cannot pass all the necessary information to Coordinator at step #1. See the section “Agent’s logic” below for further discussion.
Coordinator returns serving hints alongside each location. Serving hints could be specific to each unique (partition, Server location) pair, not just to the location alone. Serving hints could include:
Performance priority hint can define the order among locations that Coordinator deems optimal. This may be informed by
The data sorting and indexing may be different in different locations and thus more or less conducive to their parent slice processing plan (and the expected additions to these plans with grouping columns at step #3).
The extra costs: a location could point to a serverless function (e.g., AWS Lambda) that will pull the partition data from the object storage (e.g., S3). The interaction with this location will cost extra money for both serverless runtime and object storage access. Still, this could be a viable fall-back location if the “primary” Servers fail, or cannot sustain the Read job’s bursty bandwidth requirements.
The maximum bandwidth limit that the Server will cap this client or this Read job with. As noted above, the database may intentionally limit the IO resources that can be consumed by demands from external clients.
Current load of the Server, if known to Coordinator.
Server-side processing behaviour: whether the Server always accepts execution of the corresponding processing plan (as sent to it at step #4), always refuses execution (e.g., if this is a “thin” Server that doesn’t have the resources for any non-trivial processing), or dynamic: the Server accepts or refuses execution based on the current load of the Server at the moment of the request from Client (step #4).
Serving parallelism limit: the maximum number of partitions the Server can serve concurrently to all Clients within this Read job.
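The hints listed above could be carried in a structure like the following. This is only a sketch: the field names, the units, and the convention that a lower priority number means “more preferred” are my assumptions, not something the protocol description fixes.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ProcessingBehaviour(Enum):
    ALWAYS_ACCEPT = "always accept"
    ALWAYS_REFUSE = "always refuse"
    DYNAMIC = "dynamic"  # decided by the Server per request, based on its load


@dataclass
class ServingHints:
    performance_priority: int  # assumed convention: lower is preferred
    processing_behaviour: ProcessingBehaviour
    max_bandwidth_bytes_per_sec: Optional[int] = None  # None: no cap announced
    current_load: Optional[float] = None  # None: unknown to Coordinator
    serving_parallelism_limit: Optional[int] = None  # None: no limit announced


def worth_requesting_server_side_processing(hints: ServingHints) -> bool:
    """A Client can skip asking a Server that announced it always refuses."""
    return hints.processing_behaviour is not ProcessingBehaviour.ALWAYS_REFUSE


thin_server = ServingHints(performance_priority=2, processing_behaviour=ProcessingBehaviour.ALWAYS_REFUSE)
full_server = ServingHints(
    performance_priority=1,
    processing_behaviour=ProcessingBehaviour.DYNAMIC,
    serving_parallelism_limit=8,
)
```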
Agent’s logic
Agent determines which partitions each Client should request and consume from Servers based on the overall processing job that it is doing. This overall job may be equal to the Read job which is the subject of Table Read protocol interaction, or the Read job could be just a part of a bigger, distributed (cross-database) join, a broadcast join, or something else. Then,
For each Client:
3. Agent → Client:
  [ (slice processing plan,
     [partition: (filter, locations, consumption token),]), ]
The lists of partitions for the specific slice (identified by slice processing plan here) that Agent sends to different Clients don’t necessarily have to include all partitions for the given slice as Coordinator sent them to Agent at step #2. Indeed, in many Read jobs, these lists of partitions in multi-partition slices will be reduced to just one specific partition sent to each Client. Consider the example that was used above, in which Coordinator receives the table processing plan with a grouping on a column that is not part of the table’s partitioning key on the database side. Thus, Coordinator pushes back this plan and returns a single slice with many partitions. Upon receiving such a slice and partition breakdown and faced with the need to perform grouping and aggregation on the client side, Agent sends the partitions to different Clients that will just shuffle the rows by the grouping column. Agent also arranges that the shuffled rows are consumed by another stage in the processing pipeline that does the final aggregation.7
Agent may throttle sending these requests to Clients rather than doing them in one burst if doing the latter would predictably saturate the serving parallelism limits of some of the Server locations.
If the client and server sides share the resource manager, and Agent spawns or selects Clients specifically for this Read job, Agent should use the information about Server locations for network-aware scheduling.
When sizing memory and CPU resources for Clients, Agent should also take into account the server-side processing behaviour of the Servers each Client is going to consume data from: that is, if Servers always accept the slice processing plan, Clients are guaranteed to use fewer resources than if Servers always refuse processing or have dynamic behaviour.
Agent could also modify a Server location’s serving hints when sending it to different Clients. Imagine that 100 Clients do a broadcast join and each Client needs to consume the same partition. Coordinator indicated that two Server locations can serve this partition. Agent can then explicitly modify the priority hints of the two locations sent to Clients such that exactly 50 Clients think that one Server is preferable, and 50 Clients think that the other Server is preferable. This is needed because Clients don’t communicate and coordinate with each other in Table Read protocol. Coordinator may have indicated that these two Servers have equal priority. Absent Agent’s tie-breaking, Clients may only select Server locations with equal priority at random, but this may skew the load of Servers unnecessarily.
Another reason to modify Server’s priorities (as sent to different Clients) is to ensure that no Server will hit its serving parallelism limit, at least in the “no failure case”, that is, when all Servers respond to Clients’ requests and the load is not redistributed to the remaining Servers.
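One simple way for Agent to break ties is to rotate the location order round-robin across Clients. The sketch below reproduces the 100-Client broadcast join example. The function name is hypothetical, and round-robin is just one possible policy, which ignores bandwidth caps and parallelism limits.

```python
from collections import Counter
from typing import Dict, List


def assign_preferred_locations(clients: List[str], locations: List[str]) -> Dict[str, List[str]]:
    """For each Client, return the locations ordered by Agent-assigned priority."""
    assignment = {}
    for i, client in enumerate(clients):
        k = i % len(locations)
        # Rotate the list so that Client i prefers location k and falls back
        # to the remaining locations in order.
        assignment[client] = locations[k:] + locations[:k]
    return assignment


clients = [f"client-{i}" for i in range(100)]
assignment = assign_preferred_locations(clients, ["server-a", "server-b"])
first_choice = Counter(order[0] for order in assignment.values())
```

Every Client still receives both locations, so it can fall back to its second choice if the preferred Server rejects the request or doesn’t respond.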
For the description of the consumption token, see the section “Preemption of partition consumption” below.
Client’s logic
Steps 4-9 determine the exact Server each Client is going to consume each partition from. In these steps, Clients and Servers also determine whether the slice processing plan will be executed on the server or the client side.
Steps 4-9 are embedded in somewhat complicated asynchronous logic, executed on each Client. This complexity addresses the need to handle dynamic server-side processing behaviour, rejections, Server unavailability, and backoff due to exceeded serving parallelism limits.
location_queue = new concurrent queue<(location, partition)>
consumer_queue =
  new concurrent queue<(partition, location, consumer info, plan)>
partition_queue = new concurrent queue<partition>

Upon receiving the step #3 or step #9 request from Agent:
  for every received (slice_processing_plan, partitions):
    for partition in partitions:
      partition.processing_plan = slice_processing_plan
      partition.try_server_side_processing = true
      partition_queue.add(partition)

async for every (location, partition) from location_queue:
  if partition.try_server_side_processing and \
     location.serving_hints.server_side_processing_behaviour != Always Refuse:
    server_side_plan = partition.processing_plan
    my_plan = no_processing
  else:
    server_side_plan = no_processing
    my_plan = partition.processing_plan
  4. Client → Server (identified by location):
    partition.filter, server_side_plan, partition.consumption_token,
    consumer_hints
  5. Client ← Server:
    status: Processing Accepted | Processing Refused |
            Serving Rejected | On Hold,
    consumer_info
  if Processing Accepted:
    consumer_queue.add(
      (partition, location, consumer_info, my_plan))
  else:
    if Processing Refused:
      partition.locations[location].refused = true
    elif Serving Rejected, or Server didn't respond:
      // Try with another candidate Server.
      partition.locations.remove(location)
    elif On Hold (if Server reached its serving parallelism limit):
      // Try with another candidate Server, or wait before a retry
      // if there are no other Servers.
      partition.locations[location].on_hold_backoff_period = ...
    partition_queue.add(partition)

async for every partition from partition_queue:
  if all(map(partition.locations, lambda loc: loc.refused)):
    // All Servers refused to do the processing on their side,
    // now ask them again without imposing any non-trivial plan.
    partition.try_server_side_processing = false
  location = ... (Select a priority location for this partition
    that hasn't rejected serving yet and hasn't refused the slice
    processing plan (if any), while respecting locations' "On Hold"
    backoffs.)
  if location:
    location_queue.add((location, partition))
  else:
    6. Client → Agent: partition.filter, partition.processing_plan
    // Steps 7-9 repeat steps 1-3 on the scope of this specific
    // partition's filter and parent slice processing plan:
    7. Agent → Coordinator:
      partition.processing_plan & partition.filter, ...
    8. Agent ← Coordinator: ... // See step #2
    9. Agent → Client(s): ... // See step #3
    // Then Client(s) that received step #9 requests execute the
    // "Upon receiving the step #3 or step #9 request from Agent:"
    // block above.

async for every (partition, location, consumer_info, plan)
    from consumer_queue:
  (See section "Streaming column data from Server to Client" below.)
In step #4, Client sends to one selected Server the partition filter and the corresponding slice processing plan. Server can respond (step #5) with one of the following statuses:
Processing Accepted: Server is going to execute the requested processing plan on its side. Server may start background disk or object storage I/O and processing plan execution immediately, expecting Client to start requesting the results soon (see steps 10-… below).
Server also pins all data segments underlying the requested partition, so that even if table data rebalancing is initiated in the database cluster concurrently, Server is guaranteed to keep these data segments until after the streaming of this partition’s data from Server to Client is complete. However, this partition on this Server may already become inaccessible for all other requesters, ensuring that the Server is not stuck with keeping this partition indefinitely because some requesters repeatedly request to serve it.
Processing Refused: Server refuses to execute the requested processing plan because Server is CPU- or memory-bound at the moment, but Server can still serve the input columns of the table to Client. Client will try to find another Server location that won’t refuse to execute the partition plan. Client must be able to execute the plan on its side if all Server locations for the partition have refused executing the plan.
Server must not respond with “Processing Refused” if the requested processing plan is trivial (called no_processing in the above pseudocode block), i.e., after Client already got a refusal from all candidate Servers and now tries just to fetch the input table columns. However, if Server still wants to shed this Client’s request due to temporary overload, it can respond with “On Hold” status.
Serving Rejected: Server no longer serves the requested partition. This can routinely happen if table data rebalancing has happened in the database cluster since Coordinator’s response at step #2. If all locations in Client’s list reject serving the partition or go unresponsive, Client initiates another round of fetching the partition’s Server locations from Coordinator: see steps 6-9.
Note that the database cluster (represented by Coordinator during this Read job) is not obliged to halt all cluster rebalancing operations over the entire length of the Read job. Such pinning is only needed for the specific Server—Client interaction since the Server has responded with “Processing Accepted” and until the data consumption is complete, as noted above.
On Hold: Server can serve the partition, but not right now because Server’s serving parallelism limit is reached, or the Server is generally overloaded. Client can then request the partition from other locations, or from the same Server after some backoff period. (Note: the exact back-off logic is not detailed in the above pseudocode.)
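The four statuses above drive the Client's per-partition bookkeeping. The following is a minimal sketch of that bookkeeping, not a definitive implementation: the class and function names, the numeric priority (lower means more preferable), and the fixed backoff are all assumptions made for illustration, and the sketch does not distinguish "wait for a backoff to expire" from "go back to Agent at step #6".

```python
from dataclasses import dataclass, field
from enum import Enum, auto

class Status(Enum):
    PROCESSING_ACCEPTED = auto()
    PROCESSING_REFUSED = auto()
    SERVING_REJECTED = auto()
    ON_HOLD = auto()

@dataclass
class LocationState:
    priority: int                  # assumption: lower value = more preferable
    refused: bool = False          # Server refused the non-trivial plan
    retry_not_before: float = 0.0  # end of the "On Hold" backoff

@dataclass
class Partition:
    locations: dict = field(default_factory=dict)  # name -> LocationState
    try_server_side_processing: bool = True

def record_response(partition, name, status, now, backoff=1.0):
    """Update location state after step #5; True means 'start consuming'."""
    if status is Status.PROCESSING_ACCEPTED:
        return True
    if status is Status.PROCESSING_REFUSED:
        partition.locations[name].refused = True
    elif status is Status.SERVING_REJECTED:
        del partition.locations[name]  # try another candidate Server
    elif status is Status.ON_HOLD:
        partition.locations[name].retry_not_before = now + backoff
    return False

def select_location(partition, now):
    """Return the best usable location name, or None if none is usable now."""
    locs = partition.locations
    if locs and all(state.refused for state in locs.values()):
        # Every Server refused the plan: ask again with the trivial plan.
        partition.try_server_side_processing = False
    candidates = [
        (state.priority, name)
        for name, state in locs.items()
        if state.retry_not_before <= now
        and not (partition.try_server_side_processing and state.refused)
    ]
    return min(candidates)[1] if candidates else None

p = Partition(locations={"s1": LocationState(0), "s2": LocationState(1)})
first = select_location(p, now=0.0)
record_response(p, "s1", Status.PROCESSING_REFUSED, now=0.0)
second = select_location(p, now=0.0)
record_response(p, "s2", Status.PROCESSING_REFUSED, now=0.0)
third = select_location(p, now=0.0)  # all refused: retry s1 without the plan
```

In this run, the Client first tries s1, moves to s2 after the first refusal, and after both Servers have refused it returns to s1 with try_server_side_processing switched off.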
Consumption structure: column groups
If Server returns “Processing Accepted” at step #5, it also returns consumer info. Consumer info specifies how result columns can be consumed by Client from Server. Consumer info can have the following data type:
[column group:
  [column: [section: (transfer encodings, indexes),],],]
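To make the nesting of this data type concrete, here is one possible rendering as Python dataclasses. It is a sketch only: the class names, the column name field, and the encoding and index labels ("arrow", "on-disk", "bloom_filter") are hypothetical and not part of the protocol description.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Section:
    transfer_encodings: List[str]            # encodings offered for this section
    indexes: List[str] = field(default_factory=list)  # indexes offered, if any

@dataclass
class Column:
    name: str                # assumption: columns are identified by name
    sections: List[Section]

@dataclass
class ColumnGroup:
    columns: List[Column]    # must be consumed with a shared consumption offset

ConsumerInfo = List[ColumnGroup]

# Two columns served independently, so each one sits in its own column group.
consumer_info: ConsumerInfo = [
    ColumnGroup([Column("user_id", [Section(["arrow", "on-disk"])])]),
    ColumnGroup([Column("event_time",
                        [Section(["arrow", "on-disk"], ["bloom_filter"])])]),
]
```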
First, columns are arranged into one or more column groups. Column groups are the way for Server to indicate that some groups of columns must be consumed by Client in lockstep, that is, with a shared consumption offset.
Server may want to impose this limitation when serving these columns requires shared I/O or CPU processing. If Client were free to consume these columns independently, one after another rather than in lockstep, Server would have to either hold the I/O or CPU processing results for the whole partition in memory (which might not even be possible if the partition’s data is bigger than Server’s memory) or repeat this I/O or CPU processing multiple times.
The opposite of this is shared-nothing column I/O or processing, such as when Server’s column serving amounts to fetching these columns from files on disk in a columnar format like Parquet or Lance, and relaying the column data to Client over the network. In this case, even larger-than-memory column consumption doesn’t impose shared I/O, and columns could be consumed by Client serially, i.e., one after another (almost) as effectively as in lockstep. Then, Server can return each column nested in its own column group. Client could still consume them in lockstep if its own processing logic demands this, but is not obliged to.
Column sections and transfer encodings
Second, each column is divided into one or more sections, and each column section can be transferred in one encoding from the list of available transfer encodings for the section.
The whole purpose of sections is to reflect the differences in the lists of available transfer encodings (and indexes, see below) across sections. In turn, the point of letting Client choose among transfer encodings is to permit transfer in the highly compressed on-disk column format, in addition to the standard Arrow columnar format.
Thus, like slices and partitions, sections are logical entities, and their boundaries may not coincide with the boundaries of the data segments underlying the requested partition on this Server.
If Server does non-trivial processing of a column, Server is expected to return just one section for this column, offering either only one transfer encoding (Arrow), or two encodings: Arrow and a re-compression transfer encoding.
If Server does not process column data before serving it to Client, Server can return the list of sections corresponding to the list of data segments underlying the requested partition. Each section offers two transfer encodings: the on-disk format in which the column is stored in the specific data segment and the Arrow encoding. Client can always fall back to the Arrow encoding if it doesn’t know how to deserialise Server’s on-disk encoding for the given column section. In this case, section boundaries for the column do coincide with data segment boundaries.
Columns within a single group must have the same number of sections, and these sections must hold the same numbers of result rows so that streaming the column group from Server to Client can be organised with a single consumption offset, as indicated above. The exact row numbers themselves may not be known in advance, such as if Server performs row-level filtering.
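The section-count constraint above is easy to check on the Client side before streaming starts. A minimal sketch, assuming (as in the pseudocode's len(column_group[0])) that a column group is a list of columns and each column is a list of sections; the function name is hypothetical. Row counts are not checked because, as noted, they may not be known in advance.

```python
def validate_column_group(column_group):
    """Return the shared number of sections, or raise if columns disagree."""
    if not column_group:
        raise ValueError("a column group must contain at least one column")
    num_sections = len(column_group[0])
    for i, column in enumerate(column_group):
        if len(column) != num_sections:
            raise ValueError(
                f"column {i} has {len(column)} sections, "
                f"expected {num_sections}"
            )
    return num_sections

# Two columns with two sections each: a valid group.
ok_group = [["s0", "s1"], ["s0", "s1"]]
num_sections = validate_column_group(ok_group)
```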
Columns within different groups, however, may have different numbers of sections, or even different counts of result rows altogether. Cf. “Feature 3: Flexibility” in Lance v2 description. This feature could be used to access columns in feature stores like Chronon and other data stores with materialised aggregations, such as Materialize or RisingWave.
If Client prioritises reducing the network data transfer size over everything else, for example, when this Read job exports data from a database hosted with a cloud provider that charges for egress traffic, Client may send the desired (highly compressed) column encodings as consumer hints at step #4. Server may then agree to perform this re-compression on its side by offering these transfer encodings back, alongside the on-disk and Arrow encodings.
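Client's choice among the offered encodings for one section can be sketched as follows. The encoding labels and the function signature are assumptions for illustration; the only behaviour taken from the text is that Client picks what it prefers and can decode, and otherwise falls back to Arrow.

```python
ARROW = "arrow"  # hypothetical label for the standard Arrow encoding

def choose_transfer_encoding(offered, decodable, preference=()):
    """Pick the first preferred encoding that Server offers and Client can
    decode; otherwise fall back to Arrow."""
    for encoding in preference:
        if encoding in offered and encoding in decodable:
            return encoding
    if ARROW in offered:
        return ARROW
    raise ValueError("Server offered no encoding that Client can decode")

# Client cannot decode the on-disk format, so it falls back to Arrow.
fallback = choose_transfer_encoding(
    offered=["on-disk", "arrow"],
    decodable={"arrow"},
    preference=["on-disk"],
)

# Egress-sensitive Client asked for re-compression and Server offered it back.
compact = choose_transfer_encoding(
    offered=["on-disk", "arrow", "recompressed"],
    decodable={"arrow", "recompressed"},
    preference=["recompressed", "on-disk"],
)
```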
Column indexes
Apart from different transfer encodings, Server may also offer to transfer to Client some of the available indexes for each column section, such as an inverted index, a Bloom filter, a geo-index, etc. (see examples of Apache Pinot’s indexes), provided that Client knows how to deserialise (access) and use them.
Arrow’s Dictionaries for dictionary-encoded columns can be treated as a kind of index, too, so that Client can skip downloading the dictionaries for some columns if Client doesn’t need them.
The option to transfer indexes alongside column data is also useful for table or database replication systems that can be built on top of Table Read protocol.
Column sections vs. partitions
The notion of column sections may appear to duplicate the notion of partitions themselves. Why not mandate that each partition spans at most one data segment (or a fraction of a data segment)? That would ensure that each partition has only one list of available transfer encodings and one list of available indexes, obviating the need for sections.
I kept column sections in the protocol to support databases with Oxia-based (or “Oxia-style”) metadata management, that is, databases in which detailed data segment metadata is co-located on Servers with the data segments themselves. In such systems, Coordinator doesn’t hold most of the metadata, not even the specific data segment boundaries that appear on the Servers, and partitions are not only a logical unit of table data introduced by Table Read protocol but also the database’s own internal unit of table data distribution, coarser than data segments.
I don’t know of any database that uses Oxia in particular, yet. Some databases may already have Oxia-style metadata management, but I’m not sure. In any case, I’d argue that this style of metadata management would make a lot of sense for scalable OLAP data warehouses: storing all metadata in ZooKeeper is a known scalability bottleneck of Apache Druid and Apache Pinot, for example, among other systems. So, I expect more databases to adopt Oxia-style metadata management in the future.
Streaming column data from Server to Client
Executed on each Client:
async for every (partition, location, consumer_info, plan)
    from consumer_queue:
  // Sequentially, async, or in parallel:
  for every column_group in consumer_info:
    num_sections = len(column_group[0])
    // Sequentially only!
    for section_number in range(num_sections):
      per_column_transfer_encodings =
        map(column_group, lambda col: ... (Select the transfer
          encoding among the options, based on Client's priorities
          (speed vs. transfer size) and what it can decode.))
      indexes_to_transfer =
        ... (Select what Client needs for its plan.)
      10. Client → Server:
        group.id, section_number, row_offset,
        per_column_transfer_encodings, indexes_to_transfer
      11-... Client ↔ Server:
        data plane characteristics negotiation // Multiple steps
      1X-... Client ↔ Server:
        async streaming of Arrays w/ flow control // Many steps
    async apply the plan and the downstream logic imposed by Agent
      (out of scope of Table Read protocol) to the accumulated
      column data; depending on the downstream needs, this may also
      be outside of the (parallel)
      "for every column_group in consumer_info:" block,
      and applied to all column groups in lockstep.
Steps 11-… constitute a generic streaming protocol for columnar data that I designed specifically for this Table Read protocol because Arrow Dissociated IPC appeared too ad hoc to me. Dissociated IPC protocol is also just a couple of months old, so I’m not breaking some established convention here. But the rationale and functionality of this proposed streaming protocol are the same as of Dissociated IPC: enable packing server-side streams of (Arrow) Arrays into contiguous regions of memory on Client, ready for parallel processing, e.g., on GPU.
Another objective of the new streaming protocol for columnar data is to be flexible enough to accommodate non-Arrow-encoded columns, such as those that are permitted by different column transfer encodings (see discussion in the previous section).
Note that each column group’s section is an independent streaming protocol interaction between Client and Server, with independent flow control. This is because different sections’ column transfer encodings may result in different data plane configurations in the streaming protocol.
Client must not request sections within a column group concurrently, only sequentially. Server may decline a streaming protocol interaction if another interaction (with the previous section number) is still in progress. This is the memory control provision for the server side. If the client side needs more transfer parallelism, Agent may request that as a partitioning hint at step #1 above to begin with, and Coordinator may respond with smaller partitions.
Client may consume different column groups concurrently, although, as an extra resource-control measure, Server could add to consumer_info at step #5 an indicator that column groups must also be consumed by Client only sequentially. If Server permits concurrent consumption of column groups, Client may consume them in lockstep because the streaming protocol uses application-level flow control embedded in Client’s code, rather than transport-level or otherwise “hidden” flow control. This idea is inherited from RSocket and Reactive Streams.
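The concurrency rules above (sections of one column group strictly sequentially, different column groups possibly concurrently) can be sketched with asyncio. The coroutine names are hypothetical, and stream_section is only a stand-in for steps 10 onward; no real network I/O or flow control is modelled here.

```python
import asyncio

async def stream_section(group_id, section_number, log):
    """Stand-in for steps 10 and onward for a single section (hypothetical)."""
    log.append(("start", group_id, section_number))
    await asyncio.sleep(0)  # yield control, as real network I/O would
    log.append(("end", group_id, section_number))

async def consume_group(group_id, num_sections, log):
    # Sections within one column group: strictly one after another.
    for section_number in range(num_sections):
        await stream_section(group_id, section_number, log)

async def consume_partition(sections_per_group, log):
    # Different column groups may be consumed concurrently.
    await asyncio.gather(*(
        consume_group(group_id, num_sections, log)
        for group_id, num_sections in enumerate(sections_per_group)
    ))

log = []
asyncio.run(consume_partition([2, 3], log))
```

Within each group, the log shows every section ending before the next one starts, while events of the two groups may interleave.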
Transfer semantics of indexes (see the section “Column indexes” above) are equivalent to transfer semantics for DictionaryBatches: see the discussion of DictionaryBatches in the streaming protocol description. In fact, Dictionaries themselves are treated just as a specific kind of index.
The row_offset that Client sends to Server at step #10 is a provision for Table Write protocol, which is based on Table Read protocol. For Read jobs that are not parts of Table Write protocol’s interactions, row_offset always equals 0.
Data segment-wide buffer tagging and transfer deduplication
The transfer of data segment-wide buffers in the style of Lance v2 (called “file-wide buffers” there) may need to be deduplicated across different column groups. It should be relatively straightforward to implement tagging and transfer deduplication for such buffers, for example, by off-loading their transfers to a separate “synthetic stream” (see “mode 4” in the streaming protocol for columnar data, and the section on “multiplexing”). Client can then check whether it still holds a copy of this buffer, and skip its transfer if so.
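On the Client side, the deduplication check could look like the sketch below. The cache class, the shape of the tag (here a pair of segment id and buffer index), and the fetch callback are assumptions for illustration; the streaming protocol's actual tagging mechanism is not specified here.

```python
class BufferCache:
    """Client-side cache of data segment-wide buffers, keyed by tag."""

    def __init__(self):
        self._buffers = {}
        self.transfers = 0  # how many buffers were actually transferred

    def get(self, tag, fetch):
        # Transfer the buffer only if Client doesn't hold a copy already.
        if tag not in self._buffers:
            self._buffers[tag] = fetch()
            self.transfers += 1
        return self._buffers[tag]

cache = BufferCache()
tag = ("segment-42", 0)  # hypothetical tag: (data segment id, buffer index)

# Two column groups need the same buffer; only the first request transfers it.
a = cache.get(tag, fetch=lambda: b"shared dictionary bytes")
b = cache.get(tag, fetch=lambda: b"shared dictionary bytes")
```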
Availability, consistency, and resilience
Table data availability
If some portion of data segments needed for the Read job are temporarily unavailable on the database side, for example, if the only Server hosting them is restarting, Coordinator should return an empty list of locations for the corresponding partitions. Agent can then periodically retry step #1 with the slice processing plan specialised with the partition filter, as at step #7.
Steps 6-9 (see the section “Client’s logic” above) cover the possible partition unavailability if the database side performs table data rebalancing among Servers concurrently with the Read job.
High Availability (HA) of Agent and Coordinator
High availability, as well as progress checkpointing and crash recovery of Agent and Coordinator, are out of scope for table transfer protocols. If a client or a database chooses to implement high availability of either Agent or Coordinator, respectively, these node roles might themselves be played by distributed systems (or by a Kubernetes pod). But as far as the table transfer protocols are concerned, they are single nodes.
By default, a crash of either Agent or Coordinator entails a failure of the Read job. Agent and Coordinator should check each other for liveness, and stop all activities and release all resources of the protocol interaction on their side (client or server side, respectively) if they detect that their counterpart is down.
Read consistency
Support for read consistency in the face of failing partition requests (due to table data rebalancing on the database side) and failing Clients can be layered on top of the above description of Table Read protocol. At step #2, Coordinator may augment all slice processing plans with a special predicate that ensures that a certain snapshot of table data is read. This special predicate may be implemented as a Substrait extension if the server side’s table consistency model lies outside of table semantics, or simply as an extra filter à la row._last_updated < $TIME if this is the database’s approach to consistent reads.
More sophisticated distributed databases may use a vector clock with MVCC sequence versions rather than a simple scalar time. If the vector clock is too huge to pass around Clients and Servers, Coordinator may generate a shorthand “id” for this vector clock specifically for this Read job. However, with such a design, Servers should dereference this “id” upon receiving requests at step #4 from Clients, which also takes extra time.
Regardless of the server side’s consistency model, all the steps after step #2 in this Read job just propagate the slice processing plans downward without touching the parts they don’t understand, including these predicates for read consistency.
Upon receiving the step #7 request from Agent, Coordinator recognises that the requested table processing plan already includes the read consistency predicate, thus realising that this is a retry request, and maintains the predicate in the processing plan(s) returned at step #8.
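The retry recognition described above can be sketched with a toy plan representation. This is not Substrait: the plan is modelled as a plain dictionary, and the predicate key and snapshot values are hypothetical, chosen only to show that a retried plan keeps the snapshot it was originally given.

```python
SNAPSHOT_KEY = "read_snapshot"  # hypothetical marker of the consistency predicate

def with_read_consistency(plan, current_snapshot):
    """Return (plan, is_retry). A plan that already carries the predicate is
    a retry (step #7), so its original snapshot is preserved."""
    if SNAPSHOT_KEY in plan:
        return plan, True
    return {**plan, SNAPSHOT_KEY: current_snapshot}, False

# Step #2: Coordinator pins the snapshot for a fresh Read job.
first_plan, first_is_retry = with_read_consistency(
    {"filter": "country = 'NZ'"}, current_snapshot=100
)

# Step #8: later retry for one partition; the table has moved on to 200,
# but the plan must still read snapshot 100.
retry_plan, retry_is_retry = with_read_consistency(
    first_plan, current_snapshot=200
)
```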
For read consistency to be maintained at step #8, Coordinator must prepare its response with respect to a serialisable view of metadata, that is, of the mapping between Servers and the data segments that they host. This requirement is trivially satisfied if Coordinator is a single node, but is more demanding if Coordinator is a distributed system itself (see the previous section). Most open-source databases use ZooKeeper to ensure this. Oxia also offers similar guarantees.
Preemption of partition consumption
Suppose Agent loses the connection to a Client that is in the process of consuming a lot of data from the server side, assumes the Client is dead, and spawns a new Client to do the same consumption, while the original Client is actually still alive and keeps consuming data from Servers. This “accidental overprovisioning” of Clients endangers Servers’ serving parallelism limits and may make the “secondary” Client(s) predictably fail due to timeouts, which either fails the entire Read job or at least makes the Read job several times longer.
To prevent this failure scenario, Agent should generate a “consumption token” for each Client—partition pair and send them to Clients at step #3. When Agent assumes some Client is dead and respawns a new Client, it creates new tokens that refer to the dead Client’s tokens as “parents”. Then, if Server receives requests (step #4) with new tokens, it immediately aborts the partition data processing and streaming associated with parent tokens, thus freeing the necessary resources.
Consumption tokens could also be used to implement workload re-distribution (“stealing”) from slow Clients or Servers. Such slowness may be a result of data skew.
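The preemption scheme with parent tokens can be sketched as follows. The token structure, the use of random hex ids, and the Server class are assumptions for illustration; walking the whole chain of ancestors (not only the direct parent) is also an assumption that the text doesn't spell out.

```python
import uuid
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class ConsumptionToken:
    id: str
    parent: Optional["ConsumptionToken"] = None

def new_token(parent=None):
    """Agent-side helper (hypothetical): mint a token, optionally with a parent."""
    return ConsumptionToken(uuid.uuid4().hex, parent)

class Server:
    def __init__(self):
        self.active = {}  # token id -> partition currently being served

    def handle_request(self, token, partition):
        """Step #4: abort work tied to ancestor tokens, then admit the request."""
        aborted = []
        ancestor = token.parent
        while ancestor is not None:
            if self.active.pop(ancestor.id, None) is not None:
                aborted.append(ancestor.id)  # free the resources it held
            ancestor = ancestor.parent
        self.active[token.id] = partition
        return aborted

server = Server()
old = new_token()                  # token of the Client presumed dead
server.handle_request(old, "partition-7")
replacement = new_token(parent=old)  # token of the newly spawned Client
aborted = server.handle_request(replacement, "partition-7")
```

After the second request, only the replacement Client's token remains active on the Server, so the presumed-dead Client no longer counts against the serving parallelism limit.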
Table Read protocol: implementation notes
Table Read protocol’s implementation complexity could be partially mitigated by creating libraries for certain operations that different node roles should perform, particularly those surrounding Substrait plan wrangling. It should be possible to implement these functions only once, using Rust for the logic and FlatBuffers to pass Substrait plans across the language boundary between Rust and Java, C++, or Go runtimes.
Default implementations of the streaming protocol for columnar data could be derived from RSocket implementations.
Table Read protocol overview and comparison with Arrow Flight
In this section, I summarise the differences between Table Read protocol and Arrow Flight. This section doesn’t add new information about the protocol over the “Table Read protocol walkthrough” above. For the differences in scope between Table Read (and Table Write) protocol and Arrow Flight, see the section “Cross-cutting and out-of-scope concerns” above.
Table Read protocol makes load distribution (both on the client and server sides), reliability, awareness of/readiness to table data redistribution in the database, network data transfer optimisation, and database-side resource (CPU, memory, disk I/O, network I/O, etc.) management explicit, first-class concerns.
This enables flexible layering of other protocols and systems on top of Table Read protocol. Table Write protocol is the foremost example: it leverages the properties of Table Read protocol (wrt. load distribution, reliability, and other concerns) to provide efficiency and exactly-once write guarantees. I will describe Table Write protocol in the next article (spoiler: client and database sides are swapped, and the database nodes consume “partitions to be written”).
Table Read protocol’s amenability to composition is due to these concerns (load distribution, reliability, resource management, etc.) being shared between the client and the server sides in the Read job.
On the surface level, Arrow Flight is simpler than Table Read protocol, but to achieve the same properties as Table Read protocol wrt. load distribution, reliability, etc., a lot of complexity would have to be encapsulated on the database side. In fact, cooperative system design allows achieving these properties using simpler techniques. Therefore, the database-side complexity of implementing “resilient Arrow Flight” would be even greater than the complexity of Table Read protocol described above. So, this (relative) simplicity of Arrow Flight is deceptive.
Moreover, achieving some of these properties is impossible without a layer of database-specific reverse proxy nodes between “core” database nodes and Clients. Usually, only “very serious” cloud-native data warehouses such as BigQuery, Redshift, Azure Synapse, or Snowflake have such proxies in their design, and most open-source OLAP databases don’t have them. Also, these proxies prevent data transfer off-loading to NVLink- or InfiniBand-based RDMA protocols [8].
I defer a comprehensive comparison of table transfer protocols with open table format-based catalogs (Iceberg, Delta Lake, and Hudi) to the following article.
Thanks to Rill Data for sponsoring this work. Rill Data is interested in developing the open data ecosystem rather than promoting any specific database solution. Rill’s technology is compatible with various databases mentioned in this article, including ClickHouse, Apache Druid, and DuckDB.
[1] See “BigLake: BigQuery’s Evolution toward a Multi-Cloud Lakehouse” (Levandoski et al., 2024).
[2] At the moment, I know of only Apache Doris and InfluxDB as implementing Arrow Flight.
[3] Individual Servers could still push back processing to Clients on the partition level: see the discussion of server-side processing behaviour hint below.
[4] This could be additionally controlled by a boolean hint that Agent sends to Coordinator about whether the client side prefers Servers to take up this distributed processing stage, or prefers Servers to push it back to Clients.
[5] With a possible exception for certain window aggregation processing semantics: it may demand partition filters to “overhang” their parent slice’s filter.
[6] Either as primary durable storage or as disk cache for data segments durably stored in the object storage: I mentioned these different designs in the previous article.
[7] The interface between shuffling Clients and the second stage of such a processing job is outside of the scope of Table Read protocol.
[8] Excepting, perhaps, the cloud provider’s own accelerator platforms that are privy to the corresponding data warehouse’s internal APIs, such as Google’s Vertex AI (integrated with BigQuery), Amazon’s SageMaker (integrated with Redshift), etc.