Diffstat (limited to 'src/table/table.rs')
-rw-r--r--  src/table/table.rs  |  53
1 file changed, 26 insertions(+), 27 deletions(-)
diff --git a/src/table/table.rs b/src/table/table.rs
index 2ce5868f..f00b4239 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -16,20 +16,17 @@ use garage_rpc::rpc_server::*;
use crate::crdt::CRDT;
use crate::data::*;
use crate::gc::*;
+use crate::merkle::*;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
-pub struct TableAux<R: TableReplication> {
- pub system: Arc<System>,
- pub replication: R,
-}
-
pub struct Table<F: TableSchema, R: TableReplication> {
- pub data: Arc<TableData<F>>,
- pub aux: Arc<TableAux<R>>,
+ pub system: Arc<System>,
+ pub data: Arc<TableData<F, R>>,
+ pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
rpc_client: Arc<RpcClient<TableRPC<F>>>,
}
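
This first hunk dissolves TableAux: Table now holds the System handle directly, and the replication strategy moves into TableData, whose type gains an R parameter. A minimal sketch of the resulting ownership, using simplified stand-in types rather than the real garage ones:

    use std::sync::Arc;

    // Simplified stand-ins for the real garage types; illustrative only.
    struct System;
    trait TableReplication {
        fn write_nodes(&self, hash: &[u8; 32]) -> Vec<u64>;
    }

    // TableData now owns the replication strategy directly, so the data
    // layer can compute node sets without a separate TableAux handle.
    struct TableData<R: TableReplication> {
        replication: R,
    }

    struct Table<R: TableReplication> {
        system: Arc<System>,     // formerly reached as self.aux.system
        data: Arc<TableData<R>>, // replication now lives in here
    }

    impl<R: TableReplication> Table<R> {
        fn write_targets(&self, hash: &[u8; 32]) -> Vec<u64> {
            // Call sites below change from self.aux.replication.write_nodes(..)
            // to this form:
            self.data.replication.write_nodes(hash)
        }
    }
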
@@ -67,19 +64,22 @@ where
let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
- let data = TableData::new(name, instance, db, system.background.clone());
+ let data = TableData::new(name, instance, replication, db);
- let aux = Arc::new(TableAux {
- system,
- replication,
- });
+ let merkle_updater = MerkleUpdater::launch(data.clone(), system.background.clone());
- let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server);
- TableGC::launch(data.clone(), aux.clone(), rpc_server);
+ let syncer = TableSyncer::launch(
+ system.clone(),
+ data.clone(),
+ merkle_updater.clone(),
+ rpc_server,
+ );
+ TableGC::launch(data.clone(), system.clone(), rpc_server);
let table = Arc::new(Self {
+ system,
data,
- aux,
+ merkle_updater,
syncer,
rpc_client,
});
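
The constructor hunk rethreads the dependencies: replication goes into TableData::new, the new MerkleUpdater is launched from the data layer, and the syncer now receives system, data, and merkle updater explicitly. A hedged sketch of that wiring order with placeholder constructors (the real ones take more arguments, including the RPC server):

    use std::sync::Arc;

    // Placeholder types; the real garage constructors take more arguments.
    struct System;
    struct Db;
    struct Replication;
    struct TableData;
    struct MerkleUpdater;
    struct TableSyncer;

    impl TableData {
        fn new(_replication: Replication, _db: Db) -> Arc<Self> {
            Arc::new(TableData)
        }
    }
    impl MerkleUpdater {
        fn launch(_data: Arc<TableData>, _system: Arc<System>) -> Arc<Self> {
            Arc::new(MerkleUpdater)
        }
    }
    impl TableSyncer {
        fn launch(
            _system: Arc<System>,
            _data: Arc<TableData>,
            _merkle: Arc<MerkleUpdater>,
        ) -> Arc<Self> {
            Arc::new(TableSyncer)
        }
    }

    fn main() {
        let system = Arc::new(System);
        // Same order as the diff: data first, then the Merkle updater that
        // watches it, then the syncer that consumes both.
        let data = TableData::new(Replication, Db);
        let merkle = MerkleUpdater::launch(data.clone(), system.clone());
        let _syncer = TableSyncer::launch(system, data, merkle);
    }
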
@@ -91,7 +91,7 @@ where
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.aux.replication.write_nodes(&hash);
+ let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
@@ -101,7 +101,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.aux.replication.write_quorum())
+ RequestStrategy::with_quorum(self.data.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
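
Around this hunk, insert() fans the encoded entry out to the write set and succeeds once write_quorum() acknowledgements arrive; only the path to the replication parameters changes. A toy model of that quorum rule (synchronous and deterministic here, unlike the real async RPC with its 10-second timeout):

    // Toy model of RequestStrategy::with_quorum: succeed as soon as
    // `quorum` of the contacted nodes acknowledge the write.
    fn try_call_many(nodes: &[u64], quorum: usize) -> Result<usize, String> {
        let mut acks = 0;
        for node in nodes {
            // Pretend even-numbered nodes answer and odd ones time out.
            if node % 2 == 0 {
                acks += 1;
            }
            if acks >= quorum {
                return Ok(acks);
            }
        }
        Err(format!("quorum not reached: {}/{}", acks, quorum))
    }

    fn main() {
        assert!(try_call_many(&[0, 1, 2, 3], 2).is_ok());
        assert!(try_call_many(&[1, 3, 5], 2).is_err());
    }
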
@@ -113,7 +113,7 @@ where
for entry in entries.iter() {
let hash = entry.partition_key().hash();
- let who = self.aux.replication.write_nodes(&hash);
+ let who = self.data.replication.write_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
if !call_list.contains_key(&node) {
@@ -137,7 +137,7 @@ where
errors.push(e);
}
}
- if errors.len() > self.aux.replication.max_write_errors() {
+ if errors.len() > self.data.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
Ok(())
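
insert_many batches entries per target node and tolerates partial failure: the batch is an error only when more than max_write_errors nodes fail. A one-function sketch of that rule:

    // Sketch of the error-tolerance rule in insert_many: the batch fails
    // only when more nodes errored than the replication strategy allows.
    fn check_write_result(errors: &[String], max_write_errors: usize) -> Result<(), String> {
        if errors.len() > max_write_errors {
            Err(format!("Too many errors ({})", errors.len()))
        } else {
            Ok(())
        }
    }

    fn main() {
        // With a tolerance of one failed node, two errors abort the batch.
        assert!(check_write_result(&[], 1).is_ok());
        assert!(check_write_result(&["a timed out".into(), "b is down".into()], 1).is_err());
    }
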
@@ -150,7 +150,7 @@ where
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.aux.replication.read_nodes(&hash);
+ let who = self.data.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who);
let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
@@ -159,7 +159,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.aux.replication.read_quorum())
+ RequestStrategy::with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -190,8 +190,7 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- self.aux
- .system
+ self.system
.background
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
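
This hunk only shortens the path to the background runner (self.aux.system becomes self.system); the read-repair logic itself is unchanged: when replicas disagree, the merged entry is pushed back to them from a cancellable background task. A simplified, synchronous sketch of the disagreement check and merge (the real merge is the entry's CRDT merge, not a max):

    use std::collections::HashSet;

    #[derive(Clone, PartialEq, Eq, Hash, Debug)]
    struct Entry(String);

    // Stand-in for the CRDT merge the real code performs.
    fn merge(values: &[Entry]) -> Entry {
        values.iter().cloned().max_by(|a, b| a.0.cmp(&b.0)).unwrap()
    }

    fn read_with_repair(replies: Vec<Entry>) -> (Entry, bool) {
        let distinct: HashSet<_> = replies.iter().cloned().collect();
        let not_all_same = distinct.len() > 1;
        let merged = merge(&replies);
        // In the real code, not_all_same triggers spawn_cancellable, which
        // re-sends the merged entry to the nodes that answered.
        (merged, not_all_same)
    }

    fn main() {
        let (v, repair) = read_with_repair(vec![Entry("a".into()), Entry("b".into())]);
        println!("merged = {:?}, needs repair = {}", v, repair);
    }
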
@@ -207,7 +206,7 @@ where
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.aux.replication.read_nodes(&hash);
+ let who = self.data.replication.read_nodes(&hash);
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
@@ -216,7 +215,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.aux.replication.read_quorum())
+ RequestStrategy::with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -248,7 +247,7 @@ where
}
if !to_repair.is_empty() {
let self2 = self.clone();
- self.aux.system.background.spawn_cancellable(async move {
+ self.system.background.spawn_cancellable(async move {
for (_, v) in to_repair.iter_mut() {
self2.repair_on_read(&who[..], v.take().unwrap()).await?;
}
@@ -288,7 +287,7 @@ where
let self2 = self.clone();
self.rpc_client
- .set_local_handler(self.aux.system.id, move |msg| {
+ .set_local_handler(self.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle(&msg).await }
});
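
The final hunk registers the table's local RPC handler under self.system.id instead of self.aux.system.id. A minimal sketch of that registration pattern, with a synchronous closure standing in for the real async handler:

    use std::sync::Arc;

    // Simplified stand-in for the real RpcClient.
    struct RpcClient {
        local: Option<(u64, Arc<dyn Fn(&str) -> String>)>,
    }

    impl RpcClient {
        // Requests addressed to `node_id` short-circuit to `handler`
        // instead of going over the network.
        fn set_local_handler(&mut self, node_id: u64, handler: impl Fn(&str) -> String + 'static) {
            self.local = Some((node_id, Arc::new(handler)));
        }
    }

    fn main() {
        let mut rpc = RpcClient { local: None };
        let node_id = 42; // stands in for self.system.id
        rpc.set_local_handler(node_id, |msg| format!("handled locally: {}", msg));
        if let Some((id, handler)) = &rpc.local {
            println!("node {} -> {}", id, handler("ping"));
        }
    }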