author     Alex Auvolat <alex@adnab.me>   2021-12-14 12:34:01 +0100
committer  Alex Auvolat <alex@adnab.me>   2021-12-15 15:39:10 +0100
commit     8f6026de5ecd44cbe0fc0bcd47638a1ece860439 (patch)
tree       5adf96c2218aa27b59eeb66cda676895979d4257 /src/table
parent     945b75dbf1de8bb22ebf9824727a2c45561bfcf4 (diff)
Make table name a const in trait
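Before this commit, the table name was a runtime String passed through every constructor (TableData::new, Table::new) and stored as a field, used only for logging and for deriving sled tree and RPC endpoint names. It now becomes an associated const TABLE_NAME on the TableSchema trait, so generic code can reach it statically as F::TABLE_NAME. A minimal, self-contained sketch of the pattern (ObjectTable and worker_name are invented for illustration, not Garage's real types):

    trait TableSchema {
        const TABLE_NAME: &'static str;
    }

    struct ObjectTable; // hypothetical schema type, invented for this sketch

    impl TableSchema for ObjectTable {
        const TABLE_NAME: &'static str = "object";
    }

    // Generic code can now name a table statically, without storing a String:
    fn worker_name<F: TableSchema>() -> String {
        format!("GC loop for {}", F::TABLE_NAME)
    }

    fn main() {
        assert_eq!(worker_name::<ObjectTable>(), "GC loop for object");
    }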
Diffstat (limited to 'src/table')
-rw-r--r--  src/table/data.rs    20
-rw-r--r--  src/table/gc.rs      13
-rw-r--r--  src/table/merkle.rs  15
-rw-r--r--  src/table/schema.rs   7
-rw-r--r--  src/table/sync.rs    42
-rw-r--r--  src/table/table.rs   12

6 files changed, 57 insertions(+), 52 deletions(-)
diff --git a/src/table/data.rs b/src/table/data.rs
index fb0b6d02..7af5f552 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -19,7 +19,6 @@ use crate::schema::*;
pub struct TableData<F: TableSchema, R: TableReplication> {
system: Arc<System>,
- pub name: String,
pub(crate) instance: F,
pub(crate) replication: R,
@@ -36,31 +35,24 @@ where
F: TableSchema,
R: TableReplication,
{
- pub fn new(
- system: Arc<System>,
- name: String,
- instance: F,
- replication: R,
- db: &sled::Db,
- ) -> Arc<Self> {
+ pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
let store = db
- .open_tree(&format!("{}:table", name))
+ .open_tree(&format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree");
let merkle_tree = db
- .open_tree(&format!("{}:merkle_tree", name))
+ .open_tree(&format!("{}:merkle_tree", F::TABLE_NAME))
.expect("Unable to open DB Merkle tree tree");
let merkle_todo = db
- .open_tree(&format!("{}:merkle_todo", name))
+ .open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
let gc_todo = db
- .open_tree(&format!("{}:gc_todo_v2", name))
+ .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree");
Arc::new(Self {
system,
- name,
instance,
replication,
store,
@@ -245,7 +237,7 @@ where
Err(e) => match F::try_migrate(bytes) {
Some(x) => Ok(x),
None => {
- warn!("Unable to decode entry of {}: {}", self.name, e);
+ warn!("Unable to decode entry of {}: {}", F::TABLE_NAME, e);
for line in hexdump::hexdump_iter(bytes) {
debug!("{}", line);
}
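As the data.rs hunk above shows, TableData::new now derives its four sled tree names from the schema const rather than from a name field. A small runnable sketch of the resulting names, with "object" standing in for a hypothetical F::TABLE_NAME:

    fn main() {
        let table_name = "object"; // stand-in for F::TABLE_NAME
        for suffix in ["table", "merkle_tree", "merkle_todo", "gc_todo_v2"] {
            // Each of these strings is what db.open_tree() receives above:
            println!("{}:{}", table_name, suffix);
        }
    }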
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 98d7c95d..5cb8cb9b 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -57,11 +57,11 @@ where
pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
let endpoint = system
.netapp
- .endpoint(format!("garage_table/gc.rs/Rpc:{}", data.name));
+ .endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME));
let gc = Arc::new(Self {
system: system.clone(),
- data: data.clone(),
+ data,
endpoint,
});
@@ -69,7 +69,7 @@ where
let gc1 = gc.clone();
system.background.spawn_worker(
- format!("GC loop for {}", data.name),
+ format!("GC loop for {}", F::TABLE_NAME),
move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
);
@@ -90,7 +90,7 @@ where
}
}
Err(e) => {
- warn!("({}) Error doing GC: {}", self.data.name, e);
+ warn!("({}) Error doing GC: {}", F::TABLE_NAME, e);
}
}
}
@@ -160,7 +160,7 @@ where
return Ok(Some(Duration::from_secs(60)));
}
- debug!("({}) GC: doing {} items", self.data.name, entries.len());
+ debug!("({}) GC: doing {} items", F::TABLE_NAME, entries.len());
// Split entries to GC by the set of nodes on which they are stored.
// Here we call them partitions but they are not exactly
@@ -262,7 +262,8 @@ where
info!(
"({}) GC: {} items successfully pushed, will try to delete.",
- self.data.name, n_items
+ F::TABLE_NAME,
+ n_items
);
// Step 2: delete tombstones everywhere.
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 56f307d3..5ec6ab61 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -82,7 +82,7 @@ where
let ret2 = ret.clone();
background.spawn_worker(
- format!("Merkle tree updater for {}", ret.data.name),
+ format!("Merkle tree updater for {}", F::TABLE_NAME),
|must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
);
@@ -97,14 +97,16 @@ where
if let Err(e) = self.update_item(&key[..], &valhash[..]) {
warn!(
"({}) Error while updating Merkle tree item: {}",
- self.data.name, e
+ F::TABLE_NAME,
+ e
);
}
}
Err(e) => {
warn!(
"({}) Error while iterating on Merkle todo tree: {}",
- self.data.name, e
+ F::TABLE_NAME,
+ e
);
tokio::time::sleep(Duration::from_secs(10)).await;
}
@@ -147,7 +149,8 @@ where
if !deleted {
debug!(
"({}) Item not deleted from Merkle todo because it changed: {:?}",
- self.data.name, k
+ F::TABLE_NAME,
+ k
);
}
Ok(())
@@ -183,7 +186,7 @@ where
// should not happen
warn!(
"({}) Replacing intermediate node with empty node, should not happen.",
- self.data.name
+ F::TABLE_NAME
);
Some(MerkleNode::Empty)
} else if children.len() == 1 {
@@ -195,7 +198,7 @@ where
MerkleNode::Empty => {
warn!(
"({}) Single subnode in tree is empty Merkle node",
- self.data.name
+ F::TABLE_NAME
);
Some(MerkleNode::Empty)
}
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 4d6050e8..fa51fa84 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -57,12 +57,19 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
/// Trait for the schema used in a table
pub trait TableSchema: Send + Sync {
+ /// The name of the table in the database
+ const TABLE_NAME: &'static str;
+
/// The partition key used in that table
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
/// The sort key used in that table
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+
/// The type for an entry in that table
type E: Entry<Self::P, Self::S>;
+
+ /// The type for a filter that can be applied to select entries
+ /// (e.g. filter out deleted entries)
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
// Action to take if not able to decode current version:
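With the new associated const, an implementor declares its table name alongside its associated types. A reduced, self-contained sketch (the trait is trimmed down here, and Object/DeletedFilter are invented types, not Garage's real schema):

    // Trimmed stand-in for the revised TableSchema (the real trait also carries
    // serde bounds on its types plus try_migrate and other methods).
    trait TableSchema {
        const TABLE_NAME: &'static str;
        type E;
        type Filter: Clone;
    }

    struct Object; // invented entry type

    #[derive(Clone)]
    struct DeletedFilter; // invented filter type

    struct ObjectTable;

    impl TableSchema for ObjectTable {
        const TABLE_NAME: &'static str = "object";
        type E = Object;
        type Filter = DeletedFilter;
    }

    fn main() {
        // The name is available on the type itself, no instance needed:
        println!("{}", <ObjectTable as TableSchema>::TABLE_NAME);
    }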
diff --git a/src/table/sync.rs b/src/table/sync.rs
index c5795f65..df9fb4d0 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -77,13 +77,13 @@ where
) -> Arc<Self> {
let endpoint = system
.netapp
- .endpoint(format!("garage_table/sync.rs/Rpc:{}", data.name));
+ .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
let todo = SyncTodo { todo: vec![] };
let syncer = Arc::new(Self {
system: system.clone(),
- data: data.clone(),
+ data,
merkle,
todo: Mutex::new(todo),
endpoint,
@@ -95,13 +95,13 @@ where
let s1 = syncer.clone();
system.background.spawn_worker(
- format!("table sync watcher for {}", data.name),
+ format!("table sync watcher for {}", F::TABLE_NAME),
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
);
let s2 = syncer.clone();
system.background.spawn_worker(
- format!("table syncer for {}", data.name),
+ format!("table syncer for {}", F::TABLE_NAME),
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
);
@@ -128,7 +128,7 @@ where
_ = ring_recv.changed().fuse() => {
let new_ring = ring_recv.borrow();
if !Arc::ptr_eq(&new_ring, &prev_ring) {
- debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
+ debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
self.add_full_sync();
prev_ring = new_ring.clone();
}
@@ -146,7 +146,7 @@ where
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
- debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name);
+ debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME);
self.add_full_sync();
}
}
@@ -177,7 +177,9 @@ where
if let Err(e) = res {
warn!(
"({}) Error while syncing {:?}: {}",
- self.data.name, partition, e
+ F::TABLE_NAME,
+ partition,
+ e
);
}
} else {
@@ -205,7 +207,9 @@ where
debug!(
"({}) Syncing {:?} with {:?}...",
- self.data.name, partition, nodes
+ F::TABLE_NAME,
+ partition,
+ nodes
);
let mut sync_futures = nodes
.iter()
@@ -219,7 +223,7 @@ where
while let Some(r) = sync_futures.next().await {
if let Err(e) = r {
n_errors += 1;
- warn!("({}) Sync error: {}", self.data.name, e);
+ warn!("({}) Sync error: {}", F::TABLE_NAME, e);
}
}
if n_errors > self.data.replication.max_write_errors() {
@@ -272,7 +276,7 @@ where
if nodes.contains(&self.system.id) {
warn!(
"({}) Interrupting offload as partitions seem to have changed",
- self.data.name
+ F::TABLE_NAME
);
break;
}
@@ -286,7 +290,7 @@ where
counter += 1;
info!(
"({}) Offloading {} items from {:?}..{:?} ({})",
- self.data.name,
+ F::TABLE_NAME,
items.len(),
begin,
end,
@@ -329,7 +333,7 @@ where
}
if not_removed > 0 {
- debug!("({}) {} items not removed during offload because they changed in between (trying again...)", self.data.name, not_removed);
+ debug!("({}) {} items not removed during offload because they changed in between (trying again...)", F::TABLE_NAME, not_removed);
}
Ok(())
@@ -360,7 +364,9 @@ where
if root_ck.is_empty() {
debug!(
"({}) Sync {:?} with {:?}: partition is empty.",
- self.data.name, partition, who
+ F::TABLE_NAME,
+ partition,
+ who
);
return Ok(());
}
@@ -384,7 +390,9 @@ where
SyncRpc::RootCkDifferent(false) => {
debug!(
"({}) Sync {:?} with {:?}: no difference",
- self.data.name, partition, who
+ F::TABLE_NAME,
+ partition,
+ who
);
return Ok(());
}
@@ -413,11 +421,11 @@ where
// Just send that item directly
if let Some(val) = self.data.store.get(&ik[..])? {
if blake2sum(&val[..]) != ivhash {
- warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
+ warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik);
}
todo_items.push(val.to_vec());
} else {
- warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
+ warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik);
}
}
MerkleNode::Intermediate(l) => {
@@ -482,7 +490,7 @@ where
async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
- self.data.name,
+ F::TABLE_NAME,
item_value_list.len(),
who
);
diff --git a/src/table/table.rs b/src/table/table.rs
index e1357471..396888c1 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -55,18 +55,12 @@ where
{
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
- pub fn new(
- instance: F,
- replication: R,
- system: Arc<System>,
- db: &sled::Db,
- name: String,
- ) -> Arc<Self> {
+ pub fn new(instance: F, replication: R, system: Arc<System>, db: &sled::Db) -> Arc<Self> {
let endpoint = system
.netapp
- .endpoint(format!("garage_table/table.rs/Rpc:{}", name));
+ .endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));
- let data = TableData::new(system.clone(), name, instance, replication, db);
+ let data = TableData::new(system.clone(), instance, replication, db);
let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
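The net effect on constructors, sketched as a self-contained toy (Table here is a one-field stand-in, not the real struct): the name parameter disappears because the schema type parameter already carries it.

    trait TableSchema {
        const TABLE_NAME: &'static str;
    }

    // One-field stand-in for the real Table struct.
    struct Table<F: TableSchema> {
        endpoint_path: String,
        _instance: F,
    }

    impl<F: TableSchema> Table<F> {
        // Before: fn new(instance, replication, system, db, name: String).
        // After: no name argument; it comes from F::TABLE_NAME.
        fn new(instance: F) -> Self {
            Table {
                endpoint_path: format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME),
                _instance: instance,
            }
        }
    }

    struct ObjectTable; // hypothetical schema
    impl TableSchema for ObjectTable {
        const TABLE_NAME: &'static str = "object";
    }

    fn main() {
        let t = Table::new(ObjectTable);
        println!("{}", t.endpoint_path); // garage_table/table.rs/Rpc:object
    }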