aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/Cargo.toml1
-rw-r--r--src/table/data.rs189
-rw-r--r--src/table/lib.rs4
-rw-r--r--src/table/merkle.rs39
-rw-r--r--src/table/replication/fullcopy.rs (renamed from src/table/table_fullcopy.rs)2
-rw-r--r--src/table/replication/mod.rs6
-rw-r--r--src/table/replication/parameters.rs22
-rw-r--r--src/table/replication/sharded.rs (renamed from src/table/table_sharded.rs)2
-rw-r--r--src/table/table.rs265
-rw-r--r--src/table/table_sync.rs129
10 files changed, 368 insertions, 291 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 6485a542..6b3aaceb 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -19,7 +19,6 @@ garage_rpc = { version = "0.1.1", path = "../rpc" }
bytes = "0.4"
rand = "0.7"
hex = "0.3"
-arc-swap = "0.4"
log = "0.4"
hexdump = "0.1"
diff --git a/src/table/data.rs b/src/table/data.rs
new file mode 100644
index 00000000..fa89fc27
--- /dev/null
+++ b/src/table/data.rs
@@ -0,0 +1,189 @@
+use std::sync::Arc;
+
+use log::warn;
+use sled::Transactional;
+use serde_bytes::ByteBuf;
+
+use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::background::BackgroundRunner;
+
+use crate::schema::*;
+use crate::merkle::*;
+use crate::crdt::CRDT;
+
+pub struct TableData<F: TableSchema> {
+ pub name: String,
+ pub instance: F,
+
+ pub store: sled::Tree,
+ pub(crate) merkle_updater: Arc<MerkleUpdater>,
+}
+
+impl<F> TableData<F> where F: TableSchema {
+ pub fn new(
+ name: String,
+ instance: F,
+ db: &sled::Db,
+ background: Arc<BackgroundRunner>,
+ ) -> Arc<Self> {
+ let store = db
+ .open_tree(&format!("{}:table", name))
+ .expect("Unable to open DB tree");
+
+ let merkle_todo_store = db
+ .open_tree(&format!("{}:merkle_todo", name))
+ .expect("Unable to open DB Merkle TODO tree");
+ let merkle_tree_store = db
+ .open_tree(&format!("{}:merkle_tree", name))
+ .expect("Unable to open DB Merkle tree tree");
+
+ let merkle_updater = MerkleUpdater::launch(
+ name.clone(),
+ background,
+ merkle_todo_store,
+ merkle_tree_store,
+ );
+
+ Arc::new(Self{
+ name,
+ instance,
+ store,
+ merkle_updater,
+ })
+ }
+
+ // Read functions
+
+ pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
+ let tree_key = self.tree_key(p, s);
+ if let Some(bytes) = self.store.get(&tree_key)? {
+ Ok(Some(ByteBuf::from(bytes.to_vec())))
+ } else {
+ Ok(None)
+ }
+ }
+
+ pub fn read_range(
+ &self,
+ p: &F::P,
+ s: &Option<F::S>,
+ filter: &Option<F::Filter>,
+ limit: usize,
+ ) -> Result<Vec<Arc<ByteBuf>>, Error> {
+ let partition_hash = p.hash();
+ let first_key = match s {
+ None => partition_hash.to_vec(),
+ Some(sk) => self.tree_key(p, sk),
+ };
+ let mut ret = vec![];
+ for item in self.store.range(first_key..) {
+ let (key, value) = item?;
+ if &key[..32] != partition_hash.as_slice() {
+ break;
+ }
+ let keep = match filter {
+ None => true,
+ Some(f) => {
+ let entry = self.decode_entry(value.as_ref())?;
+ F::matches_filter(&entry, f)
+ }
+ };
+ if keep {
+ ret.push(Arc::new(ByteBuf::from(value.as_ref())));
+ }
+ if ret.len() >= limit {
+ break;
+ }
+ }
+ Ok(ret)
+ }
+
+ // Mutation functions
+
+ pub(crate) fn update_many(&self, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
+ for update_bytes in entries.iter() {
+ self.update_entry(update_bytes.as_slice())?;
+ }
+ Ok(())
+ }
+
+ pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
+ let update = self.decode_entry(update_bytes)?;
+ let tree_key = self.tree_key(update.partition_key(), update.sort_key());
+
+ let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| {
+ let (old_entry, new_entry) = match db.get(&tree_key)? {
+ Some(prev_bytes) => {
+ let old_entry = self
+ .decode_entry(&prev_bytes)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let mut new_entry = old_entry.clone();
+ new_entry.merge(&update);
+ (Some(old_entry), new_entry)
+ }
+ None => (None, update.clone()),
+ };
+
+ if Some(&new_entry) != old_entry.as_ref() {
+ let new_bytes = rmp_to_vec_all_named(&new_entry)
+ .map_err(Error::RMPEncode)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?;
+ db.insert(tree_key.clone(), new_bytes)?;
+ Ok(Some((old_entry, new_entry)))
+ } else {
+ Ok(None)
+ }
+ })?;
+
+ if let Some((old_entry, new_entry)) = changed {
+ self.instance.updated(old_entry, Some(new_entry));
+ //self.syncer.load_full().unwrap().invalidate(&tree_key[..]);
+ }
+
+ Ok(())
+ }
+
+ pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
+ let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| {
+ if let Some(cur_v) = txn.get(k)? {
+ if cur_v == v {
+ txn.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(true);
+ }
+ }
+ Ok(false)
+ })?;
+
+ if removed {
+ let old_entry = self.decode_entry(v)?;
+ self.instance.updated(Some(old_entry), None);
+ //self.syncer.load_full().unwrap().invalidate(k);
+ }
+ Ok(removed)
+ }
+
+ pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
+ let mut ret = p.hash().to_vec();
+ ret.extend(s.sort_key());
+ ret
+ }
+
+ pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
+ match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
+ Ok(x) => Ok(x),
+ Err(e) => match F::try_migrate(bytes) {
+ Some(x) => Ok(x),
+ None => {
+ warn!("Unable to decode entry of {}: {}", self.name, e);
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
+ }
+ Err(e.into())
+ }
+ },
+ }
+ }
+}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index 62fd30c5..bb249a56 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -8,9 +8,9 @@ pub mod schema;
pub mod util;
pub mod merkle;
+pub mod replication;
+pub mod data;
pub mod table;
-pub mod table_fullcopy;
-pub mod table_sharded;
pub mod table_sync;
pub use schema::*;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 50cb90d5..ef197dc8 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -61,7 +61,7 @@ pub enum MerkleNode {
}
impl MerkleUpdater {
- pub(crate) fn new(
+ pub(crate) fn launch(
table_name: String,
background: Arc<BackgroundRunner>,
todo: sled::Tree,
@@ -69,22 +69,22 @@ impl MerkleUpdater {
) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
- Arc::new(Self {
+ let ret = Arc::new(Self {
table_name,
background,
todo,
todo_notify: Notify::new(),
merkle_tree,
empty_node_hash,
- })
- }
+ });
- pub(crate) fn launch(self: &Arc<Self>) {
- let self2 = self.clone();
- self.background.spawn_worker(
- format!("Merkle tree updater for {}", self.table_name),
- |must_exit: watch::Receiver<bool>| self2.updater_loop(must_exit),
+ let ret2 = ret.clone();
+ ret.background.spawn_worker(
+ format!("Merkle tree updater for {}", ret.table_name),
+ |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
);
+
+ ret
}
async fn updater_loop(
@@ -156,28 +156,37 @@ impl MerkleUpdater {
new_vhash: Option<Hash>,
) -> ConflictableTransactionResult<Option<Hash>, Error> {
let i = key.prefix.len();
+
+ // Read node at current position (defined by the prefix stored in key)
+ // Calculate an update to apply to this node
+ // This update is an Option<_>, so that it is None if the update is a no-op
+ // and we can thus skip recalculating and re-storing everything
let mutate = match self.read_node_txn(tx, &key)? {
MerkleNode::Empty => {
if let Some(vhv) = new_vhash {
Some(MerkleNode::Leaf(k.to_vec(), vhv))
} else {
+ // Nothing to do, keep empty node
None
}
}
MerkleNode::Intermediate(mut children) => {
let key2 = key.next_key(khash);
if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? {
+ // Subtree changed, update this node as well
if subhash == self.empty_node_hash {
intermediate_rm_child(&mut children, key2.prefix[i]);
} else {
intermediate_set_child(&mut children, key2.prefix[i], subhash);
}
+
if children.len() == 0 {
// should not happen
warn!("Replacing intermediate node with empty node, should not happen.");
Some(MerkleNode::Empty)
} else if children.len() == 1 {
- // move node down to this level
+ // We now have a single node (case when the update deleted one of only two
+ // children). Move that single child to this level of the tree.
let key_sub = key.add_byte(children[0].0);
let subnode = self.read_node_txn(tx, &key_sub)?;
tx.remove(key_sub.encode())?;
@@ -186,19 +195,23 @@ impl MerkleUpdater {
Some(MerkleNode::Intermediate(children))
}
} else {
+ // Subtree not changed, nothing to do
None
}
}
MerkleNode::Leaf(exlf_key, exlf_hash) => {
if exlf_key == k {
+ // This leaf is for the same key that the one we are updating
match new_vhash {
Some(vhv) if vhv == exlf_hash => None,
Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)),
None => Some(MerkleNode::Empty),
}
} else {
+ // This is an only leaf for another key
if let Some(vhv) = new_vhash {
- // Create two sub-nodes and replace by intermediary node
+ // Move that other key to a subnode, create another subnode for our
+ // insertion and replace current node by an intermediary node
let (pos1, h1) = {
let key2 = key.next_key(blake2sum(&exlf_key[..]));
let subhash =
@@ -216,6 +229,9 @@ impl MerkleUpdater {
intermediate_set_child(&mut int, pos2, h2);
Some(MerkleNode::Intermediate(int))
} else {
+ // Nothing to do, we don't want to insert this value because it is None,
+ // and we don't want to change the other value because it's for something
+ // else
None
}
}
@@ -263,6 +279,7 @@ impl MerkleUpdater {
}
}
+ // Access a node in the Merkle tree, used by the sync protocol
pub(crate) fn read_node(
&self,
k: &MerkleNodeKey,
diff --git a/src/table/table_fullcopy.rs b/src/table/replication/fullcopy.rs
index c55879d8..a62a6c3c 100644
--- a/src/table/table_fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -4,7 +4,7 @@ use garage_rpc::membership::System;
use garage_rpc::ring::Ring;
use garage_util::data::*;
-use crate::*;
+use crate::replication::*;
#[derive(Clone)]
pub struct TableFullReplication {
diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs
new file mode 100644
index 00000000..d43d7f19
--- /dev/null
+++ b/src/table/replication/mod.rs
@@ -0,0 +1,6 @@
+mod parameters;
+
+pub mod fullcopy;
+pub mod sharded;
+
+pub use parameters::*;
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
new file mode 100644
index 00000000..4607b050
--- /dev/null
+++ b/src/table/replication/parameters.rs
@@ -0,0 +1,22 @@
+use garage_rpc::membership::System;
+use garage_rpc::ring::Ring;
+
+use garage_util::data::*;
+
+pub trait TableReplication: Send + Sync {
+ // See examples in table_sharded.rs and table_fullcopy.rs
+ // To understand various replication methods
+
+ // Which nodes to send reads from
+ fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
+ fn read_quorum(&self) -> usize;
+
+ // Which nodes to send writes to
+ fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
+ fn write_quorum(&self, system: &System) -> usize;
+ fn max_write_errors(&self) -> usize;
+
+ // Which are the nodes that do actually replicate the data
+ fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
+ fn split_points(&self, ring: &Ring) -> Vec<Hash>;
+}
diff --git a/src/table/table_sharded.rs b/src/table/replication/sharded.rs
index 47bdfeaf..42a742cd 100644
--- a/src/table/table_sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -2,7 +2,7 @@ use garage_rpc::membership::System;
use garage_rpc::ring::Ring;
use garage_util::data::*;
-use crate::*;
+use crate::replication::*;
#[derive(Clone)]
pub struct TableShardedReplication {
diff --git a/src/table/table.rs b/src/table/table.rs
index 0e75754c..a4cb4b24 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,40 +2,35 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
-use log::warn;
-
-use arc_swap::ArcSwapOption;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
-use sled::Transactional;
use garage_util::data::*;
use garage_util::error::Error;
use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::crdt::CRDT;
-use crate::merkle::*;
+use crate::data::*;
use crate::schema::*;
use crate::table_sync::*;
+use crate::replication::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
-pub struct Table<F: TableSchema, R: TableReplication> {
- pub instance: F,
+pub struct TableAux<F: TableSchema, R: TableReplication> {
+ pub system: Arc<System>,
pub replication: R,
-
- pub name: String,
pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>,
+}
- pub system: Arc<System>,
- pub store: sled::Tree,
- pub syncer: ArcSwapOption<TableSyncer<F, R>>,
- merkle_updater: Arc<MerkleUpdater>,
+pub struct Table<F: TableSchema, R: TableReplication> {
+ pub data: Arc<TableData<F>>,
+ pub aux: Arc<TableAux<F, R>>,
+ pub syncer: Arc<TableSyncer<F, R>>,
}
#[derive(Serialize, Deserialize)]
@@ -55,23 +50,6 @@ pub(crate) enum TableRPC<F: TableSchema> {
impl<F: TableSchema> RpcMessage for TableRPC<F> {}
-pub trait TableReplication: Send + Sync {
- // See examples in table_sharded.rs and table_fullcopy.rs
- // To understand various replication methods
-
- // Which nodes to send reads from
- fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
- fn read_quorum(&self) -> usize;
-
- // Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
- fn write_quorum(&self, system: &System) -> usize;
- fn max_write_errors(&self) -> usize;
-
- // Which are the nodes that do actually replicate the data
- fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
- fn split_points(&self, ring: &Ring) -> Vec<Hash>;
-}
impl<F, R> Table<F, R>
where
@@ -88,60 +66,51 @@ where
name: String,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let store = db
- .open_tree(&format!("{}:table", name))
- .expect("Unable to open DB tree");
-
- let merkle_todo_store = db
- .open_tree(&format!("{}:merkle_todo", name))
- .expect("Unable to open DB Merkle TODO tree");
- let merkle_tree_store = db
- .open_tree(&format!("{}:merkle_tree", name))
- .expect("Unable to open DB Merkle tree tree");
-
let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
- let merkle_updater = MerkleUpdater::new(
- name.clone(),
+ let data = TableData::new(
+ name,
+ instance,
+ db,
system.background.clone(),
- merkle_todo_store,
- merkle_tree_store,
);
- let table = Arc::new(Self {
- instance,
+ let aux = Arc::new(TableAux{
+ system,
replication,
- name,
rpc_client,
- system,
- store,
- syncer: ArcSwapOption::from(None),
- merkle_updater,
});
- table.clone().register_handler(rpc_server, rpc_path);
- let syncer = TableSyncer::launch(table.clone());
- table.syncer.swap(Some(syncer));
+ let syncer = TableSyncer::launch(
+ data.clone(),
+ aux.clone(),
+ );
- table.merkle_updater.launch();
+ let table = Arc::new(Self {
+ data,
+ aux,
+ syncer,
+ });
+
+ table.clone().register_handler(rpc_server, rpc_path);
table
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.replication.write_nodes(&hash, &self.system);
+ let who = self.aux.replication.write_nodes(&hash, &self.aux.system);
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRPC::<F>::Update(vec![e_enc]);
- self.rpc_client
+ self.aux.rpc_client
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
+ RequestStrategy::with_quorum(self.aux.replication.write_quorum(&self.aux.system))
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@@ -153,7 +122,7 @@ where
for entry in entries.iter() {
let hash = entry.partition_key().hash();
- let who = self.replication.write_nodes(&hash, &self.system);
+ let who = self.aux.replication.write_nodes(&hash, &self.aux.system);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
if !call_list.contains_key(&node) {
@@ -166,7 +135,7 @@ where
let call_futures = call_list.drain().map(|(node, entries)| async move {
let rpc = TableRPC::<F>::Update(entries);
- let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
+ let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
Ok::<_, Error>((node, resp))
});
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
@@ -177,7 +146,7 @@ where
errors.push(e);
}
}
- if errors.len() > self.replication.max_write_errors() {
+ if errors.len() > self.aux.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
Ok(())
@@ -190,16 +159,17 @@ where
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.replication.read_nodes(&hash, &self.system);
+ let who = self.aux.replication.read_nodes(&hash, &self.aux.system);
//eprintln!("get who: {:?}", who);
let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
+ .aux
.rpc_client
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.read_quorum())
+ RequestStrategy::with_quorum(self.aux.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -210,7 +180,7 @@ where
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value {
- let v = self.decode_entry(v_bytes.as_slice())?;
+ let v = self.data.decode_entry(v_bytes.as_slice())?;
ret = match ret {
None => Some(v),
Some(mut x) => {
@@ -230,7 +200,7 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- self.system
+ self.aux.system
.background
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
@@ -246,16 +216,16 @@ where
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.replication.read_nodes(&hash, &self.system);
+ let who = self.aux.replication.read_nodes(&hash, &self.aux.system);
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
let resps = self
- .rpc_client
+ .aux.rpc_client
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.read_quorum())
+ RequestStrategy::with_quorum(self.aux.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -266,8 +236,8 @@ where
for resp in resps {
if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() {
- let entry = self.decode_entry(entry_bytes.as_slice())?;
- let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
+ let entry = self.data.decode_entry(entry_bytes.as_slice())?;
+ let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) {
None => {
ret.insert(entry_key, Some(entry));
@@ -287,7 +257,7 @@ where
}
if !to_repair.is_empty() {
let self2 = self.clone();
- self.system.background.spawn_cancellable(async move {
+ self.aux.system.background.spawn_cancellable(async move {
for (_, v) in to_repair.iter_mut() {
self2.repair_on_read(&who[..], v.take().unwrap()).await?;
}
@@ -306,7 +276,7 @@ where
async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
- self.rpc_client
+ self.aux.rpc_client
.try_call_many(
&who[..],
TableRPC::<F>::Update(vec![what_enc]),
@@ -326,8 +296,8 @@ where
});
let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
+ self.aux.rpc_client
+ .set_local_handler(self.aux.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle(&msg).await }
});
@@ -336,157 +306,24 @@ where
async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
match msg {
TableRPC::ReadEntry(key, sort_key) => {
- let value = self.handle_read_entry(key, sort_key)?;
+ let value = self.data.read_entry(key, sort_key)?;
Ok(TableRPC::ReadEntryResponse(value))
}
TableRPC::ReadRange(key, begin_sort_key, filter, limit) => {
- let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?;
+ let values = self.data.read_range(key, begin_sort_key, filter, *limit)?;
Ok(TableRPC::Update(values))
}
TableRPC::Update(pairs) => {
- self.handle_update(pairs)?;
+ self.data.update_many(pairs)?;
Ok(TableRPC::Ok)
}
TableRPC::SyncRPC(rpc) => {
- let syncer = self.syncer.load_full().unwrap();
- let response = syncer
- .handle_rpc(rpc, self.system.background.stop_signal.clone())
+ let response = self.syncer
+ .handle_rpc(rpc, self.aux.system.background.stop_signal.clone())
.await?;
Ok(TableRPC::SyncRPC(response))
}
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
}
}
-
- fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
- let tree_key = self.tree_key(p, s);
- if let Some(bytes) = self.store.get(&tree_key)? {
- Ok(Some(ByteBuf::from(bytes.to_vec())))
- } else {
- Ok(None)
- }
- }
-
- fn handle_read_range(
- &self,
- p: &F::P,
- s: &Option<F::S>,
- filter: &Option<F::Filter>,
- limit: usize,
- ) -> Result<Vec<Arc<ByteBuf>>, Error> {
- let partition_hash = p.hash();
- let first_key = match s {
- None => partition_hash.to_vec(),
- Some(sk) => self.tree_key(p, sk),
- };
- let mut ret = vec![];
- for item in self.store.range(first_key..) {
- let (key, value) = item?;
- if &key[..32] != partition_hash.as_slice() {
- break;
- }
- let keep = match filter {
- None => true,
- Some(f) => {
- let entry = self.decode_entry(value.as_ref())?;
- F::matches_filter(&entry, f)
- }
- };
- if keep {
- ret.push(Arc::new(ByteBuf::from(value.as_ref())));
- }
- if ret.len() >= limit {
- break;
- }
- }
- Ok(ret)
- }
-
- // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================
-
- pub fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
- for update_bytes in entries.iter() {
- self.update_entry(update_bytes.as_slice())?;
- }
- Ok(())
- }
-
- pub(crate) fn update_entry(self: &Arc<Self>, update_bytes: &[u8]) -> Result<(), Error> {
- let update = self.decode_entry(update_bytes)?;
- let tree_key = self.tree_key(update.partition_key(), update.sort_key());
-
- let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| {
- let (old_entry, new_entry) = match db.get(&tree_key)? {
- Some(prev_bytes) => {
- let old_entry = self
- .decode_entry(&prev_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let mut new_entry = old_entry.clone();
- new_entry.merge(&update);
- (Some(old_entry), new_entry)
- }
- None => (None, update.clone()),
- };
-
- if Some(&new_entry) != old_entry.as_ref() {
- let new_bytes = rmp_to_vec_all_named(&new_entry)
- .map_err(Error::RMPEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?;
- db.insert(tree_key.clone(), new_bytes)?;
- Ok(Some((old_entry, new_entry)))
- } else {
- Ok(None)
- }
- })?;
-
- if let Some((old_entry, new_entry)) = changed {
- self.instance.updated(old_entry, Some(new_entry));
- self.syncer.load_full().unwrap().invalidate(&tree_key[..]);
- }
-
- Ok(())
- }
-
- pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| {
- if let Some(cur_v) = txn.get(k)? {
- if cur_v == v {
- txn.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(true);
- }
- }
- Ok(false)
- })?;
-
- if removed {
- let old_entry = self.decode_entry(v)?;
- self.instance.updated(Some(old_entry), None);
- self.syncer.load_full().unwrap().invalidate(k);
- }
- Ok(removed)
- }
-
- fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
- let mut ret = p.hash().to_vec();
- ret.extend(s.sort_key());
- ret
- }
-
- fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
- match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
- Ok(x) => Ok(x),
- Err(e) => match F::try_migrate(bytes) {
- Some(x) => Ok(x),
- None => {
- warn!("Unable to decode entry of {}: {}", self.name, e);
- for line in hexdump::hexdump_iter(bytes) {
- debug!("{}", line);
- }
- Err(e.into())
- }
- },
- }
- }
}
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
index 51f8cd6f..7394be1b 100644
--- a/src/table/table_sync.rs
+++ b/src/table/table_sync.rs
@@ -16,18 +16,22 @@ use garage_util::data::*;
use garage_util::error::Error;
use crate::*;
+use crate::data::*;
+use crate::replication::*;
const MAX_DEPTH: usize = 16;
+
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-// Scan & sync every 12 hours
-const SCAN_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60);
+// Do anti-entropy every 10 minutes
+const SCAN_INTERVAL: Duration = Duration::from_secs(10 * 60);
-// Expire cache after 30 minutes
-const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(30 * 60);
+const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
- table: Arc<Table<F, R>>,
+ data: Arc<TableData<F>>,
+ aux: Arc<TableAux<F, R>>,
+
todo: Mutex<SyncTodo>,
cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>,
}
@@ -106,10 +110,13 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
- let todo = SyncTodo { todo: Vec::new() };
- let syncer = Arc::new(TableSyncer {
- table: table.clone(),
+ pub(crate) fn launch(data: Arc<TableData<F>>,
+ aux: Arc<TableAux<F, R>>) -> Arc<Self> {
+ let todo = SyncTodo{ todo: vec![] };
+
+ let syncer = Arc::new(Self {
+ data: data.clone(),
+ aux: aux.clone(),
todo: Mutex::new(todo),
cache: (0..MAX_DEPTH)
.map(|_| Mutex::new(BTreeMap::new()))
@@ -119,21 +126,21 @@ where
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone();
- table.system.background.spawn_worker(
- format!("table sync watcher for {}", table.name),
+ aux.system.background.spawn_worker(
+ format!("table sync watcher for {}", data.name),
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
);
let s2 = syncer.clone();
- table.system.background.spawn_worker(
- format!("table syncer for {}", table.name),
+ aux.system.background.spawn_worker(
+ format!("table syncer for {}", data.name),
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
);
let s3 = syncer.clone();
tokio::spawn(async move {
tokio::time::delay_for(Duration::from_secs(20)).await;
- s3.add_full_scan().await;
+ s3.add_full_scan();
});
syncer
@@ -144,8 +151,8 @@ where
mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>,
) -> Result<(), Error> {
- let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
+ let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
+ let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
@@ -158,8 +165,8 @@ where
select! {
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
- debug!("({}) Adding ring difference to syncer todo list", self.table.name);
- self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring);
+ debug!("({}) Adding ring difference to syncer todo list", self.data.name);
+ self.todo.lock().unwrap().add_ring_difference(&prev_ring, &new_ring, &self.data, &self.aux);
prev_ring = new_ring;
}
}
@@ -182,8 +189,8 @@ where
_ = s_timeout => {
if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
- debug!("({}) Adding full scan to syncer todo list", self.table.name);
- self.add_full_scan().await;
+ debug!("({}) Adding full scan to syncer todo list", self.data.name);
+ self.add_full_scan();
}
}
}
@@ -191,8 +198,8 @@ where
Ok(())
}
- pub async fn add_full_scan(&self) {
- self.todo.lock().unwrap().add_full_scan(&self.table);
+ pub fn add_full_scan(&self) {
+ self.todo.lock().unwrap().add_full_scan(&self.data, &self.aux);
}
async fn syncer_task(
@@ -211,7 +218,7 @@ where
if let Err(e) = res {
warn!(
"({}) Error while syncing {:?}: {}",
- self.table.name, partition, e
+ self.data.name, partition, e
);
}
} else {
@@ -228,18 +235,18 @@ where
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
if partition.retain {
- let my_id = self.table.system.id;
+ let my_id = self.aux.system.id;
let nodes = self
- .table
+ .aux
.replication
- .write_nodes(&partition.begin, &self.table.system)
+ .write_nodes(&partition.begin, &self.aux.system)
.into_iter()
.filter(|node| *node != my_id)
.collect::<Vec<_>>();
debug!(
"({}) Preparing to sync {:?} with {:?}...",
- self.table.name, partition, nodes
+ self.data.name, partition, nodes
);
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?;
@@ -259,10 +266,10 @@ where
while let Some(r) = sync_futures.next().await {
if let Err(e) = r {
n_errors += 1;
- warn!("({}) Sync error: {}", self.table.name, e);
+ warn!("({}) Sync error: {}", self.data.name, e);
}
}
- if n_errors > self.table.replication.max_write_errors() {
+ if n_errors > self.aux.replication.max_write_errors() {
return Err(Error::Message(format!(
"Sync failed with too many nodes (should have been: {:?}).",
nodes
@@ -293,7 +300,7 @@ where
while !*must_exit.borrow() {
let mut items = Vec::new();
- for item in self.table.store.range(begin.to_vec()..end.to_vec()) {
+ for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
let (key, value) = item?;
items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
@@ -304,12 +311,12 @@ where
if items.len() > 0 {
let nodes = self
- .table
+ .aux
.replication
- .write_nodes(&begin, &self.table.system)
+ .write_nodes(&begin, &self.aux.system)
.into_iter()
.collect::<Vec<_>>();
- if nodes.contains(&self.table.system.id) {
+ if nodes.contains(&self.aux.system.id) {
warn!("Interrupting offload as partitions seem to have changed");
break;
}
@@ -340,7 +347,7 @@ where
let update_msg = Arc::new(TableRPC::<F>::Update(values));
for res in join_all(nodes.iter().map(|to| {
- self.table
+ self.aux
.rpc_client
.call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
}))
@@ -352,7 +359,7 @@ where
// All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0;
for (k, v) in items.iter() {
- if !self.table.delete_if_equal(&k[..], &v[..])? {
+ if !self.data.delete_if_equal(&k[..], &v[..])? {
not_removed += 1;
}
}
@@ -399,7 +406,7 @@ where
if range.level == 1 {
let mut children = vec![];
for item in self
- .table
+ .data
.store
.range(range.begin.clone()..range.end.clone())
{
@@ -516,7 +523,7 @@ where
let v = self.range_checksum(&range, must_exit)?;
trace!(
"({}) New checksum calculated for {}-{}/{}, {} children",
- self.table.name,
+ self.data.name,
hex::encode(&range.begin)
.chars()
.take(16)
@@ -553,7 +560,7 @@ where
// If their root checksum has level > than us, use that as a reference
let root_cks_resp = self
- .table
+ .aux
.rpc_client
.call(
who,
@@ -582,7 +589,7 @@ where
let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
trace!(
"({}) Sync with {:?}: {} ({}) remaining",
- self.table.name,
+ self.data.name,
who,
todo.len(),
total_children
@@ -592,7 +599,7 @@ where
let step = todo.drain(..step_size).collect::<Vec<_>>();
let rpc_resp = self
- .table
+ .aux
.rpc_client
.call(
who,
@@ -606,7 +613,7 @@ where
if diff_ranges.len() > 0 || diff_items.len() > 0 {
info!(
"({}) Sync with {:?}: difference {} ranges, {} items",
- self.table.name,
+ self.data.name,
who,
diff_ranges.len(),
diff_items.len()
@@ -622,7 +629,7 @@ where
}
}
if diff_items.len() > 0 {
- self.table.handle_update(&diff_items[..])?;
+ self.data.update_many(&diff_items[..])?;
}
if items_to_send.len() > 0 {
self.send_items(who, items_to_send).await?;
@@ -640,19 +647,19 @@ where
async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
- self.table.name,
+ self.data.name,
item_list.len(),
who
);
let mut values = vec![];
for item in item_list.iter() {
- if let Some(v) = self.table.store.get(&item[..])? {
+ if let Some(v) = self.data.store.get(&item[..])? {
values.push(Arc::new(ByteBuf::from(v.as_ref())));
}
}
let rpc_resp = self
- .table
+ .aux
.rpc_client
.call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
.await?;
@@ -714,7 +721,7 @@ where
ret_ranges.push(their_range.clone());
if their_range.level == 0 {
if let Some(item_bytes) =
- self.table.store.get(their_range.begin.as_slice())?
+ self.data.store.get(their_range.begin.as_slice())?
{
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
@@ -738,7 +745,7 @@ where
}
if our_range.level == 0 {
if let Some(item_bytes) =
- self.table.store.get(our_range.begin.as_slice())?
+ self.data.store.get(our_range.begin.as_slice())?
{
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
@@ -753,7 +760,7 @@ where
if ret_ranges.len() > 0 || ret_items.len() > 0 {
trace!(
"({}) Checksum comparison RPC: {} different + {} items for {} received",
- self.table.name,
+ self.data.name,
ret_ranges.len(),
ret_items.len(),
n_checksums
@@ -782,13 +789,13 @@ where
}
impl SyncTodo {
- fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) {
- let my_id = table.system.id;
+ fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, data: &TableData<F>, aux: &TableAux<F, R>) {
+ let my_id = aux.system.id;
self.todo.clear();
- let ring = table.system.ring.borrow().clone();
- let split_points = table.replication.split_points(&ring);
+ let ring = aux.system.ring.borrow().clone();
+ let split_points = aux.replication.split_points(&ring);
for i in 0..split_points.len() - 1 {
let begin = split_points[i];
@@ -797,12 +804,12 @@ impl SyncTodo {
continue;
}
- let nodes = table.replication.replication_nodes(&begin, &ring);
+ let nodes = aux.replication.replication_nodes(&begin, &ring);
let retain = nodes.contains(&my_id);
if !retain {
// Check if we have some data to send, otherwise skip
- if table.store.range(begin..end).next().is_none() {
+ if data.store.range(begin..end).next().is_none() {
continue;
}
}
@@ -813,25 +820,25 @@ impl SyncTodo {
fn add_ring_difference<F: TableSchema, R: TableReplication>(
&mut self,
- table: &Table<F, R>,
old_ring: &Ring,
new_ring: &Ring,
+ data: &TableData<F>, aux: &TableAux<F, R>,
) {
- let my_id = table.system.id;
+ let my_id = aux.system.id;
// If it is us who are entering or leaving the system,
// initiate a full sync instead of incremental sync
if old_ring.config.members.contains_key(&my_id)
!= new_ring.config.members.contains_key(&my_id)
{
- self.add_full_scan(table);
+ self.add_full_scan(data, aux);
return;
}
let mut all_points = None
.into_iter()
- .chain(table.replication.split_points(old_ring).drain(..))
- .chain(table.replication.split_points(new_ring).drain(..))
+ .chain(aux.replication.split_points(old_ring).drain(..))
+ .chain(aux.replication.split_points(new_ring).drain(..))
.chain(self.todo.iter().map(|x| x.begin))
.chain(self.todo.iter().map(|x| x.end))
.collect::<Vec<_>>();
@@ -845,11 +852,11 @@ impl SyncTodo {
for i in 0..all_points.len() - 1 {
let begin = all_points[i];
let end = all_points[i + 1];
- let was_ours = table
+ let was_ours = aux
.replication
.replication_nodes(&begin, &old_ring)
.contains(&my_id);
- let is_ours = table
+ let is_ours = aux
.replication
.replication_nodes(&begin, &new_ring)
.contains(&my_id);