aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-16 11:43:58 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-16 11:43:58 +0100
commit515029d026937d29395379c76188f509984b8ace (patch)
tree0a89cd87079f330d1021a1954a1328654b236e65 /src/table
parent1d9961e4118af0e26068e1d6c5c6c009a1292a88 (diff)
downloadgarage-515029d026937d29395379c76188f509984b8ace.tar.gz
garage-515029d026937d29395379c76188f509984b8ace.zip
Refactor code
Diffstat (limited to 'src/table')
-rw-r--r--src/table/data.rs136
-rw-r--r--src/table/gc.rs22
-rw-r--r--src/table/merkle.rs61
-rw-r--r--src/table/sync.rs58
-rw-r--r--src/table/table.rs53
5 files changed, 167 insertions, 163 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index 0029b936..9aa2a3bc 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -4,62 +4,59 @@ use std::sync::Arc;
use log::warn;
use serde_bytes::ByteBuf;
use sled::Transactional;
+use tokio::sync::Notify;
-use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::*;
use crate::crdt::CRDT;
-use crate::merkle::*;
+use crate::replication::*;
use crate::schema::*;
-pub struct TableData<F: TableSchema> {
+pub struct TableData<F: TableSchema, R: TableReplication> {
pub name: String,
- pub instance: F,
+
+ pub(crate) instance: F,
+ pub(crate) replication: R,
pub store: sled::Tree,
+
+ pub(crate) merkle_tree: sled::Tree,
+ pub(crate) merkle_todo: sled::Tree,
+ pub(crate) merkle_todo_notify: Notify,
pub(crate) gc_todo: sled::Tree,
- pub merkle_updater: Arc<MerkleUpdater>,
}
-impl<F> TableData<F>
+impl<F, R> TableData<F, R>
where
F: TableSchema,
+ R: TableReplication,
{
- pub fn new(
- name: String,
- instance: F,
- db: &sled::Db,
- background: Arc<BackgroundRunner>,
- ) -> Arc<Self> {
+ pub fn new(name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", name))
.expect("Unable to open DB tree");
- let merkle_todo_store = db
- .open_tree(&format!("{}:merkle_todo", name))
- .expect("Unable to open DB Merkle TODO tree");
- let merkle_tree_store = db
+ let merkle_tree = db
.open_tree(&format!("{}:merkle_tree", name))
.expect("Unable to open DB Merkle tree tree");
+ let merkle_todo = db
+ .open_tree(&format!("{}:merkle_todo", name))
+ .expect("Unable to open DB Merkle TODO tree");
let gc_todo = db
.open_tree(&format!("{}:gc_todo", name))
.expect("Unable to open DB tree");
- let merkle_updater = MerkleUpdater::launch(
- name.clone(),
- background,
- merkle_todo_store,
- merkle_tree_store,
- );
-
Arc::new(Self {
name,
instance,
+ replication,
store,
+ merkle_tree,
+ merkle_todo,
+ merkle_todo_notify: Notify::new(),
gc_todo,
- merkle_updater,
})
}
@@ -129,37 +126,36 @@ where
let update = self.decode_entry(update_bytes)?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
- let changed =
- (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
- let (old_entry, new_entry) = match store.get(&tree_key)? {
- Some(prev_bytes) => {
- let old_entry = self
- .decode_entry(&prev_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let mut new_entry = old_entry.clone();
- new_entry.merge(&update);
- (Some(old_entry), new_entry)
- }
- None => (None, update.clone()),
- };
-
- if Some(&new_entry) != old_entry.as_ref() {
- let new_bytes = rmp_to_vec_all_named(&new_entry)
- .map_err(Error::RMPEncode)
+ let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ let (old_entry, new_entry) = match store.get(&tree_key)? {
+ Some(prev_bytes) => {
+ let old_entry = self
+ .decode_entry(&prev_bytes)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let new_bytes_hash = blake2sum(&new_bytes[..]);
- mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
- store.insert(tree_key.clone(), new_bytes)?;
- Ok(Some((old_entry, new_entry, new_bytes_hash)))
- } else {
- Ok(None)
+ let mut new_entry = old_entry.clone();
+ new_entry.merge(&update);
+ (Some(old_entry), new_entry)
}
- })?;
+ None => (None, update.clone()),
+ };
+
+ if Some(&new_entry) != old_entry.as_ref() {
+ let new_bytes = rmp_to_vec_all_named(&new_entry)
+ .map_err(Error::RMPEncode)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let new_bytes_hash = blake2sum(&new_bytes[..]);
+ mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
+ store.insert(tree_key.clone(), new_bytes)?;
+ Ok(Some((old_entry, new_entry, new_bytes_hash)))
+ } else {
+ Ok(None)
+ }
+ })?;
if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry, Some(new_entry));
- self.merkle_updater.todo_notify.notify_one();
+ self.merkle_todo_notify.notify_one();
if is_tombstone {
self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
}
@@ -169,22 +165,21 @@ where
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed =
- (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if cur_v == v {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(true);
- }
+ let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if cur_v == v {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(true);
}
- Ok(false)
- })?;
+ }
+ Ok(false)
+ })?;
if removed {
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None);
- self.merkle_updater.todo_notify.notify_one();
+ self.merkle_todo_notify.notify_one();
}
Ok(removed)
}
@@ -194,22 +189,21 @@ where
k: &[u8],
vhash: Hash,
) -> Result<bool, Error> {
- let removed =
- (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if blake2sum(&cur_v[..]) == vhash {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(Some(cur_v));
- }
+ let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if blake2sum(&cur_v[..]) == vhash {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(Some(cur_v));
}
- Ok(None)
- })?;
+ }
+ Ok(None)
+ })?;
if let Some(old_v) = removed {
let old_entry = self.decode_entry(&old_v[..])?;
self.instance.updated(Some(old_entry), None);
- self.merkle_updater.todo_notify.notify_one();
+ self.merkle_todo_notify.notify_one();
Ok(true)
} else {
Ok(false)
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 061c5045..d99e3e40 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -13,20 +13,20 @@ use tokio::sync::watch;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_rpc::membership::System;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::data::*;
use crate::replication::*;
use crate::schema::*;
-use crate::table::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub struct TableGC<F: TableSchema, R: TableReplication> {
- data: Arc<TableData<F>>,
- aux: Arc<TableAux<R>>,
+ data: Arc<TableData<F, R>>,
+ system: Arc<System>,
rpc_client: Arc<RpcClient<GcRPC>>,
}
@@ -46,23 +46,23 @@ where
R: TableReplication + 'static,
{
pub(crate) fn launch(
- data: Arc<TableData<F>>,
- aux: Arc<TableAux<R>>,
+ data: Arc<TableData<F, R>>,
+ system: Arc<System>,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rpc_path = format!("table_{}/gc", data.name);
- let rpc_client = aux.system.rpc_client::<GcRPC>(&rpc_path);
+ let rpc_client = system.rpc_client::<GcRPC>(&rpc_path);
let gc = Arc::new(Self {
data: data.clone(),
- aux: aux.clone(),
+ system: system.clone(),
rpc_client,
});
gc.register_handler(rpc_server, rpc_path);
let gc1 = gc.clone();
- aux.system.background.spawn_worker(
+ system.background.spawn_worker(
format!("GC loop for {}", data.name),
move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
);
@@ -130,8 +130,8 @@ where
let mut partitions = HashMap::new();
for (k, vhash, v) in entries {
let pkh = Hash::try_from(&k[..32]).unwrap();
- let mut nodes = self.aux.replication.write_nodes(&pkh);
- nodes.retain(|x| *x != self.aux.system.id);
+ let mut nodes = self.data.replication.write_nodes(&pkh);
+ nodes.retain(|x| *x != self.system.id);
nodes.sort();
if !partitions.contains_key(&nodes) {
@@ -220,7 +220,7 @@ where
let self2 = self.clone();
self.rpc_client
- .set_local_handler(self.aux.system.id, move |msg| {
+ .set_local_handler(self.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 60b7833f..8c3dcad9 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -9,12 +9,16 @@ use serde::{Deserialize, Serialize};
use sled::transaction::{
ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
};
-use tokio::sync::{watch, Notify};
+use tokio::sync::watch;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use crate::data::*;
+use crate::replication::*;
+use crate::schema::*;
+
pub type MerklePartition = [u8; 2];
pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash {
@@ -32,28 +36,30 @@ pub fn hash_of_merkle_partition_opt(p: Option<MerklePartition>) -> Hash {
// 16 bits (two bytes) of item's partition keys' hashes.
// It builds one Merkle tree for each of these 2**16 partitions.
-pub struct MerkleUpdater {
- table_name: String,
+pub struct MerkleUpdater<F: TableSchema, R: TableReplication> {
+ data: Arc<TableData<F, R>>,
background: Arc<BackgroundRunner>,
// Content of the todo tree: items where
// - key = the key of an item in the main table, ie hash(partition_key)+sort_key
// - value = the hash of the full serialized item, if present,
// or an empty vec if item is absent (deleted)
- pub(crate) todo: sled::Tree,
- pub(crate) todo_notify: Notify,
+ // Fields in data:
+ // pub(crate) merkle_todo: sled::Tree,
+ // pub(crate) merkle_todo_notify: Notify,
// Content of the merkle tree: items where
// - key = .bytes() for MerkleNodeKey
// - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found
- pub(crate) merkle_tree: sled::Tree,
+ // Field in data:
+ // pub(crate) merkle_tree: sled::Tree,
empty_node_hash: Hash,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MerkleNodeKey {
// partition: first 16 bits (two bytes) of the partition_key's hash
- pub partition: MerklePartition,
+ pub partition: [u8; 2],
// prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
#[serde(with = "serde_bytes")]
@@ -74,27 +80,26 @@ pub enum MerkleNode {
Leaf(Vec<u8>, Hash),
}
-impl MerkleUpdater {
+impl<F, R> MerkleUpdater<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
pub(crate) fn launch(
- table_name: String,
+ data: Arc<TableData<F, R>>,
background: Arc<BackgroundRunner>,
- todo: sled::Tree,
- merkle_tree: sled::Tree,
) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
let ret = Arc::new(Self {
- table_name,
+ data,
background,
- todo,
- todo_notify: Notify::new(),
- merkle_tree,
empty_node_hash,
});
let ret2 = ret.clone();
ret.background.spawn_worker(
- format!("Merkle tree updater for {}", ret.table_name),
+ format!("Merkle tree updater for {}", ret.data.name),
|must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
);
@@ -103,27 +108,27 @@ impl MerkleUpdater {
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
- if let Some(x) = self.todo.iter().next() {
+ if let Some(x) = self.data.merkle_todo.iter().next() {
match x {
Ok((key, valhash)) => {
if let Err(e) = self.update_item(&key[..], &valhash[..]) {
warn!(
"({}) Error while updating Merkle tree item: {}",
- self.table_name, e
+ self.data.name, e
);
}
}
Err(e) => {
warn!(
"({}) Error while iterating on Merkle todo tree: {}",
- self.table_name, e
+ self.data.name, e
);
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
} else {
select! {
- _ = self.todo_notify.notified().fuse() => (),
+ _ = self.data.merkle_todo_notify.notified().fuse() => (),
_ = must_exit.changed().fuse() => (),
}
}
@@ -143,18 +148,20 @@ impl MerkleUpdater {
partition: k[0..2].try_into().unwrap(),
prefix: vec![],
};
- self.merkle_tree
+ self.data
+ .merkle_tree
.transaction(|tx| self.update_item_rec(tx, k, khash, &key, new_vhash))?;
let deleted = self
- .todo
+ .data
+ .merkle_todo
.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?
.is_ok();
if !deleted {
debug!(
"({}) Item not deleted from Merkle todo because it changed: {:?}",
- self.table_name, k
+ self.data.name, k
);
}
Ok(())
@@ -197,7 +204,7 @@ impl MerkleUpdater {
// should not happen
warn!(
"({}) Replacing intermediate node with empty node, should not happen.",
- self.table_name
+ self.data.name
);
Some(MerkleNode::Empty)
} else if children.len() == 1 {
@@ -301,7 +308,7 @@ impl MerkleUpdater {
// Access a node in the Merkle tree, used by the sync protocol
pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> {
- let ent = self.merkle_tree.get(k.encode())?;
+ let ent = self.data.merkle_tree.get(k.encode())?;
match ent {
None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
@@ -309,11 +316,11 @@ impl MerkleUpdater {
}
pub fn merkle_tree_len(&self) -> usize {
- self.merkle_tree.len()
+ self.data.merkle_tree.len()
}
pub fn todo_len(&self) -> usize {
- self.todo.len()
+ self.data.merkle_todo.len()
}
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index ac0305e2..9c148393 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch};
use garage_util::data::*;
use garage_util::error::Error;
+use garage_rpc::membership::System;
use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
@@ -29,8 +30,9 @@ const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
- data: Arc<TableData<F>>,
- aux: Arc<TableAux<R>>,
+ system: Arc<System>,
+ data: Arc<TableData<F, R>>,
+ merkle: Arc<MerkleUpdater<F, R>>,
todo: Mutex<SyncTodo>,
rpc_client: Arc<RpcClient<SyncRPC>>,
@@ -76,18 +78,20 @@ where
R: TableReplication + 'static,
{
pub(crate) fn launch(
- data: Arc<TableData<F>>,
- aux: Arc<TableAux<R>>,
+ system: Arc<System>,
+ data: Arc<TableData<F, R>>,
+ merkle: Arc<MerkleUpdater<F, R>>,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rpc_path = format!("table_{}/sync", data.name);
- let rpc_client = aux.system.rpc_client::<SyncRPC>(&rpc_path);
+ let rpc_client = system.rpc_client::<SyncRPC>(&rpc_path);
let todo = SyncTodo { todo: vec![] };
let syncer = Arc::new(Self {
+ system: system.clone(),
data: data.clone(),
- aux: aux.clone(),
+ merkle,
todo: Mutex::new(todo),
rpc_client,
});
@@ -97,13 +101,13 @@ where
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone();
- aux.system.background.spawn_worker(
+ system.background.spawn_worker(
format!("table sync watcher for {}", data.name),
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
);
let s2 = syncer.clone();
- aux.system.background.spawn_worker(
+ system.background.spawn_worker(
format!("table syncer for {}", data.name),
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
);
@@ -126,7 +130,7 @@ where
let self2 = self.clone();
self.rpc_client
- .set_local_handler(self.aux.system.id, move |msg| {
+ .set_local_handler(self.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
@@ -137,8 +141,8 @@ where
mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>,
) {
- let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
+ let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
+ let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
@@ -178,7 +182,7 @@ where
self.todo
.lock()
.unwrap()
- .add_full_sync(&self.data, &self.aux);
+ .add_full_sync(&self.data, &self.system);
}
async fn syncer_task(
@@ -213,10 +217,10 @@ where
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
if partition.retain {
- let my_id = self.aux.system.id;
+ let my_id = self.system.id;
let nodes = self
- .aux
+ .data
.replication
.write_nodes(&hash_of_merkle_partition(partition.range.begin))
.into_iter()
@@ -242,7 +246,7 @@ where
warn!("({}) Sync error: {}", self.data.name, e);
}
}
- if n_errors > self.aux.replication.max_write_errors() {
+ if n_errors > self.data.replication.max_write_errors() {
return Err(Error::Message(format!(
"Sync failed with too many nodes (should have been: {:?}).",
nodes
@@ -288,19 +292,19 @@ where
if items.len() > 0 {
let nodes = self
- .aux
+ .data
.replication
.write_nodes(&begin)
.into_iter()
.collect::<Vec<_>>();
- if nodes.contains(&self.aux.system.id) {
+ if nodes.contains(&self.system.id) {
warn!(
"({}) Interrupting offload as partitions seem to have changed",
self.data.name
);
break;
}
- if nodes.len() < self.aux.replication.write_quorum() {
+ if nodes.len() < self.data.replication.write_quorum() {
return Err(Error::Message(format!(
"Not offloading as we don't have a quorum of nodes to write to."
)));
@@ -376,7 +380,7 @@ where
partition: u16::to_be_bytes(i),
prefix: vec![],
};
- match self.data.merkle_updater.read_node(&key)? {
+ match self.merkle.read_node(&key)? {
MerkleNode::Empty => (),
x => {
ret.push((key.partition, hash_of(&x)?));
@@ -458,7 +462,7 @@ where
while !todo.is_empty() && !*must_exit.borrow() {
let key = todo.pop_front().unwrap();
- let node = self.data.merkle_updater.read_node(&key)?;
+ let node = self.merkle.read_node(&key)?;
match node {
MerkleNode::Empty => {
@@ -570,7 +574,7 @@ where
}
}
SyncRPC::GetNode(k) => {
- let node = self.data.merkle_updater.read_node(&k)?;
+ let node = self.merkle.read_node(&k)?;
Ok(SyncRPC::Node(k.clone(), node))
}
SyncRPC::Items(items) => {
@@ -585,15 +589,15 @@ where
impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self,
- data: &TableData<F>,
- aux: &TableAux<R>,
+ data: &TableData<F, R>,
+ system: &System,
) {
- let my_id = aux.system.id;
+ let my_id = system.id;
self.todo.clear();
- let ring = aux.system.ring.borrow().clone();
- let split_points = aux.replication.split_points(&ring);
+ let ring = system.ring.borrow().clone();
+ let split_points = data.replication.split_points(&ring);
for i in 0..split_points.len() {
let begin: MerklePartition = {
@@ -613,7 +617,7 @@ impl SyncTodo {
let begin_hash = hash_of_merkle_partition(begin);
let end_hash = hash_of_merkle_partition_opt(end);
- let nodes = aux.replication.write_nodes(&begin_hash);
+ let nodes = data.replication.write_nodes(&begin_hash);
let retain = nodes.contains(&my_id);
if !retain {
diff --git a/src/table/table.rs b/src/table/table.rs
index 2ce5868f..f00b4239 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -16,20 +16,17 @@ use garage_rpc::rpc_server::*;
use crate::crdt::CRDT;
use crate::data::*;
use crate::gc::*;
+use crate::merkle::*;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
-pub struct TableAux<R: TableReplication> {
- pub system: Arc<System>,
- pub replication: R,
-}
-
pub struct Table<F: TableSchema, R: TableReplication> {
- pub data: Arc<TableData<F>>,
- pub aux: Arc<TableAux<R>>,
+ pub system: Arc<System>,
+ pub data: Arc<TableData<F, R>>,
+ pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
rpc_client: Arc<RpcClient<TableRPC<F>>>,
}
@@ -67,19 +64,22 @@ where
let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
- let data = TableData::new(name, instance, db, system.background.clone());
+ let data = TableData::new(name, instance, replication, db);
- let aux = Arc::new(TableAux {
- system,
- replication,
- });
+ let merkle_updater = MerkleUpdater::launch(data.clone(), system.background.clone());
- let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server);
- TableGC::launch(data.clone(), aux.clone(), rpc_server);
+ let syncer = TableSyncer::launch(
+ system.clone(),
+ data.clone(),
+ merkle_updater.clone(),
+ rpc_server,
+ );
+ TableGC::launch(data.clone(), system.clone(), rpc_server);
let table = Arc::new(Self {
+ system,
data,
- aux,
+ merkle_updater,
syncer,
rpc_client,
});
@@ -91,7 +91,7 @@ where
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.aux.replication.write_nodes(&hash);
+ let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
@@ -101,7 +101,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.aux.replication.write_quorum())
+ RequestStrategy::with_quorum(self.data.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@@ -113,7 +113,7 @@ where
for entry in entries.iter() {
let hash = entry.partition_key().hash();
- let who = self.aux.replication.write_nodes(&hash);
+ let who = self.data.replication.write_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
if !call_list.contains_key(&node) {
@@ -137,7 +137,7 @@ where
errors.push(e);
}
}
- if errors.len() > self.aux.replication.max_write_errors() {
+ if errors.len() > self.data.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
Ok(())
@@ -150,7 +150,7 @@ where
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.aux.replication.read_nodes(&hash);
+ let who = self.data.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who);
let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
@@ -159,7 +159,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.aux.replication.read_quorum())
+ RequestStrategy::with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -190,8 +190,7 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- self.aux
- .system
+ self.system
.background
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
@@ -207,7 +206,7 @@ where
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.aux.replication.read_nodes(&hash);
+ let who = self.data.replication.read_nodes(&hash);
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
@@ -216,7 +215,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.aux.replication.read_quorum())
+ RequestStrategy::with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -248,7 +247,7 @@ where
}
if !to_repair.is_empty() {
let self2 = self.clone();
- self.aux.system.background.spawn_cancellable(async move {
+ self.system.background.spawn_cancellable(async move {
for (_, v) in to_repair.iter_mut() {
self2.repair_on_read(&who[..], v.take().unwrap()).await?;
}
@@ -288,7 +287,7 @@ where
let self2 = self.clone();
self.rpc_client
- .set_local_handler(self.aux.system.id, move |msg| {
+ .set_local_handler(self.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle(&msg).await }
});