diff options
author | Alex Auvolat <alex@adnab.me> | 2021-12-14 12:34:01 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-12-15 15:39:10 +0100 |
commit | 8f6026de5ecd44cbe0fc0bcd47638a1ece860439 (patch) | |
tree | 5adf96c2218aa27b59eeb66cda676895979d4257 /src/table | |
parent | 945b75dbf1de8bb22ebf9824727a2c45561bfcf4 (diff) | |
download | garage-8f6026de5ecd44cbe0fc0bcd47638a1ece860439.tar.gz garage-8f6026de5ecd44cbe0fc0bcd47638a1ece860439.zip |
Make table name a const in trait
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/data.rs | 20 | ||||
-rw-r--r-- | src/table/gc.rs | 13 | ||||
-rw-r--r-- | src/table/merkle.rs | 15 | ||||
-rw-r--r-- | src/table/schema.rs | 7 | ||||
-rw-r--r-- | src/table/sync.rs | 42 | ||||
-rw-r--r-- | src/table/table.rs | 12 |
6 files changed, 57 insertions, 52 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index fb0b6d02..7af5f552 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -19,7 +19,6 @@ use crate::schema::*; pub struct TableData<F: TableSchema, R: TableReplication> { system: Arc<System>, - pub name: String, pub(crate) instance: F, pub(crate) replication: R, @@ -36,31 +35,24 @@ where F: TableSchema, R: TableReplication, { - pub fn new( - system: Arc<System>, - name: String, - instance: F, - replication: R, - db: &sled::Db, - ) -> Arc<Self> { + pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { let store = db - .open_tree(&format!("{}:table", name)) + .open_tree(&format!("{}:table", F::TABLE_NAME)) .expect("Unable to open DB tree"); let merkle_tree = db - .open_tree(&format!("{}:merkle_tree", name)) + .open_tree(&format!("{}:merkle_tree", F::TABLE_NAME)) .expect("Unable to open DB Merkle tree tree"); let merkle_todo = db - .open_tree(&format!("{}:merkle_todo", name)) + .open_tree(&format!("{}:merkle_todo", F::TABLE_NAME)) .expect("Unable to open DB Merkle TODO tree"); let gc_todo = db - .open_tree(&format!("{}:gc_todo_v2", name)) + .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .expect("Unable to open DB tree"); Arc::new(Self { system, - name, instance, replication, store, @@ -245,7 +237,7 @@ where Err(e) => match F::try_migrate(bytes) { Some(x) => Ok(x), None => { - warn!("Unable to decode entry of {}: {}", self.name, e); + warn!("Unable to decode entry of {}: {}", F::TABLE_NAME, e); for line in hexdump::hexdump_iter(bytes) { debug!("{}", line); } diff --git a/src/table/gc.rs b/src/table/gc.rs index 98d7c95d..5cb8cb9b 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -57,11 +57,11 @@ where pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { let endpoint = system .netapp - .endpoint(format!("garage_table/gc.rs/Rpc:{}", data.name)); + .endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME)); let gc = Arc::new(Self { system: system.clone(), - data: data.clone(), + data, endpoint, }); @@ -69,7 +69,7 @@ where let gc1 = gc.clone(); system.background.spawn_worker( - format!("GC loop for {}", data.name), + format!("GC loop for {}", F::TABLE_NAME), move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit), ); @@ -90,7 +90,7 @@ where } } Err(e) => { - warn!("({}) Error doing GC: {}", self.data.name, e); + warn!("({}) Error doing GC: {}", F::TABLE_NAME, e); } } } @@ -160,7 +160,7 @@ where return Ok(Some(Duration::from_secs(60))); } - debug!("({}) GC: doing {} items", self.data.name, entries.len()); + debug!("({}) GC: doing {} items", F::TABLE_NAME, entries.len()); // Split entries to GC by the set of nodes on which they are stored. // Here we call them partitions but they are not exactly @@ -262,7 +262,8 @@ where info!( "({}) GC: {} items successfully pushed, will try to delete.", - self.data.name, n_items + F::TABLE_NAME, + n_items ); // Step 2: delete tombstones everywhere. diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 56f307d3..5ec6ab61 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -82,7 +82,7 @@ where let ret2 = ret.clone(); background.spawn_worker( - format!("Merkle tree updater for {}", ret.data.name), + format!("Merkle tree updater for {}", F::TABLE_NAME), |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit), ); @@ -97,14 +97,16 @@ where if let Err(e) = self.update_item(&key[..], &valhash[..]) { warn!( "({}) Error while updating Merkle tree item: {}", - self.data.name, e + F::TABLE_NAME, + e ); } } Err(e) => { warn!( "({}) Error while iterating on Merkle todo tree: {}", - self.data.name, e + F::TABLE_NAME, + e ); tokio::time::sleep(Duration::from_secs(10)).await; } @@ -147,7 +149,8 @@ where if !deleted { debug!( "({}) Item not deleted from Merkle todo because it changed: {:?}", - self.data.name, k + F::TABLE_NAME, + k ); } Ok(()) @@ -183,7 +186,7 @@ where // should not happen warn!( "({}) Replacing intermediate node with empty node, should not happen.", - self.data.name + F::TABLE_NAME ); Some(MerkleNode::Empty) } else if children.len() == 1 { @@ -195,7 +198,7 @@ where MerkleNode::Empty => { warn!( "({}) Single subnode in tree is empty Merkle node", - self.data.name + F::TABLE_NAME ); Some(MerkleNode::Empty) } diff --git a/src/table/schema.rs b/src/table/schema.rs index 4d6050e8..fa51fa84 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -57,12 +57,19 @@ pub trait Entry<P: PartitionKey, S: SortKey>: /// Trait for the schema used in a table pub trait TableSchema: Send + Sync { + /// The name of the table in the database + const TABLE_NAME: &'static str; + /// The partition key used in that table type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; /// The sort key used int that table type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; + /// They type for an entry in that table type E: Entry<Self::P, Self::S>; + + /// The type for a filter that can be applied to select entries + /// (e.g. filter out deleted entries) type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; // Action to take if not able to decode current version: diff --git a/src/table/sync.rs b/src/table/sync.rs index c5795f65..df9fb4d0 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -77,13 +77,13 @@ where ) -> Arc<Self> { let endpoint = system .netapp - .endpoint(format!("garage_table/sync.rs/Rpc:{}", data.name)); + .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); let todo = SyncTodo { todo: vec![] }; let syncer = Arc::new(Self { system: system.clone(), - data: data.clone(), + data, merkle, todo: Mutex::new(todo), endpoint, @@ -95,13 +95,13 @@ where let s1 = syncer.clone(); system.background.spawn_worker( - format!("table sync watcher for {}", data.name), + format!("table sync watcher for {}", F::TABLE_NAME), move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), ); let s2 = syncer.clone(); system.background.spawn_worker( - format!("table syncer for {}", data.name), + format!("table syncer for {}", F::TABLE_NAME), move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), ); @@ -128,7 +128,7 @@ where _ = ring_recv.changed().fuse() => { let new_ring = ring_recv.borrow(); if !Arc::ptr_eq(&new_ring, &prev_ring) { - debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); + debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); self.add_full_sync(); prev_ring = new_ring.clone(); } @@ -146,7 +146,7 @@ where _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; - debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name); + debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME); self.add_full_sync(); } } @@ -177,7 +177,9 @@ where if let Err(e) = res { warn!( "({}) Error while syncing {:?}: {}", - self.data.name, partition, e + F::TABLE_NAME, + partition, + e ); } } else { @@ -205,7 +207,9 @@ where debug!( "({}) Syncing {:?} with {:?}...", - self.data.name, partition, nodes + F::TABLE_NAME, + partition, + nodes ); let mut sync_futures = nodes .iter() @@ -219,7 +223,7 @@ where while let Some(r) = sync_futures.next().await { if let Err(e) = r { n_errors += 1; - warn!("({}) Sync error: {}", self.data.name, e); + warn!("({}) Sync error: {}", F::TABLE_NAME, e); } } if n_errors > self.data.replication.max_write_errors() { @@ -272,7 +276,7 @@ where if nodes.contains(&self.system.id) { warn!( "({}) Interrupting offload as partitions seem to have changed", - self.data.name + F::TABLE_NAME ); break; } @@ -286,7 +290,7 @@ where counter += 1; info!( "({}) Offloading {} items from {:?}..{:?} ({})", - self.data.name, + F::TABLE_NAME, items.len(), begin, end, @@ -329,7 +333,7 @@ where } if not_removed > 0 { - debug!("({}) {} items not removed during offload because they changed in between (trying again...)", self.data.name, not_removed); + debug!("({}) {} items not removed during offload because they changed in between (trying again...)", F::TABLE_NAME, not_removed); } Ok(()) @@ -360,7 +364,9 @@ where if root_ck.is_empty() { debug!( "({}) Sync {:?} with {:?}: partition is empty.", - self.data.name, partition, who + F::TABLE_NAME, + partition, + who ); return Ok(()); } @@ -384,7 +390,9 @@ where SyncRpc::RootCkDifferent(false) => { debug!( "({}) Sync {:?} with {:?}: no difference", - self.data.name, partition, who + F::TABLE_NAME, + partition, + who ); return Ok(()); } @@ -413,11 +421,11 @@ where // Just send that item directly if let Some(val) = self.data.store.get(&ik[..])? { if blake2sum(&val[..]) != ivhash { - warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik); + warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik); } todo_items.push(val.to_vec()); } else { - warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik); + warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik); } } MerkleNode::Intermediate(l) => { @@ -482,7 +490,7 @@ where async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", - self.data.name, + F::TABLE_NAME, item_value_list.len(), who ); diff --git a/src/table/table.rs b/src/table/table.rs index e1357471..396888c1 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -55,18 +55,12 @@ where { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - pub fn new( - instance: F, - replication: R, - system: Arc<System>, - db: &sled::Db, - name: String, - ) -> Arc<Self> { + pub fn new(instance: F, replication: R, system: Arc<System>, db: &sled::Db) -> Arc<Self> { let endpoint = system .netapp - .endpoint(format!("garage_table/table.rs/Rpc:{}", name)); + .endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME)); - let data = TableData::new(system.clone(), name, instance, replication, db); + let data = TableData::new(system.clone(), instance, replication, db); let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); |