diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 35 |
1 files changed, 18 insertions, 17 deletions
diff --git a/src/table.rs b/src/table.rs index 69d818c2..533b4291 100644 --- a/src/table.rs +++ b/src/table.rs @@ -12,6 +12,7 @@ use crate::error::Error; use crate::membership::System; use crate::proto::*; use crate::rpc_client::*; +use crate::table_sync::TableSyncer; pub struct Table<F: TableSchema> { pub instance: F, @@ -20,7 +21,6 @@ pub struct Table<F: TableSchema> { pub system: Arc<System>, pub store: sled::Tree, - pub partitions: Vec<Partition>, pub param: TableReplicationParams, } @@ -61,12 +61,6 @@ pub enum TableRPC<F: TableSchema> { Update(Vec<Arc<ByteBuf>>), } -pub struct Partition { - pub begin: Hash, - pub end: Hash, - pub other_nodes: Vec<UUID>, -} - pub trait PartitionKey { fn hash(&self) -> Hash; } @@ -124,7 +118,7 @@ pub trait TableSchema: Send + Sync { } impl<F: TableSchema + 'static> Table<F> { - pub fn new( + pub async fn new( instance: F, system: Arc<System>, db: &sled::Db, @@ -132,14 +126,15 @@ impl<F: TableSchema + 'static> Table<F> { param: TableReplicationParams, ) -> Arc<Self> { let store = db.open_tree(&name).expect("Unable to open DB tree"); - Arc::new(Self { + let table = Arc::new(Self { instance, name, system, store, - partitions: Vec::new(), param, - }) + }); + TableSyncer::launch(table.clone()).await; + table } pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> { @@ -207,7 +202,11 @@ impl<F: TableSchema + 'static> Table<F> { } } - pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> { + pub async fn get( + self: &Arc<Self>, + partition_key: &F::P, + sort_key: &F::S, + ) -> Result<Option<F::E>, Error> { let hash = partition_key.hash(); let who = self .system @@ -245,17 +244,19 @@ impl<F: TableSchema + 'static> Table<F> { } if let Some(ret_entry) = &ret { if not_all_same { - let _: Result<_, _> = self.repair_on_read(&who[..], &ret_entry).await; + self.system + .background + .spawn(self.clone().repair_on_read(who, ret_entry.clone())); } } Ok(ret) } - async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> { - let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?)); + async fn repair_on_read(self: Arc<Self>, who: Vec<UUID>, what: F::E) -> Result<(), Error> { + let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len()) - .await - .map(|_| ()) + .await?; + Ok(()) } async fn rpc_try_call_many( |