diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/data.rs | 20 | ||||
-rw-r--r-- | src/table/table.rs | 2 |
2 files changed, 18 insertions, 4 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index 9aa2a3bc..e07a21d2 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -9,13 +9,16 @@ use tokio::sync::Notify; use garage_util::data::*; use garage_util::error::*; +use garage_rpc::membership::System; + use crate::crdt::CRDT; use crate::replication::*; use crate::schema::*; pub struct TableData<F: TableSchema, R: TableReplication> { - pub name: String, + system: Arc<System>, + pub name: String, pub(crate) instance: F, pub(crate) replication: R, @@ -32,7 +35,7 @@ where F: TableSchema, R: TableReplication, { - pub fn new(name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { + pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { let store = db .open_tree(&format!("{}:table", name)) .expect("Unable to open DB tree"); @@ -49,6 +52,7 @@ where .expect("Unable to open DB tree"); Arc::new(Self { + system, name, instance, replication, @@ -157,7 +161,17 @@ where self.instance.updated(old_entry, Some(new_entry)); self.merkle_todo_notify.notify_one(); if is_tombstone { - self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; + // We are only responsible for GC'ing this item if we are the + // "leader" of the partition, i.e. the first node in the + // set of nodes that replicates this partition. + // This avoids GC loops and does not change the termination properties + // of the GC algorithm, as in all cases GC is suspended if + // any node of the partition is unavailable. + let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); + let nodes = self.replication.write_nodes(&pk_hash); + if nodes.first() == Some(&self.system.id) { + self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; + } } } diff --git a/src/table/table.rs b/src/table/table.rs index 421c8bf5..e203b178 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -64,7 +64,7 @@ where let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); - let data = TableData::new(name, instance, replication, db); + let data = TableData::new(system.clone(), name, instance, replication, db); let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); |