aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-15 11:05:09 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-22 16:55:24 +0200
commit1b450c4b493dfcb2ee88acbca3ea584beac8eb4b (patch)
treed6437f105a630fa197b67446b5c3b2902335c34a /src/table/sync.rs
parent4067797d0142ee7860aff8da95d65820d6cc0889 (diff)
downloadgarage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.tar.gz
garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.zip
Improvements to CLI and various fixes for netapp version
Discovery via consul, persist peer list to file
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs36
1 files changed, 15 insertions, 21 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index c5db0987..4fcdc528 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -45,11 +45,10 @@ pub(crate) enum SyncRpc {
Node(MerkleNodeKey, MerkleNode),
Items(Vec<Arc<ByteBuf>>),
Ok,
- Error(String),
}
-impl Message for SyncRpc {
- type Response = SyncRpc;
+impl Rpc for SyncRpc {
+ type Response = Result<SyncRpc, Error>;
}
struct SyncTodo {
@@ -305,7 +304,7 @@ where
async fn offload_items(
self: &Arc<Self>,
items: &[(Vec<u8>, Arc<ByteBuf>)],
- nodes: &[NodeID],
+ nodes: &[Uuid],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
@@ -354,7 +353,7 @@ where
async fn do_sync_with(
self: Arc<Self>,
partition: TodoPartition,
- who: NodeID,
+ who: Uuid,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
@@ -480,7 +479,7 @@ where
Ok(())
}
- async fn send_items(&self, who: NodeID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
self.data.name,
@@ -513,9 +512,17 @@ where
)))
}
}
+}
+
+// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
- // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
- async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> {
+#[async_trait]
+impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
@@ -535,19 +542,6 @@ where
}
}
-#[async_trait]
-impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> SyncRpc {
- self.handle_rpc(message)
- .await
- .unwrap_or_else(|e| SyncRpc::Error(format!("{}", e)))
- }
-}
-
impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self,