Commit b6e63f5

ref(docs): Add CleanupStore to docs (#302)
1 parent 66f9c7f commit b6e63f5

4 files changed

Lines changed: 48 additions & 4 deletions

docs/explanation/architecture.md

Lines changed: 23 additions & 1 deletion
@@ -2,7 +2,7 @@

 **Understanding how ETL components work together to replicate data from Postgres**

-ETL's architecture centers around four core abstractions that work together to provide reliable, high-performance data replication: `Pipeline`, `Destination`, `SchemaStore`, and `StateStore`. This document explains how these components interact and coordinate data flow from Postgres logical replication to target systems.
+ETL's architecture centers around five core abstractions that work together to provide reliable, high-performance data replication: `Pipeline`, `Destination`, `SchemaStore`, `StateStore`, and `CleanupStore`. This document explains how these components interact and coordinate data flow from Postgres logical replication to target systems.

 A diagram of the overall architecture is shown below:

@@ -36,6 +36,10 @@ flowchart LR
         subgraph SchemaStore[Schema Store]
             D2["Memory<br>Postgres"]
         end
+
+        subgraph CleanupStore[Cleanup Store]
+            D3["Memory<br>Postgres"]
+        end
     end

     A --> ApplyWorker
@@ -160,6 +164,24 @@ Like `SchemaStore`, `StateStore` uses cache-first reads with `load_*` methods fo

 The store tracks both replication progress through `TableReplicationPhase` and source-to-destination table name mappings.

+### CleanupStore
+
+The `CleanupStore` trait provides atomic, table-scoped maintenance operations that affect both schema and state storage. The pipeline calls these primitives when tables are removed from a publication.
+
+```rust
+pub trait CleanupStore {
+    /// Deletes all stored state for `table_id` for the current pipeline.
+    ///
+    /// Removes replication state (including history), table schemas, and
+    /// table mappings. This must NOT drop or modify the actual destination table.
+    ///
+    /// Intended for use when a table is removed from the publication.
+    fn cleanup_table_state(&self, table_id: TableId) -> impl Future<Output = EtlResult<()>> + Send;
+}
+```
+
+Implementations must ensure the operation is consistent across in-memory caches and persistent storage, and must be idempotent. Cleanup only removes ETL-maintained metadata and state; it never touches destination tables.
+
 ## Data Flow Architecture

 ### Worker Coordination

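Note on the `CleanupStore` contract added above: the documentation requires cleanup to be idempotent and consistent across the cache and persistent storage. A minimal sketch of what that contract means in practice follows. It is synchronous and uses only the standard library; `SketchStore`, `TableMetadata`, `insert`, and `is_tracked` are hypothetical names for illustration, and `TableId` is a stand-in alias, not the real `etl::types::TableId`.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for ETL's `TableId` (assumption, not the real type).
pub type TableId = u32;

/// Hypothetical bundle of the metadata ETL keeps per table.
#[derive(Clone, Debug)]
pub struct TableMetadata {
    pub schema: String,
    pub destination_name: String,
}

/// Toy store with a persistent layer and a cache, both modeled as maps.
#[derive(Default)]
pub struct SketchStore {
    pub persistent: Mutex<HashMap<TableId, TableMetadata>>,
    pub cache: Mutex<HashMap<TableId, TableMetadata>>,
}

impl SketchStore {
    /// Records metadata for a table in both layers.
    pub fn insert(&self, table_id: TableId, meta: TableMetadata) {
        self.persistent.lock().unwrap().insert(table_id, meta.clone());
        self.cache.lock().unwrap().insert(table_id, meta);
    }

    /// Synchronous analogue of `cleanup_table_state`: removing a missing key
    /// is a no-op, so calling this twice leaves the same end state.
    pub fn cleanup_table_state(&self, table_id: TableId) -> Result<(), String> {
        self.persistent.lock().unwrap().remove(&table_id);
        self.cache.lock().unwrap().remove(&table_id);
        Ok(())
    }

    /// True if either layer still holds metadata for the table.
    pub fn is_tracked(&self, table_id: TableId) -> bool {
        let in_persistent = self.persistent.lock().unwrap().contains_key(&table_id);
        let in_cache = self.cache.lock().unwrap().contains_key(&table_id);
        in_persistent || in_cache
    }
}
```

Because `HashMap::remove` returns `None` for an absent key rather than failing, a second call for the same table succeeds and changes nothing, which is the idempotency property the documentation asks for. Nothing in the sketch models a destination table, mirroring the rule that cleanup never touches destination data.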
docs/tutorials/custom-implementations.md

Lines changed: 22 additions & 1 deletion
@@ -51,7 +51,7 @@ tracing-subscriber = "0.3"

 ## Step 3: Create Custom Store Implementation

-Create `src/custom_store.rs` with a dual-storage implementation:
+Create `src/custom_store.rs` with a dual-storage implementation and cleanup primitives:

 ```rust
 use std::collections::HashMap;
@@ -63,6 +63,7 @@ use etl::error::EtlResult;
 use etl::state::table::TableReplicationPhase;
 use etl::store::schema::SchemaStore;
 use etl::store::state::StateStore;
+use etl::store::cleanup::CleanupStore;
 use etl::types::{TableId, TableSchema};

 // Represents data stored in our in-memory cache for fast access
@@ -328,6 +329,26 @@ impl StateStore for CustomStore {
         Ok(())
     }
 }
+
+// Cleanup primitives spanning both schema and state storage
+impl CleanupStore for CustomStore {
+    // Delete everything ETL tracks for a specific table in a consistent, idempotent way
+    async fn cleanup_table_state(&self, table_id: TableId) -> EtlResult<()> {
+        {
+            // Remove from persistent storage first
+            let mut persistent = self.persistent.lock().await;
+            persistent.remove(&table_id);
+        }
+
+        {
+            // Then clear the cache to maintain consistency
+            let mut cache = self.cache.lock().await;
+            cache.remove(&table_id);
+        }
+
+        Ok(())
+    }
+}
 ```

 **Result:** Your file should compile without errors when you run `cargo check`.

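Note on the tutorial implementation above: it releases the persistent lock before taking the cache lock, so a concurrent reader could briefly see a cached entry for a table already gone from persistent storage. Whether that window matters depends on how the store is used. One possible variation, sketched below under simplifying assumptions, holds both locks for the duration of the removal. The sketch is synchronous and uses `std::sync::Mutex` instead of an async mutex; `TwoLayerStore` and the `String` values are hypothetical placeholders, not the tutorial's `CustomStore`.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for `etl::types::TableId` (assumption).
pub type TableId = u32;

/// Simplified two-layer store; values are placeholder strings rather than
/// real schemas or replication phases.
#[derive(Default)]
pub struct TwoLayerStore {
    pub persistent: Mutex<HashMap<TableId, String>>,
    pub cache: Mutex<HashMap<TableId, String>>,
}

impl TwoLayerStore {
    /// Removes `table_id` from both layers while holding both locks, so no
    /// other lock holder can observe the table in one layer but not the other.
    pub fn cleanup_table_state(&self, table_id: TableId) {
        // Always acquire in the same order (persistent, then cache) so that
        // two callers cannot deadlock by locking in opposite orders.
        let mut persistent = self.persistent.lock().unwrap();
        let mut cache = self.cache.lock().unwrap();
        persistent.remove(&table_id);
        cache.remove(&table_id);
        // Both guards are dropped here, releasing the locks together.
    }
}
```

The trade-off is longer lock hold time in exchange for a removal that is indivisible from the point of view of any code that takes either lock. Other tables are untouched, which keeps the operation table-scoped.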
docs/tutorials/index.md

Lines changed: 2 additions & 2 deletions
@@ -20,9 +20,9 @@ _What you'll build:_ A working pipeline that streams changes from a sample Postg

 **45 minutes****Advanced**

-Implement production-ready custom stores and destinations. Learn ETL's design patterns, build persistent SQLite storage, and create HTTP-based destinations with retry logic.
+Implement production-ready custom stores and destinations. Learn ETL's design patterns, build persistent storage, implement cleanup primitives for safe table removal, and create HTTP-based destinations with retry logic.

-_What you'll build:_ Custom in-memory store for state/schema storage and HTTP destination.
+_What you'll build:_ Custom in-memory store for state/schema storage with cleanup, and an HTTP destination.

 ## Before You Start

etl/README.md

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ The ETL core implements a pipeline architecture that replicates data from Postgr
 to the apply worker
 - **State Store**: Stores the state of the pipeline
 - **Schema Store**: Stores the table schemas of the tables involved in the replication
+- **Cleanup Store**: Provides atomic cleanup primitives that delete stored state, schema, and mappings for tables removed from a publication (does not touch destination data)

 ### Information Flow

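Note on the Cleanup Store entry above: the docs say cleanup applies to tables removed from a publication, but this commit does not show how the pipeline decides which tables those are. One plausible approach, offered purely as an assumption, is a set difference between the tables ETL currently tracks and the tables currently in the publication. `tables_to_clean_up` is a hypothetical helper and `TableId` is a stand-in alias, not the real `etl::types::TableId`.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for `etl::types::TableId` (assumption).
pub type TableId = u32;

/// Returns the tables that are still tracked but no longer published,
/// sorted so the result is deterministic.
pub fn tables_to_clean_up(
    tracked: &HashSet<TableId>,
    published: &HashSet<TableId>,
) -> Vec<TableId> {
    // `difference` yields elements of `tracked` that are absent from `published`.
    let mut removed: Vec<TableId> = tracked.difference(published).copied().collect();
    removed.sort_unstable();
    removed
}
```

Each returned id would then be passed to `cleanup_table_state`. Tables that appear only in the publication (newly added ones) are ignored by this helper, since they have no stored state to delete.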