aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs21
1 files changed, 19 insertions, 2 deletions
diff --git a/src/table.rs b/src/table.rs
index 533b4291..99ac77bb 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -4,6 +4,7 @@ use std::time::Duration;
use async_trait::async_trait;
use futures::stream::*;
+use tokio::sync::RwLock;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -12,7 +13,7 @@ use crate::error::Error;
use crate::membership::System;
use crate::proto::*;
use crate::rpc_client::*;
-use crate::table_sync::TableSyncer;
+use crate::table_sync::*;
pub struct Table<F: TableSchema> {
pub instance: F,
@@ -21,6 +22,7 @@ pub struct Table<F: TableSchema> {
pub system: Arc<System>,
pub store: sled::Tree,
+ pub syncer: RwLock<Option<Arc<TableSyncer<F>>>>,
pub param: TableReplicationParams,
}
@@ -59,6 +61,9 @@ pub enum TableRPC<F: TableSchema> {
ReadEntryResponse(Option<ByteBuf>),
Update(Vec<Arc<ByteBuf>>),
+
+ SyncChecksums(Vec<RangeChecksum>),
+ SyncDifferentSet(Vec<SyncRange>),
}
pub trait PartitionKey {
@@ -132,8 +137,10 @@ impl<F: TableSchema + 'static> Table<F> {
system,
store,
param,
+ syncer: RwLock::new(None),
});
- TableSyncer::launch(table.clone()).await;
+ let syncer = TableSyncer::launch(table.clone()).await;
+ *table.syncer.write().await = Some(syncer);
table
}
@@ -309,6 +316,11 @@ impl<F: TableSchema + 'static> Table<F> {
self.handle_update(pairs).await?;
Ok(TableRPC::Ok)
}
+ TableRPC::SyncChecksums(checksums) => {
+ let syncer = self.syncer.read().await.as_ref().unwrap().clone();
+ let differing = syncer.handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()).await?;
+ Ok(TableRPC::SyncDifferentSet(differing))
+ }
_ => Err(Error::RPCError(format!("Unexpected table RPC"))),
}
}
@@ -353,6 +365,11 @@ impl<F: TableSchema + 'static> Table<F> {
Ok(())
}
+ pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
+ // TODO
+ Ok(())
+ }
+
fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
let mut ret = p.hash().to_vec();
ret.extend(s.sort_key());