diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 21 |
1 files changed, 19 insertions, 2 deletions
diff --git a/src/table.rs b/src/table.rs index 533b4291..99ac77bb 100644 --- a/src/table.rs +++ b/src/table.rs @@ -4,6 +4,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::stream::*; +use tokio::sync::RwLock; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -12,7 +13,7 @@ use crate::error::Error; use crate::membership::System; use crate::proto::*; use crate::rpc_client::*; -use crate::table_sync::TableSyncer; +use crate::table_sync::*; pub struct Table<F: TableSchema> { pub instance: F, @@ -21,6 +22,7 @@ pub struct Table<F: TableSchema> { pub system: Arc<System>, pub store: sled::Tree, + pub syncer: RwLock<Option<Arc<TableSyncer<F>>>>, pub param: TableReplicationParams, } @@ -59,6 +61,9 @@ pub enum TableRPC<F: TableSchema> { ReadEntryResponse(Option<ByteBuf>), Update(Vec<Arc<ByteBuf>>), + + SyncChecksums(Vec<RangeChecksum>), + SyncDifferentSet(Vec<SyncRange>), } pub trait PartitionKey { @@ -132,8 +137,10 @@ impl<F: TableSchema + 'static> Table<F> { system, store, param, + syncer: RwLock::new(None), }); - TableSyncer::launch(table.clone()).await; + let syncer = TableSyncer::launch(table.clone()).await; + *table.syncer.write().await = Some(syncer); table } @@ -309,6 +316,11 @@ impl<F: TableSchema + 'static> Table<F> { self.handle_update(pairs).await?; Ok(TableRPC::Ok) } + TableRPC::SyncChecksums(checksums) => { + let syncer = self.syncer.read().await.as_ref().unwrap().clone(); + let differing = syncer.handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()).await?; + Ok(TableRPC::SyncDifferentSet(differing)) + } _ => Err(Error::RPCError(format!("Unexpected table RPC"))), } } @@ -353,6 +365,11 @@ impl<F: TableSchema + 'static> Table<F> { Ok(()) } + pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { + // TODO + Ok(()) + } + fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { let mut ret = p.hash().to_vec(); ret.extend(s.sort_key()); |