aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs35
1 files changed, 18 insertions, 17 deletions
diff --git a/src/table.rs b/src/table.rs
index 69d818c2..533b4291 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -12,6 +12,7 @@ use crate::error::Error;
use crate::membership::System;
use crate::proto::*;
use crate::rpc_client::*;
+use crate::table_sync::TableSyncer;
pub struct Table<F: TableSchema> {
pub instance: F,
@@ -20,7 +21,6 @@ pub struct Table<F: TableSchema> {
pub system: Arc<System>,
pub store: sled::Tree,
- pub partitions: Vec<Partition>,
pub param: TableReplicationParams,
}
@@ -61,12 +61,6 @@ pub enum TableRPC<F: TableSchema> {
Update(Vec<Arc<ByteBuf>>),
}
-pub struct Partition {
- pub begin: Hash,
- pub end: Hash,
- pub other_nodes: Vec<UUID>,
-}
-
pub trait PartitionKey {
fn hash(&self) -> Hash;
}
@@ -124,7 +118,7 @@ pub trait TableSchema: Send + Sync {
}
impl<F: TableSchema + 'static> Table<F> {
- pub fn new(
+ pub async fn new(
instance: F,
system: Arc<System>,
db: &sled::Db,
@@ -132,14 +126,15 @@ impl<F: TableSchema + 'static> Table<F> {
param: TableReplicationParams,
) -> Arc<Self> {
let store = db.open_tree(&name).expect("Unable to open DB tree");
- Arc::new(Self {
+ let table = Arc::new(Self {
instance,
name,
system,
store,
- partitions: Vec::new(),
param,
- })
+ });
+ TableSyncer::launch(table.clone()).await;
+ table
}
pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> {
@@ -207,7 +202,11 @@ impl<F: TableSchema + 'static> Table<F> {
}
}
- pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> {
+ pub async fn get(
+ self: &Arc<Self>,
+ partition_key: &F::P,
+ sort_key: &F::S,
+ ) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
let who = self
.system
@@ -245,17 +244,19 @@ impl<F: TableSchema + 'static> Table<F> {
}
if let Some(ret_entry) = &ret {
if not_all_same {
- let _: Result<_, _> = self.repair_on_read(&who[..], &ret_entry).await;
+ self.system
+ .background
+ .spawn(self.clone().repair_on_read(who, ret_entry.clone()));
}
}
Ok(ret)
}
- async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> {
- let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?));
+ async fn repair_on_read(self: Arc<Self>, who: Vec<UUID>, what: F::E) -> Result<(), Error> {
+ let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len())
- .await
- .map(|_| ())
+ .await?;
+ Ok(())
}
async fn rpc_try_call_many(