aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-17 21:59:07 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-17 21:59:07 +0200
commit3f40ef149f6dd4d61ceb326b5691e186aec178c3 (patch)
treef28af6fdb06cbe9b3f91f54aab9c4077bd83276d
parentf62b54f1dffe9b76aab47d32ba7f47411d5a58af (diff)
downloadgarage-3f40ef149f6dd4d61ceb326b5691e186aec178c3.tar.gz
garage-3f40ef149f6dd4d61ceb326b5691e186aec178c3.zip
Fix sync: use max root checksum level
-rw-r--r--src/api_server.rs10
-rw-r--r--src/block.rs6
-rw-r--r--src/table.rs4
-rw-r--r--src/table_sync.rs171
4 files changed, 112 insertions, 79 deletions
diff --git a/src/api_server.rs b/src/api_server.rs
index 4ae48720..c6d52d16 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -86,16 +86,14 @@ async fn handler_inner(
.to_string();
let version_uuid =
handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?;
- Ok(Response::new(Box::new(BytesBody::from(format!(
- "{:?}\n",
- version_uuid
+ Ok(Response::new(Box::new(BytesBody::from(hex::encode(
+ version_uuid,
)))))
}
&Method::DELETE => {
let version_uuid = handle_delete(garage, &bucket, &key).await?;
- Ok(Response::new(Box::new(BytesBody::from(format!(
- "{:?}\n",
- version_uuid
+ Ok(Response::new(Box::new(BytesBody::from(hex::encode(
+ version_uuid,
)))))
}
_ => Err(Error::BadRequest(format!("Invalid method"))),
diff --git a/src/block.rs b/src/block.rs
index 1e4c52dc..6add24b7 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -242,14 +242,14 @@ impl BlockManager {
if need_nodes.len() > 0 {
let put_block_message = self.read_block(hash).await?;
- for resp in rpc_call_many(
+ let put_responses = rpc_call_many(
garage.system.clone(),
&need_nodes[..],
put_block_message,
BLOCK_RW_TIMEOUT,
)
- .await
- {
+ .await;
+ for resp in put_responses {
resp?;
}
}
diff --git a/src/table.rs b/src/table.rs
index d2c7de94..3ad08cff 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -155,7 +155,7 @@ impl<F: TableSchema + 'static> Table<F> {
let hash = e.partition_key().hash();
let ring = self.system.ring.borrow().clone();
let who = ring.walk_ring(&hash, self.param.replication_factor);
- eprintln!("insert who: {:?}", who);
+ //eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = &TableRPC::<F>::Update(vec![e_enc]);
@@ -212,7 +212,7 @@ impl<F: TableSchema + 'static> Table<F> {
let hash = partition_key.hash();
let ring = self.system.ring.borrow().clone();
let who = ring.walk_ring(&hash, self.param.replication_factor);
- eprintln!("get who: {:?}", who);
+ //eprintln!("get who: {:?}", who);
let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 703dd750..024e239f 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -29,6 +29,8 @@ pub struct TableSyncer<F: TableSchema> {
#[derive(Serialize, Deserialize)]
pub enum SyncRPC {
+ GetRootChecksumRange(Hash, Hash),
+ RootChecksumRange(SyncRange),
Checksums(Vec<RangeChecksum>, bool),
Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
}
@@ -180,6 +182,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.filter(|node| **node != my_id)
.map(|node| {
self.clone().do_sync_with(
+ partition.clone(),
root_cks.clone(),
node.clone(),
partition.retain,
@@ -346,13 +349,33 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
async fn do_sync_with(
self: Arc<Self>,
+ partition: Partition,
root_ck: RangeChecksum,
who: UUID,
retain: bool,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let mut todo = VecDeque::new();
- todo.push_back(root_ck);
+
+ // If their root checksum has level > than us, use that as a reference
+ let root_cks_resp = self
+ .table
+ .rpc_call(
+ &who,
+ &TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange(
+ partition.begin.clone(),
+ partition.end.clone(),
+ )),
+ )
+ .await?;
+ if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
+ if range.level > root_ck.bounds.level {
+ let their_root_range_ck = self.range_checksum(&range, &mut must_exit).await?;
+ todo.push_back(their_root_range_ck);
+ } else {
+ todo.push_back(root_ck);
+ }
+ }
while !todo.is_empty() && !*must_exit.borrow() {
let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
@@ -445,83 +468,95 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
message: &SyncRPC,
mut must_exit: watch::Receiver<bool>,
) -> Result<SyncRPC, Error> {
- if let SyncRPC::Checksums(checksums, retain) = message {
- let mut ret_ranges = vec![];
- let mut ret_items = vec![];
- for ckr in checksums.iter() {
- let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
- for (range, hash) in ckr.children.iter() {
- // Only consider items that are in the intersection of the two ranges
- // (other ranges will be exchanged at some point)
- if our_ckr
- .found_limit
- .as_ref()
- .map(|x| range.begin.as_slice() >= x.as_slice())
- .unwrap_or(false)
- {
- break;
- }
+ match message {
+ SyncRPC::GetRootChecksumRange(begin, end) => {
+ let root_cks = self.root_checksum(&begin, &end, &mut must_exit).await?;
+ Ok(SyncRPC::RootChecksumRange(root_cks.bounds))
+ }
+ SyncRPC::Checksums(checksums, retain) => {
+ self.handle_checksums_rpc(&checksums[..], *retain, &mut must_exit)
+ .await
+ }
+ _ => Err(Error::Message(format!("Unexpected sync RPC"))),
+ }
+ }
- let differs = match our_ckr
- .children
- .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
- {
- Err(_) => true,
- Ok(i) => our_ckr.children[i].1 != *hash,
- };
- if differs {
- ret_ranges.push(range.clone());
- if *retain && range.level == 0 {
- if let Some(item_bytes) =
- self.table.store.get(range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
+ pub async fn handle_checksums_rpc(
+ self: &Arc<Self>,
+ checksums: &[RangeChecksum],
+ retain: bool,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<SyncRPC, Error> {
+ let mut ret_ranges = vec![];
+ let mut ret_items = vec![];
+ for ckr in checksums.iter() {
+ let our_ckr = self.range_checksum(&ckr.bounds, must_exit).await?;
+ for (range, hash) in ckr.children.iter() {
+ // Only consider items that are in the intersection of the two ranges
+ // (other ranges will be exchanged at some point)
+ if our_ckr
+ .found_limit
+ .as_ref()
+ .map(|x| range.begin.as_slice() >= x.as_slice())
+ .unwrap_or(false)
+ {
+ break;
+ }
+
+ let differs = match our_ckr
+ .children
+ .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
+ {
+ Err(_) => true,
+ Ok(i) => our_ckr.children[i].1 != *hash,
+ };
+ if differs {
+ ret_ranges.push(range.clone());
+ if retain && range.level == 0 {
+ if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
+ ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
}
}
- for (range, _hash) in our_ckr.children.iter() {
- if ckr
- .found_limit
- .as_ref()
- .map(|x| range.begin.as_slice() >= x.as_slice())
- .unwrap_or(false)
- {
- break;
- }
+ }
+ for (range, _hash) in our_ckr.children.iter() {
+ if ckr
+ .found_limit
+ .as_ref()
+ .map(|x| range.begin.as_slice() >= x.as_slice())
+ .unwrap_or(false)
+ {
+ break;
+ }
- let not_present = ckr
- .children
- .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin))
- .is_err();
- if not_present {
- if range.level > 0 {
- ret_ranges.push(range.clone());
- }
- if *retain && range.level == 0 {
- if let Some(item_bytes) =
- self.table.store.get(range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
+ let not_present = ckr
+ .children
+ .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin))
+ .is_err();
+ if not_present {
+ if range.level > 0 {
+ ret_ranges.push(range.clone());
+ }
+ if retain && range.level == 0 {
+ if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
+ ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
}
}
}
- let n_checksums = checksums
- .iter()
- .map(|x| x.children.len())
- .fold(0, |x, y| x + y);
- eprintln!(
- "({}) Checksum comparison RPC: {} different + {} items for {} received",
- self.table.name,
- ret_ranges.len(),
- ret_items.len(),
- n_checksums
- );
- return Ok(SyncRPC::Difference(ret_ranges, ret_items));
}
- Err(Error::Message(format!("Unexpected sync RPC")))
+ let n_checksums = checksums
+ .iter()
+ .map(|x| x.children.len())
+ .fold(0, |x, y| x + y);
+ eprintln!(
+ "({}) Checksum comparison RPC: {} different + {} items for {} received",
+ self.table.name,
+ ret_ranges.len(),
+ ret_items.len(),
+ n_checksums
+ );
+ Ok(SyncRPC::Difference(ret_ranges, ret_items))
}
pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {