aboutsummaryrefslogtreecommitdiff
path: root/src/model/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-02-24 11:58:03 +0100
committerAlex Auvolat <alex@adnab.me>2021-02-24 11:58:03 +0100
commit13e2eda0c2beb34b087f45d7461dba0a483c78af (patch)
tree7b425f138d2fc10f18bdbfcf5082b31c2ec63ca1 /src/model/block.rs
parent09fd6ea7f02f5e8a7642942c52227056464b8833 (diff)
downloadgarage-13e2eda0c2beb34b087f45d7461dba0a483c78af.tar.gz
garage-13e2eda0c2beb34b087f45d7461dba0a483c78af.zip
Arrange block manager
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--src/model/block.rs148
1 files changed, 65 insertions, 83 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 1627ef85..2a991623 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use garage_table::table_sharded::TableShardedReplication;
-use garage_table::{DeletedFilter, TableReplication};
+use garage_table::TableReplication;
use crate::block_ref_table::*;
@@ -303,56 +303,49 @@ impl BlockManager {
}
if exists && !needed {
- let garage = self.garage.load_full().unwrap();
- let active_refs = garage
- .block_ref_table
- .get_range(&hash, None, Some(DeletedFilter::NotDeleted), 1)
- .await?;
- let needed_by_others = !active_refs.is_empty();
- if needed_by_others {
- let ring = self.system.ring.borrow().clone();
- let who = self.replication.replication_nodes(&hash, &ring);
- let msg = Arc::new(Message::NeedBlockQuery(*hash));
- let who_needs_fut = who.iter().map(|to| {
- self.rpc_client
- .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
- });
- let who_needs = join_all(who_needs_fut).await;
-
- let mut need_nodes = vec![];
- for (node, needed) in who.into_iter().zip(who_needs.iter()) {
- match needed {
- Ok(Message::NeedBlockReply(needed)) => {
- if *needed {
- need_nodes.push(node);
- }
- }
- Err(e) => {
- return Err(Error::Message(format!(
- "Should delete block, but unable to confirm that all other nodes that need it have it: {}",
- e
- )));
- }
- Ok(_) => {
- return Err(Error::Message(format!(
- "Unexpected response to NeedBlockQuery RPC"
- )));
+ trace!("Offloading block {:?}", hash);
+
+ let ring = self.system.ring.borrow().clone();
+
+ let mut who = self.replication.replication_nodes(&hash, &ring);
+ who.retain(|id| *id != self.system.id);
+
+ let msg = Arc::new(Message::NeedBlockQuery(*hash));
+ let who_needs_fut = who.iter().map(|to| {
+ self.rpc_client
+ .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
+ });
+ let who_needs_resps = join_all(who_needs_fut).await;
+
+ let mut need_nodes = vec![];
+ for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) {
+ match needed? {
+ Message::NeedBlockReply(needed) => {
+ if needed {
+ need_nodes.push(*node);
}
}
+ _ => {
+ return Err(Error::Message(format!(
+ "Unexpected response to NeedBlockQuery RPC"
+ )));
+ }
}
+ }
+
+ if need_nodes.len() > 0 {
+ trace!("Block {:?} neede by {} nodes, sending", hash, need_nodes.len());
- if need_nodes.len() > 0 {
- let put_block_message = self.read_block(hash).await?;
- self.rpc_client
- .try_call_many(
- &need_nodes[..],
- put_block_message,
- RequestStrategy::with_quorum(need_nodes.len())
- .with_timeout(BLOCK_RW_TIMEOUT),
- )
- .await?;
+ let put_block_message = Arc::new(self.read_block(hash).await?);
+ let put_resps = join_all(need_nodes.iter().map(|to| {
+ self.rpc_client.call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT)
+ })).await;
+ for resp in put_resps {
+ resp?;
}
}
+ trace!("Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), who.len());
+
fs::remove_file(path).await?;
self.resync_queue.remove(&hash)?;
}
@@ -427,53 +420,42 @@ impl BlockManager {
}
// 2. Repair blocks actually on disk
- let mut ls_data_dir = fs::read_dir(&self.data_dir).await?;
- while let Some(data_dir_ent) = ls_data_dir.next().await {
- let data_dir_ent = data_dir_ent?;
- let dir_name = data_dir_ent.file_name();
- let dir_name = match dir_name.into_string() {
- Ok(x) => x,
- Err(_) => continue,
- };
- if dir_name.len() != 2 || hex::decode(&dir_name).is_err() {
- continue;
- }
+ self.repair_aux_read_dir_rec(&self.data_dir, must_exit).await?;
- let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await {
- Err(e) => {
- warn!(
- "Warning: could not list dir {:?}: {}",
- data_dir_ent.path().to_str(),
- e
- );
- continue;
- }
- Ok(x) => x,
- };
- while let Some(file) = ls_data_dir_2.next().await {
- let file = file?;
- let file_name = file.file_name();
- let file_name = match file_name.into_string() {
+ Ok(())
+ }
+
+ fn repair_aux_read_dir_rec<'a>(&'a self, path: &'a PathBuf, must_exit: &'a watch::Receiver<bool>) -> BoxFuture<'a, Result<(), Error>> {
+ async move {
+ let mut ls_data_dir = fs::read_dir(path).await?;
+ while let Some(data_dir_ent) = ls_data_dir.next().await {
+ let data_dir_ent = data_dir_ent?;
+ let name = data_dir_ent.file_name();
+ let name = match name.into_string() {
Ok(x) => x,
Err(_) => continue,
};
- if file_name.len() != 64 {
- continue;
+ let ent_type = data_dir_ent.file_type().await?;
+ println!("name: {}, path: {:?}", name, data_dir_ent.path().to_str());
+
+ if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
+ self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit).await?;
+ } else if name.len() == 64 {
+ let hash_bytes = match hex::decode(&name) {
+ Ok(h) => h,
+ Err(_) => continue,
+ };
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&hash_bytes[..]);
+ self.put_to_resync(&hash.into(), 0)?;
}
- let hash_bytes = match hex::decode(&file_name) {
- Ok(h) => h,
- Err(_) => continue,
- };
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&hash_bytes[..]);
- self.put_to_resync(&hash.into(), 0)?;
if *must_exit.borrow() {
- return Ok(());
+ break;
}
}
- }
- Ok(())
+ Ok(())
+ }.boxed()
}
}