aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block/manager.rs4
-rw-r--r--src/block/resync.rs24
-rw-r--r--src/rpc/layout/helper.rs5
-rw-r--r--src/rpc/rpc_helper.rs73
4 files changed, 94 insertions, 12 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 537e1fc1..572bdadd 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -370,7 +370,7 @@ impl BlockManager {
prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
- let who = self.replication.write_sets(&hash);
+ let who = self.system.cluster_layout().current_storage_nodes_of(&hash);
let compression_level = self.compression_level.filter(|_| !prevent_compression);
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
@@ -396,7 +396,7 @@ impl BlockManager {
.rpc_helper()
.try_write_many_sets(
&self.endpoint,
- who.as_ref(),
+ &[who],
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_drop_on_completion(permit)
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 947c68de..b476a0b8 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -377,7 +377,10 @@ impl BlockResyncManager {
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
- let mut who = manager.replication.storage_nodes(hash);
+ let mut who = manager
+ .system
+ .cluster_layout()
+ .current_storage_nodes_of(hash);
if who.len() < manager.replication.write_quorum() {
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
@@ -455,6 +458,25 @@ impl BlockResyncManager {
}
if rc.is_nonzero() && !exists {
+ // The refcount is > 0, and the block is not present locally.
+ // We might need to fetch it from another node.
+
+ // First, check whether we are still supposed to store that
+ // block in the latest cluster layout version.
+ let storage_nodes = manager
+ .system
+ .cluster_layout()
+ .current_storage_nodes_of(&hash);
+
+ if !storage_nodes.contains(&manager.system.id) {
+ info!(
+ "Resync block {:?}: block is absent with refcount > 0, but it will drop to zero after all metadata is synced. Not fetching the block.",
+ hash
+ );
+ return Ok(());
+ }
+
+ // We know we need the block. Fetch it.
info!(
"Resync block {:?}: fetching absent but needed block (refcount > 0)",
hash
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index 44c826f9..c08a5629 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -219,6 +219,11 @@ impl LayoutHelper {
ret
}
+ pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+ let ver = self.current();
+ ver.nodes_of(position, ver.replication_factor).collect()
+ }
+
pub fn trackers_hash(&self) -> Hash {
self.trackers_hash
}
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index b8ca8120..2505c2ce 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -540,19 +540,73 @@ impl RpcHelper {
// ---- functions not related to MAKING RPCs, but just determining to what nodes
// they should be made and in which order ----
+ /// Determine to what nodes, and in what order, requests to read a data block
+ /// should be sent. All nodes in the Vec returned by this function are tried
+ /// one by one until there is one that returns the block (in block/manager.rs).
+ ///
+ /// We want to have the best chance of finding the block in as few requests
+ /// as possible, and we want to avoid nodes that answer slowly.
+ ///
+ /// Note that when there are several active layout versions, the block might
+ /// be stored only by nodes of the latest version (in case of a block that was
+ /// written after the layout change), or only by nodes of the oldest active
+ /// version (for all blocks that were written before). So we have to try nodes
+ /// of all layout versions. We also want to try nodes of all layout versions
+ /// fast, so as to optimize the chance of finding the block fast.
+ ///
+ /// Therefore, the strategy is the following:
+ ///
+ /// 1. ask first all nodes of all currently active layout versions
+ /// -> ask the preferred node in all layout versions (older to newer),
+ /// then the second preferred node in all versions, etc.
+ /// -> we start with the oldest active layout version, because a majority
+ /// of blocks will have been saved before the layout change
+ /// 2. ask all nodes of historical layout versions, for blocks which have not
+ /// yet been transferred to their new storage nodes
+ ///
+ /// The preference order, for each layout version, is given by `request_order`,
+ /// based on factors such as nodes being in the same datacenter,
+ /// having low ping, etc.
pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
let layout = self.0.layout.read().unwrap();
- let mut ret = Vec::with_capacity(12);
- let ver_iter = layout
- .versions()
- .iter()
- .rev()
- .chain(layout.inner().old_versions.iter().rev());
- for ver in ver_iter {
- if ver.version > layout.sync_map_min() {
- continue;
+ // Compute, for each layout version, the set of nodes that might store
+ // the block, and put them in their preferred order as given by `request_order`.
+ let mut vernodes = layout.versions().iter().map(|ver| {
+ let nodes = ver.nodes_of(position, ver.replication_factor);
+ rpc_helper.request_order(layout.current(), nodes)
+ });
+
+ let mut ret = if layout.versions().len() == 1 {
+ // If we have only one active layout version, then these are the
+ // only nodes we ask in step 1
+ vernodes.next().unwrap()
+ } else {
+ let vernodes = vernodes.collect::<Vec<_>>();
+
+ let mut nodes = Vec::<Uuid>::with_capacity(12);
+ for i in 0..layout.current().replication_factor {
+ for vn in vernodes.iter() {
+ if let Some(n) = vn.get(i) {
+ if !nodes.contains(&n) {
+ if *n == self.0.our_node_id {
+ // it's always fast (almost free) to ask locally,
+ // so always put that as first choice
+ nodes.insert(0, *n);
+ } else {
+ nodes.push(*n);
+ }
+ }
+ }
+ }
}
+
+ nodes
+ };
+
+ // Second step: add nodes of older layout versions
+ let old_ver_iter = layout.inner().old_versions.iter().rev();
+ for ver in old_ver_iter {
let nodes = ver.nodes_of(position, ver.replication_factor);
for node in rpc_helper.request_order(layout.current(), nodes) {
if !ret.contains(&node) {
@@ -560,6 +614,7 @@ impl RpcHelper {
}
}
}
+
ret
}