Table Write protocol for interop between diverse OLAP databases and processing engines
In the previous article, I argued that there is an opportunity for creating a new family of protocols: table transfer protocols, namely Table Read and Table Write protocols, to address the M x N interoperability problem between OLAP, timeseries, vector, and search databases on the one hand and data processing/query and ML engines on the other hand.
I discussed why existing technologies and protocols that tackle this interoperability problem, namely ADBC (Arrow Database Connectivity), BigQuery Storage APIs, open table formats (Apache Iceberg, Apache Hudi, and Delta Lake), and Arrow Flight are insufficient and have different downsides. Arrow Flight comes closest.
I’ve already described the common design principles for table transfer protocols and a more concrete proposal for Table Read protocol. In this article, I propose semi-concrete designs of Table Write protocol and a table replication method based on Table Read and Table Write protocols. In the next and final article of this series, I will review table transfer protocols again and discuss their place in the disassembled database architecture and data stack.
Use cases for Table Write protocol
The ETL loop
The need for distributed columnar writing (data ingestion) appears when a processing engine has completed a distributed computation job (after pulling data onto multiple worker nodes from some database(s) using Table Read protocol, or from a data lake) and wants to write the results back into the database into a new table, or update existing rows.
Arrow Flight doesn’t permit data ingestion from distributed writer nodes.
Open table formats (Iceberg, Hudi, Delta Lake) permit distributed writes, but if the target database uses these table formats, the read path of Arrow Flight becomes unnecessary: processing engines can read table data directly from the object storage.
On the other hand, open table formats are inefficient in many OLAP and HTAP use cases, as I argued in “The future of OLAP table storage is not Iceberg”. So, it would be sad if the recent diversification of processing engines and ML runtimes pushed more data teams to use Iceberg as their only OLAP storage, unnecessarily leaving efficiency on the table and locking themselves into dependence on cloud object storage.[1]
Thus, Table Write protocol’s primary motivational use case is distributed writing of processing job (or ML inference) results from the processing engine’s nodes back into the database.
This use case enables closing the loop:
The processing engine does a distributed read of input data from the distributed database using Table Read protocol,
The engine performs a distributed processing job,
The engine writes the results back into the database in parallel from multiple worker nodes or GPUs via Table Write protocol.
In the above loop, columnar table data is not passed through a single node at any point, or possibly even through CPU memory at all[2].
Table Write protocol can also be used to ingest data from a single node (a single data transformation/generation process, or a single-node source database). Even in this case, Table Write protocol should have advantages over Arrow Flight: writing different data partitions to multiple (distributed) database nodes, and the possibility of achieving resilient writing with exactly once delivery guarantees with less end-to-end implementation complexity than Arrow Flight would require[3]. This is similar to how Table Read protocol enables consistent and resilient reads with less end-to-end implementation complexity than Arrow Flight.
Distributed table replication between different database technologies
Another target use case for Table Write protocol is distributed table replication via a combination of Table Read and Table Write protocols. The main difference from the above use case (“the ETL loop”) is that there is no processing engine between the source and target databases.
Additionally, such a table replication method would enable the most efficient and consistent (a la change data capture) import/export of tables between different databases that use different storage formats, without intermediary Kafka or Parquet/Iceberg storage.
The best Table Write protocol is Table Read protocol in reverse
The primary way to ingest data with exactly once delivery guarantees into a distributed database is to give database nodes access to a shared log of records to write (or commands/operations to execute) and let the nodes atomically commit (or reach a distributed consensus about) the read offsets within the log. If during ingestion some node crashes or becomes unavailable to the part of the cluster that maintains the consensus, the database’s cluster manager restarts consumption from the offset in the log of the last record that has been consumed and processed persistently (as known to the database’s consensus).
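The offset-commit idea can be illustrated with a minimal single-node sketch. It assumes an in-memory list standing in for the shared log and SQLite standing in for the database's persistent state; `open_store`, `ingest`, and the table names are hypothetical and not part of any protocol described here. The point it shows is that rows and the read offset are committed in one transaction, so a crash before the commit leaves neither behind.

```python
import sqlite3


def open_store():
    """Create a toy store: ingested rows plus a single committed read offset."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE rows (value TEXT)")
    db.execute(
        "CREATE TABLE read_offset ("
        "id INTEGER PRIMARY KEY CHECK (id = 0), next_offset INTEGER)"
    )
    db.execute("INSERT INTO read_offset VALUES (0, 0)")
    db.commit()
    return db


def ingest(db, log, batch_size, crash_after_batches=None):
    """Consume `log` from the committed offset.

    Returns False if the simulated crash fired, True when the log is drained.
    """
    batches = 0
    while True:
        (offset,) = db.execute("SELECT next_offset FROM read_offset").fetchone()
        batch = log[offset:offset + batch_size]
        if not batch:
            return True
        try:
            with db:  # one transaction: rows and the new offset commit together
                db.executemany("INSERT INTO rows VALUES (?)", [(r,) for r in batch])
                if crash_after_batches is not None and batches == crash_after_batches:
                    raise RuntimeError("simulated crash before commit")
                db.execute(
                    "UPDATE read_offset SET next_offset = ?", (offset + len(batch),)
                )
        except RuntimeError:
            return False  # the transaction was rolled back, offset unchanged
        batches += 1
```

Restarting `ingest` after the simulated crash resumes from the last committed offset, so the rolled-back batch is consumed again without producing duplicates.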
On the source data ingestion path (i.e., before ETL/ELT), the role of this shared log is usually played by Kafka or similar systems: Kinesis, Google Pub/Sub, Redpanda, WarpStream, and others. These systems are populated with data from IoT gateways, change data capture agents, log and metric collectors, etc.
The approach with database-side offset management is recommended in the KafkaConsumer documentation to achieve exactly once delivery guarantees. This approach is also extensively described in Apache Pinot’s design document for partition-level Kafka consumption.
Apart from exactly once delivery guarantees, this pull-based ingestion approach also permits the database to control node assignment for ingesting each data partition, and change this assignment at any time. The latter may be needed in the event of database cluster scale-up, scale-down, or data rebalancing that happens concurrently with a long-running data ingestion job.
With push-based ingestion, this flexibility is only possible if there is a layer of proxy nodes in front of the “core” database server nodes, or if the database’s cluster and data distribution management is fully externalised to Kubernetes (the same cluster that hosts the nodes that do the “push”). Neither of these is a common OLAP database setup.
Yet another advantage of pull-based ingestion from a shared log is “almost free” replication, as database replica nodes (for a given table, partition, or data segment) can all consume data from the same log, instead of transmitting records between each other after one of the replicas has consumed them. The latter approach (“internal replication”) requires extra burdensome background processing, internal buffers, etc. This could be entirely avoided with pull-based ingestion from a shared log.
BigQuery Storage Write APIs offer a concise, push-based ingestion API with exactly once delivery guarantees by providing a simplified version of Kafka producer API. In other words, BigQuery internalises the shared record log mentioned above. If the client pushes data into BigQuery via consumption from another log or a system that offers consistent-at-the-offset reads of records to ingest, such as a Flink or Spark Streaming processing pipeline, BigQuery’s approach becomes wasteful: in effect, there are two duplicative systems providing semantics of a log in front of the “core” database nodes.
Per Table Write protocol’s design use cases, writing sources are specifically processing engines such as Flink or Spark that already provide offset or block-based consumption in their “sink” interfaces, or other databases (when Table Write protocol is used for table replication) that could provide consistent-at-the-offset reads via Table Read protocol. Therefore, it would be wasteful for Table Write protocol to use BigQuery’s push-based approach to ingestion.
It turns out that Table Read protocol already takes care of most aspects that would be nice to have in an efficient pull-based ingestion protocol, namely at-the-offset read consistency, load distribution and resource usage control, fault tolerance, control of transfer encodings for columns, and streaming of columnar data (including GPU off-loading).
This leads to a decision to make Table Write protocol essentially Table Read protocol in reverse: that is, the source side (such as a processing or ML engine) in the Table Write protocol interaction (called the Write job below) acts as the server side in the “underlying” Table Read protocol interaction (aka the Read job) and the target side in the Write job acts as the client side in the underlying Read job.
Another benefit of using Table Read protocol in reverse as Table Write protocol is getting a no-intermediary distributed table replication method “for free”. All databases that implement the server side of Table Read protocol can act as sources in the Write job. See the “Table replication method” section below for more details.
If a processing engine implements both the client and server sides of Table Read protocol for the “ETL loop”, it could also use Table Write protocol for inter-stage data exchange. DataFusion Ballista uses Arrow Flight for this purpose.
Table Write protocol: details
Node roles
I’ll use the same terms for node roles as in Table Read protocol.
The Write job’s orchestrator/controller node role on the source side (such as a processing engine) is still called Coordinator.
The orchestrator/controller node role on the target side (i.e., the database the data is written into) is called Agent.
The nodes on the source side that hold the data to be written and will send it to the target are called Servers.
The target side’s nodes that receive and store data are called Clients.
If either the source or target side of the Write job is a single-process data-framing library runtime, an embedded database, or a single-node database, the above node roles are played by threads or coroutines within that process.
Out-of-scope: transaction semantics
As I described in the previous post, table transfer protocols are “headless”. This means that table transfer protocols don’t define transaction semantics for ACID guarantees.
Transactions are usually managed by the target database[4]. Some OLAP databases use the standard SQL transaction protocol (BEGIN…COMMIT, or implicit autocommit of SQL statements) and others have different ways to organise ingestion: for example, Apache Druid has “indexing/ingestion tasks”. Also, many OLAP and timeseries databases manage continuously running services to ingest data from Kafka or other perpetual data sources.
Thus, Write jobs are supposed to be embedded in either:
Transaction protocol interactions that are jointly managed by the transaction manager and the transaction client (roles) on the target and source sides, respectively. An SQL console is a typical example of a transaction client.
Data ingestion tasks/jobs/streams that are jointly managed by the ingestion manager and (optionally) the ingestion client (roles) on the target and source sides, respectively. Here are some examples of ingestion clients: a process with a SparkContext or a SparkSession, an ETL/ELT tool, a table replication system, a feature store, or an ML/MLOps platform.
Transaction and ingestion clients should interact with the target database directly before starting the Write job, and then “commit” the results after the Write job is completed if the database requires explicit transaction commits or completion signals for data ingestion jobs.
Write job initiation
Transaction Client or Agent → Coordinator: READ_FOR_WRITE
  Agent location, job context, table processing plan,
  partitioning hints
READ_FOR_WRITE is an optional step in the Write job. Upon receiving this request, Coordinator behaves the same as when it prepares the step #2 response in Table Read protocol, except that it sends the response as a WRITE_INIT request to the given Agent location and with the given job context.
A READ_FOR_WRITE request may be sent by Agent itself in a replication job (see “Table replication method” below for details), or when Agent retries to get Server locations of temporarily unavailable partitions. In this case, if Agent doesn’t know its own public address, it may use the reuse-connection:// scheme convention from Arrow Flight.
Coordinator → Agent: WRITE_INIT
  job context, write operation, field names,
  [ partition: (filter, relation,
      [ location: (URI, serving hints), ]), ]
Response:
  Write job ID
The job context includes all information needed to anchor this Write job on the target side: the target table name, the transaction ID, the ingestion task/job/stream ID, the ID of the “root” Write job to track their completions (see the section “Handling Server restarts” below), etc.
Coordinator receives the context from the transaction or ingestion client, or Agent itself via a READ_FOR_WRITE message or by other means.
When Agent receives a WRITE_INIT request, it immediately creates an ID for this new Write job and sends it back to Coordinator before doing its other logic, described in the “Agent’s logic” section below.
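To make the shape of the WRITE_INIT request more tangible, here is a rough sketch of it as Python data classes, together with an Agent handler that returns the Write job ID before doing any planning. The class names, the use of plain strings in place of Substrait expressions and relations, and the `pending` queue are all illustrative assumptions; the protocol does not prescribe any of them.

```python
import uuid
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Location:
    uri: str
    serving_hints: dict = field(default_factory=dict)


@dataclass
class Partition:
    filter: str    # stands in for a Substrait expression
    relation: str  # stands in for a Substrait Rel
    locations: list = field(default_factory=list)  # may be empty


@dataclass
class WriteInit:
    job_context: dict
    write_operation: str
    field_names: list
    partitions: list


class Agent:
    def __init__(self):
        # (job_id, request) pairs waiting for the Agent's planning logic.
        self.pending = deque()

    def on_write_init(self, request: WriteInit) -> str:
        """Assign a Write job ID right away; planning happens later."""
        job_id = uuid.uuid4().hex
        self.pending.append((job_id, request))
        return job_id
```

Returning the ID first lets Coordinator refer to the job (for example, as a “root” Write job ID) even while Agent is still assigning partitions to Clients.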
Record writing (DML) and output semantics
The write operation (Substrait’s WriteOp plus extra fields for upsert behaviour definition, such as the default values for unspecified fields) determines the semantics for writing the records in this Write job: whether they are inserted or upserted into the target table, or update existing rows, or delete existing rows.
However, the WRITE_OP_CTAS (Create Table As Select) write operation is not supported: table creation semantics are out of scope for Table Write protocol. The transaction client should create the table separately before letting Coordinator initialise Write job(s) via WRITE_INIT requests.
On the other hand, there is a type of write operation that is not currently codified in Substrait but would be useful to implement for Table Write protocol, in particular for the cases when it is used for table replication: the record writing semantics are determined by some column in the records themselves. This would be useful when the source database in a table replication job natively uses “tombstone” bitmaps to indicate deleted rows in data segments[5].
Note: Substrait WriteRel’s OUTPUT_MODE_MODIFIED_RECORDS is not supported in Table Write protocol because it doesn’t seem there is any use case for it. Even returning just the number of modified rows may not be possible in databases that use the “write first, compact/deduplicate rows later” approach to upserts, updates, and deletes. The number of modified rows can be requested along with other statistics of the Write job(s) from the transaction or ingestion manager, via some protocol specific to the target database.
Partitions
The list of partitions in the WRITE_INIT request is the equivalent of WriteRel’s input relation: the latter would be the union of partitions’ relations. At the same time, this list of partitions is a simplified version of Coordinator’s step #2 response to Agent in Table Read protocol, except that slices are omitted (it could be assumed that there is a slice wrapping each partition).
The partition is defined by its filter (a Substrait expression), in the same way as in Table Read protocol. See more on partition semantics in Table Read protocol description.
Each partition’s relation (a Substrait Rel) defines the physical schema of the written records in the partition, as they will appear on the wire between the source’s Servers and the target’s Clients. Relations are specific to each partition because there may be different fields in different partitions, for example, if the source side is another database in a table replication, and the schema of the table in the source database has been changed. The same field may also have different types in different partitions as long as the target database can upcast all these types to the column type in the target table.
The relation shouldn’t include the partition’s filter. If all partitions in the Write job share the same relation, it’s possible to send it only once and omit it from each partition structure. Field names are always extracted in this way, as they should be shared between partitions: that’s why the partition’s relation field is a Substrait Rel rather than a RelRoot.
When the source side is a processing engine, the locations list of each partition should typically have just one element, i.e., the location of the processing engine’s worker node that holds the processing results. However, if the source side of the Write job is a database (in the course of a table replication job), partitions may have multiple locations that can enable better load distribution on the source side and higher overall speed of the table replication job.
The location list for a partition may also be empty in response to a READ_FOR_WRITE request. This indicates that the partition is temporarily unavailable. Agent should handle this in the same way as in Table Read protocol: periodically repeat READ_FOR_WRITE requests with the original table processing plan specialised to the unavailable partition’s filter and the parent Write job’s ID as the “root” Write job ID. See also the section “Handling Server restarts” below.
Multiple Write jobs within a single transaction or ingestion task/job/stream
Coordinator can initiate multiple Write jobs within a transaction or an ingestion task/job/stream: Coordinator can send multiple WRITE_INIT requests to Agent with different lists of partitions. These Write jobs may overlap in time.
Writing from a perpetual data source, such as a stream processing engine like Flink or hybrids of databases with stream processing systems like Materialize, RisingWave, and others, can be implemented as a series of Write jobs within a single ingestion task/job/stream. Yet, every Write job has a limited number of partitions, and every partition’s filter is “closed”, such as time BETWEEN $X and $Y rather than time > $X.
Agent’s logic
After Agent receives the WRITE_INIT request from Coordinator, all parties (Agent, Coordinator, Clients, and Servers) generally behave according to Table Read protocol, starting from step #3.
Of course, in determining which target database’s node should act as Client for each partition, Agent should consider the target side’s existing data segment placement on the database nodes, data distribution rules for the target table, and the current load of database nodes. In other words, the difference from the “standard” client-side load distribution logic as described for Table Read protocol is that the target side of the Write job (i.e., the client side of the underlying Read job) may be stateful if database nodes themselves are stateful.
If the source side’s VPC rules prohibit inbound connections, Agent can send to Coordinator the Client locations that will consume data from the given partition locations. The source side’s Servers must then establish connections with the corresponding Clients to enable the transfer of data from Servers:
Agent → Coordinator: WRITE_PRE_CONNECT_CLIENTS
  [ (Server location, Client locations), ]
Coordinator → Server(s): WRITE_PRE_CONNECT_CLIENTS
  Client locations
Server → Client(s): WRITE_PRE_CONNECT
Note that the connection from Coordinator to Agent is already established via the WRITE_INIT request, which makes the WRITE_PRE_CONNECT_CLIENTS request possible.
If Clients get swapped in the course of the Write job, Agent can send WRITE_PRE_CONNECT_CLIENTS requests repeatedly to update the lists of Client locations.
Handling Write jobs larger than source Servers’ memory
If the source side’s Servers are transferring more data in this Write job than they can hold in memory at once (or the processing engine prefers to reserve memory for other concurrent jobs), Coordinator must set Servers’ serving parallelism limit to one for these Servers in the WRITE_INIT request.
After that, Servers must break down large partitions into multiple sections[6] as part of consumer info that each Server sends to Client at step #5 of the underlying Read job, even though there may be no difference in the offered per-column transfer encodings between the sections (which is the original “justification” for the concept of sections in Table Read protocol).
To permit multiple replica nodes of the target database to ingest the same partition concurrently as Clients, Servers must also track serving parallelism and enforce the limit per Client, rather than in total across Clients. Agent must send partition consumption tasks to Clients at step #3 and synchronise their completion such that only two Clients that request the same partition take advantage of Server’s per-Client tracking of the serving parallelism limit.
By doing what is described above, Coordinator, Agent, and Servers cooperate to execute sequential consumption of the partitions and sections by Clients. Then, the source side can leverage WRITE_PROGRESS messages to dismiss successfully ingested partitions (and sections within partitions) promptly. This enables Write jobs with the total size of written records larger than the size of memory allocated to these jobs on Servers through which the records flow.
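The per-Client enforcement of the serving parallelism limit could look roughly like the sketch below. `PerClientLimiter` and the idea of keying the counters by consumption token are assumptions made for illustration; a real Server would tie acquisition and release to the lifecycle of its streaming connections.

```python
from collections import defaultdict


class PerClientLimiter:
    """Tracks active streams per Client rather than in total across Clients."""

    def __init__(self, limit: int):
        self.limit = limit
        self.in_flight = defaultdict(int)  # consumption token -> active streams

    def try_acquire(self, client_token: str) -> bool:
        """Admit a new stream unless this Client is already at its limit."""
        if self.in_flight[client_token] >= self.limit:
            return False
        self.in_flight[client_token] += 1
        return True

    def release(self, client_token: str) -> None:
        """Mark one of this Client's streams as finished."""
        if self.in_flight[client_token] > 0:
            self.in_flight[client_token] -= 1
```

With a limit of one, two replica Clients can each stream the same section at the same time, while neither can start a second section before finishing the first.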
Write job progress
After a Client successfully consumes a data section from a Server (i.e., a columnar data streaming protocol interaction for the section ends with a COMPLETE signal) and successfully persists this data to its disk or object storage, Client writes the number of consumed and persisted rows to the target side’s metadata store, or, perhaps, an ephemeral metadata “substore” specific to the Write job and tied to the lifetime of this Write job’s parent transaction or ingestion task/job/stream, as supported by Oxia.[7]
Then, Agent executes the following logic concurrently with its Read job logic:
ingested_row_counts =
  new map<partition, map<consumption_token, integer>>
min_ingested_row_counts = new map<partition, integer>
async for (partition, consumption_token, ingested_row_count) updates
    in metadata_store:
  ingested_row_counts[partition][consumption_token] =
    ingested_row_count
  if size(ingested_row_counts[partition]) < replication_factor:
    // Not all replicas have reported any progress for the partition
    // yet, skip this update and wait for the next one.
    continue
  // Assumed to be 0 if there is no entry for the partition yet.
  min_count = min_ingested_row_counts[partition]
  new_count = min_value(ingested_row_counts[partition])
  if new_count > min_count:
    min_ingested_row_counts[partition] = new_count
    Agent → Coordinator: WRITE_PROGRESS
      partition, new_count
Agent listens to the metadata store. When it sees that all replica Clients, identified by their consumption tokens, have consumed N rows within the partition, and N is higher than the previous N for that partition, Agent sends a WRITE_PROGRESS message to Coordinator. (The metadata store schema is internal to the target side, so this ID need not be equal to the consumption token as it appears in Table Read protocol, although this is a natural choice.)
There is more than one replica Client (usually, two) if the target database wants to piggy-back on Table Write protocol to get “almost free” replication, as I mentioned above in the section “The best Table Write protocol is Table Read protocol in reverse”, and as illustrated in Apache Pinot’s documentation here.
When Coordinator receives a WRITE_PROGRESS message, it registers the message in its own metadata store (to support high availability and Write job restarts) and forwards it to the relevant Servers. Servers then release resources (memory, primarily) used to serve the data sections that are now completely ingested, to enable Write jobs larger than source Servers’ memory: see above.
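On the Server side, releasing memory in response to forwarded WRITE_PROGRESS messages might be sketched as follows. `PartitionBuffer` is a hypothetical helper, and the string payloads are placeholders for buffered columnar data; the only logic shown is that a section is dropped once the reported row count covers it completely.

```python
class PartitionBuffer:
    """Sections of one partition held in a Server's memory, in row order."""

    def __init__(self, section_row_counts):
        # Each entry is (exclusive end row within the partition, payload).
        self.sections = []
        end = 0
        for i, rows in enumerate(section_row_counts):
            end += rows
            self.sections.append((end, f"section-{i}"))

    def on_write_progress(self, ingested_rows: int):
        """Drop sections fully covered by `ingested_rows`; return their names."""
        released = [name for end, name in self.sections if end <= ingested_rows]
        self.sections = [
            (end, name) for end, name in self.sections if end > ingested_rows
        ]
        return released
```

A section that is only partially covered by the reported count stays in memory, because a restarted Client may still request the remaining rows.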
Handling Server restarts
If some source’s Server crashes or is restarted, Client sends to Agent a step #6 message in Table Read protocol. Then Agent sends to Coordinator a READ_FOR_WRITE request (equivalent to step #7 in Table Read protocol) where it retransmits the job context that it received from Coordinator in the WRITE_INIT request (adding the current Write job’s ID as the “root” Write job ID if there is none yet in the job context), partition.relation & partition.filter as the table processing plan, and empty partitioning hints.
Coordinator responds with a new WRITE_INIT request, which may have a more fine-grained partition breakdown, for example, if the old Server crashed exactly because the original partitions were too big and didn’t fit in memory. And even if Coordinator returns a single partition with the same filter as before, the new Server may break this partition into sections differently than the old Server.
When a Client reconnects to the new Server and initiates streaming of the new partition’s sections, it sends the row_offset equal to the already ingested (consumed and persisted) number of rows in the whole original partition at step #10 of the underlying Read job.
The new Server may be able to determine very cheaply whether any rows should be transferred for the given section and the given (partition) row offset, or it may need to “shadow consume” the section without sending the actual data. If the section needs to be skipped completely, Server responds to the step #10 request from Client with a streaming protocol’s COMPLETE signal immediately.
If Coordinator broke down the original partition into several smaller partitions with different Server locations, Coordinator must order partitions in its WRITE_INIT message such that they coincide with the original partition’s row order, and Clients must consume the new partitions sequentially to send the correct row_offset (subtracting the number of consumed rows in the preceding partitions) to each of the new partitions.
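The offset arithmetic for a partition that was split after a Server restart can be sketched as below. The function name and the assumption that the Client learns each new partition's total row count once that partition is consumed or skipped are illustrative; the article does not specify how this count is communicated.

```python
def row_offsets(already_ingested: int, new_partition_row_counts):
    """Return (row_offset to send, rows left to transfer) per new partition.

    `new_partition_row_counts` lists the new partitions' sizes in the
    original partition's row order (assumed to become known sequentially).
    """
    plan = []
    remaining = already_ingested
    for rows in new_partition_row_counts:
        # The Client sends what is left of the original offset; the Server
        # answers with COMPLETE right away when nothing is left to transfer.
        plan.append((remaining, max(0, rows - remaining)))
        remaining = max(0, remaining - rows)
    return plan
```

For instance, `row_offsets(250, [100, 100, 100])` yields offsets 250, 150, and 50: the first two new partitions are skipped entirely, and the third is streamed from its row 50 onwards.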
Write job completion
Table Write protocol has implicit completion: both Coordinator and Agent must keep track of partitions in the Write job (as listed in the WRITE_INIT request) whose transfer has been completed, as Agent signals to Coordinator in WRITE_PROGRESS messages described above. Coordinator and Agent must also keep track of the tree of Write jobs as created by WRITE_INIT requests with specified “root” Write job IDs in the job context.
When Coordinator and Agent see that the transfer of all partitions in this Write job is complete, as well as all child Write jobs for the “root” Write job, Coordinator and Agent without any additional communication with each other both release all resources associated with this Write job and signal Servers and Clients involved in this Write job to do the same.
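A minimal sketch of the bookkeeping that both Coordinator and Agent could run independently is shown below. `WriteJobTracker` and its method names are hypothetical. How a partition that was re-issued in a child Write job gets marked complete in its parent is an assumption left to the caller here.

```python
class WriteJobTracker:
    """Tracks unfinished partitions per Write job and each job's root."""

    def __init__(self):
        self.pending = {}  # job_id -> set of unfinished partitions
        self.root_of = {}  # job_id -> "root" Write job ID (itself for a root)

    def on_write_init(self, job_id, partitions, root_id=None):
        self.pending[job_id] = set(partitions)
        self.root_of[job_id] = root_id or job_id

    def on_partition_complete(self, job_id, partition) -> bool:
        """Record a completed partition; return True if the whole tree is done."""
        self.pending[job_id].discard(partition)
        return self.is_tree_complete(self.root_of[job_id])

    def is_tree_complete(self, root_id) -> bool:
        return all(
            not partitions
            for job_id, partitions in self.pending.items()
            if self.root_of[job_id] == root_id
        )
```

Because both sides derive completion from the same WRITE_INIT and WRITE_PROGRESS messages, each can release its resources once `is_tree_complete` turns true without a dedicated completion message.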
Write job abortion
As transaction and ingestion task/job/stream semantics are not defined in Table Write protocol, it doesn’t need to be concerned with abortion and error messaging.
The Write job might fail due to a data (consistency) conflict, crashes of the target’s Clients, or it could be aborted externally due to overload.
If the Write job abortion originates from actions on the level of Table Write protocol rather than higher levels, Agent registers the abortion signal and error messages with the transaction or ingestion manager rather than the source side’s Coordinator.
It’s the responsibility of the transaction/ingestion manager to send the termination signal to the transaction/ingestion client, which in turn propagates this termination signal to Coordinator. All these interactions are out of the scope of Table Write protocol: they happen according to the already existing protocols between the target database, the processing engine (or another source, such as another database, in the context of table replication), and the system or process that acts as the transaction or ingestion client.
Then, Coordinator sends signals to Servers to release all resources associated with the Write job. These signals are internal to the source side and thus out of scope for Table Write protocol.
As per Table Read protocol, Agent and Coordinator must monitor each other’s aliveness. When their counterpart appears unresponsive to them, they terminate all connections and release resources on their side of the Write job. They should report this to their transaction/ingestion manager or transaction/ingestion client, respectively.
Table Write protocol: implementation notes
It should be relatively simple for most processing engines to implement the source side of Table Write protocol, i.e., the server side of Table Read protocol. They can leverage their implementations of inter-stage (and inter-node) data exchange.
If the processing engine already implements the server side of Arrow Flight protocol, as DataFusion does, implementing the server side of Table Read protocol becomes even simpler: Table Read protocol is “progressive”, which means the implementation could only support “core” features that are mostly equivalent to Arrow Flight. The primary difference would be the streaming protocol for columnar data instead of the standard FlightData streaming in Arrow Flight RPC, or Arrow Dissociated IPC.
On the target side of Table Write protocol, i.e., the client side of Table Read protocol, the databases could reuse much of the heavy lifting that they have done to implement pull-based Kafka ingestion, and combine that with other heavy lifting that they have done to implement importing and external querying of data stored in columnar formats, such as Parquet or Arrow.
However, despite the reuse of concepts and logic with Table Read protocol, implementing the server and client sides of Table Read protocol for a database (the latter is necessary for it to act as the target side in Table Write protocol) are completely separate efforts.
Table replication method
An efficient table replication method across database types and storage formats is possible via a combination of Table Read and Table Write protocols: the source side of the Write job is the database from which the table is replicated, and the target side of the Write job is the target database into which the table is replicated.
The table replication method automatically inherits all the nice properties of Table Read and Table Write protocols: read consistency, resiliency, efficient load distribution, and the controllability of resource usage, all on both sides of the replication process.
The proposed table replication method consists of a series of Write jobs. I’ll call the system that acts as the transaction or ingestion client for these Write jobs Replicator (see the “Out-of-scope: transaction semantics” section above). Replicator’s logic looks as follows:
if target_db uses task/job/stream-based ingestion:
  ingestion_task_id =
    ... (Start ingestion task/job/stream in target_db.)
Replicator → Coordinator (source_db): REPLICATION_INIT
  processing_plan, replication params
Replicator ← Coordinator: REPLICATION_PHASE
  augmented_processing_plan
repeat until cancelled (maybe with delay between steps):
  if target_db uses transactions:
    // Note: EXECUTE_COMMAND (and TRANSACTION_RESULT below) are
    // dummy. Actually, Replicator uses the transaction protocol
    // specific to the target DB, such as the PostgreSQL wire
    // protocol that many OLAP databases adopt.
    Replicator → Agent (target_db): EXECUTE_COMMAND
      "COPY INTO $target_table
       FROM $source_db WITH $augmented_processing_plan"
    // Executed on Agent: context = {table=target_table, tx_id=...}
    Agent → Coordinator: READ_FOR_WRITE
      reuse-connection://?, context, augmented_processing_plan, ...
    ... (The entire Write job happens in background.)
    Replicator ← Agent: TRANSACTION_RESULT
      result // Success or Failure, after the Write job is complete.
  else: // target_db uses task/job/stream-based ingestion
    context = {task_id=ingestion_task_id}
    Replicator → Coordinator: READ_FOR_WRITE
      agent_location, context, augmented_processing_plan, ...
    ... (The entire Write job happens in background.)
    Replicator ← Coordinator:
      result // Success or failure, after the Write job is complete.
  if result == Success:
    Replicator → Coordinator: REPLICATION_STEP
      augmented_processing_plan, replication params
    Replicator ← Coordinator: REPLICATION_PHASE
      augmented_processing_plan // New plan
REPLICATION_INIT and REPLICATION_STEP requests from Replicator to Coordinator with REPLICATION_PHASE responses do most of the interesting work here.
Upon receiving a REPLICATION_INIT request, Coordinator augments the given processing plan (it may be a simple column selection and projection from the source table) with extra special predicates that ensure consistent reads. These extra predicates are opaque to Replicator and the target database’s Agent, and their nature depends on the source database’s approach to consistent reads. See more details at the link above.
Upon receiving a REPLICATION_STEP request, Coordinator inverts that extra predicate in the given (previous) augmented processing plan. For example, if that extra predicate has the form segment.snapshot_number <= $N, Coordinator returns a new processing plan augmented with a predicate like segment.snapshot_number > $N and segment.snapshot_number <= $X, where X is the latest segment snapshot number.
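To make the predicate inversion concrete, here is a minimal Python sketch. It assumes the extra predicate really has the snapshot-number form from the example above; the `SnapshotWindow` class and the `initial_window` and `next_window` helpers are hypothetical names, and a real Coordinator may represent its predicates quite differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SnapshotWindow:
    """Assumed predicate form: lower < segment.snapshot_number <= upper."""

    lower: Optional[int]  # None means no lower bound (the initial plan)
    upper: int

    def matches(self, snapshot_number: int) -> bool:
        above = self.lower is None or snapshot_number > self.lower
        return above and snapshot_number <= self.upper


def initial_window(latest_snapshot: int) -> SnapshotWindow:
    # REPLICATION_INIT: segment.snapshot_number <= $N
    return SnapshotWindow(None, latest_snapshot)


def next_window(previous: SnapshotWindow, latest_snapshot: int) -> SnapshotWindow:
    # REPLICATION_STEP: segment.snapshot_number > $N
    #                   and segment.snapshot_number <= $X
    return SnapshotWindow(previous.upper, latest_snapshot)
```

Under this assumed form, consecutive windows are disjoint and contiguous, so every data segment falls into exactly one replication step.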
Index, statistics, and segment metadata replication
Table Read protocol permits transferring column indexes, as well as column- and section-level statistics (they could be treated as a kind of index) alongside column data to make replication faster at the cost of higher network traffic.
When the source and target databases are two separate instances of the same database technology, the source side’s Servers can also transfer arbitrary internal segment- or column-level metadata in the extension or metadata fields of their step #5 messages, or as Arrow’s metadata transferred in the streaming protocol.
Note how the approach to metadata transfer in this table replication method differs from the approach of open table formats (Iceberg, Hudi, and Delta Lake), which externalise the exact metadata storage format and medium, i.e., object storage. The table replication method based on Table Write protocol offers greater flexibility: the metadata may be stored in raw files on object storage like in Iceberg and Delta Lake, in HBase or another wide column store like in Hudi, in ZooKeeper or a Raft-based consensus system with in-memory caching like in ClickHouse, in Databend, Druid, and Pinot, in a distributed key-value store such as Cassandra like in Facebook’s Tectonic, in files directly on worker or database nodes alongside the data like in systems built with Oxia, in a front-end server’s key-value store with in-memory caching like in Doris and StarRocks, in a relational DBMS such as Aurora, AlloyDB, or Cockroach like in Quickwit, in a disaggregated columnar storage like in BigQuery, or in hybrid and tiered ways where different parts of metadata are stored in different formats and on different mediums.
The approaches to storing metadata may be completely different in the source and the target databases in the replication job, and neither side needs to be concerned with the metastore architecture and format on the opposite side.
Replication of “delete files”
The source database can emulate replication of data segments with different writing semantics a la Iceberg’s delete files with the help of the new write operation type proposed in the section “Record writing (DML) and output semantics” above: the writing semantics are determined by a column within the record itself. However, the source database’s nodes may transfer the same value in this special column for entire partitions’ sections, and its transmission is almost free with Arrow’s Run-End Encoding for the column (as transferred on the level of the streaming protocol for columnar data). The alternative design is making write operations specific to each partition in the WRITE_INIT message rather than shared.
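A small Python sketch shows why a constant write-semantics column is almost free under run-end encoding. It mimics only the logical layout of Arrow's Run-End Encoded arrays (cumulative run ends plus one value per run) in plain Python rather than calling an Arrow library, and the operation names "upsert" and "delete" are hypothetical placeholders for whatever values the special column would carry.

```python
from itertools import groupby
from typing import List, Tuple


def run_end_encode(values: List[str]) -> Tuple[List[int], List[str]]:
    """Return (run_ends, run_values) for a column of repeated values.

    run_ends[i] is the cumulative logical length at the end of run i,
    which mirrors the logical layout of a Run-End Encoded array.
    """
    run_ends: List[int] = []
    run_values: List[str] = []
    end = 0
    for value, group in groupby(values):
        end += sum(1 for _ in group)  # length of this run
        run_ends.append(end)
        run_values.append(value)
    return run_ends, run_values


# A whole section that carries "delete" semantics (hypothetical value)
# collapses into a single run, regardless of the number of records.
section_ops = ["delete"] * 1000
encoded = run_end_encode(section_ops)
```

Here `encoded` holds one run end and one value for all 1000 records, so the per-record semantics column adds a negligible amount of data to the section.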
This is because without the read throughput provided by public cloud’s object storage, such as with on-premises setups of OSS object storage like MinIO where the number of nodes in the object storage layer doesn’t greatly exceed the number of nodes doing query processing, analytics queries on tables stored in Iceberg, Hudi, or Delta Lake formats would run even slower.
The streaming protocol for columnar data proposed for use in Table Read and Table Write protocols permits loading columnar data directly from NVM-based storage to GPU memory and back, using transfer off-loading via libfabric or UCX.
In fairness, if the database’s architecture is built around a shared, durable log (like WAL), and natively implements distributed transaction management, implementing Arrow Flight-style SQL bulk ingestion becomes relatively simple, unlike Table Write protocol, which would require significant development effort and complexity anyway. I believe that some HTAP databases, such as Google’s AlloyDB and CedarDB (see Schmidt et al., 2024) have these features and thus can offer a resilient data ingestion interface with exactly once delivery via Arrow Flight protocol more easily than via Table Write protocol. However, the vast majority of distributed OLAP databases don’t have a shared durable log in their architecture, and thus for them offering a resilient data ingestion interface with exactly once delivery guarantees (and without sticking Kafka in between) would be simpler via Table Write protocol.
The emerging alternative is cross-system transaction managers such as Apache Seata, Temporal, etc. to orchestrate transactions on top of multiple OLAP and other databases.
I use the term “data segment” to disambiguate between physical partitions at the source and logical partitions as defined in Table Read protocol.
In Table Read protocol, there is an extra abstraction called column group, so, more precisely, sections belong to column groups within partitions, not partitions themselves.
If the target side doesn’t implement Agent’s high availability, the metadata “store” could be just Agent’s memory, and Clients “write into this metadata store” by sending messages to Agent.