aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/model/block.rs36
-rw-r--r--src/model/garage.rs5
-rw-r--r--src/table/table.rs17
-rw-r--r--src/table/table_fullcopy.rs60
-rw-r--r--src/table/table_sharded.rs5
-rw-r--r--src/table/table_sync.rs32
-rw-r--r--src/util/config.rs10
7 files changed, 71 insertions, 94 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 056d9098..56c85c6a 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -334,17 +334,28 @@ impl BlockManager {
}
if need_nodes.len() > 0 {
- trace!("Block {:?} needed by {} nodes, sending", hash, need_nodes.len());
+ trace!(
+ "Block {:?} needed by {} nodes, sending",
+ hash,
+ need_nodes.len()
+ );
let put_block_message = Arc::new(self.read_block(hash).await?);
let put_resps = join_all(need_nodes.iter().map(|to| {
- self.rpc_client.call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT)
- })).await;
+ self.rpc_client
+ .call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT)
+ }))
+ .await;
for resp in put_resps {
resp?;
}
}
- trace!("Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), who.len());
+ trace!(
+ "Deleting block {:?}, offload finished ({} / {})",
+ hash,
+ need_nodes.len(),
+ who.len()
+ );
fs::remove_file(path).await?;
self.resync_queue.remove(&hash)?;
@@ -391,7 +402,7 @@ impl BlockManager {
.try_call_many(
&who[..],
Message::PutBlock(PutBlockMessage { hash, data }),
- RequestStrategy::with_quorum(self.replication.write_quorum())
+ RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
@@ -420,12 +431,17 @@ impl BlockManager {
}
// 2. Repair blocks actually on disk
- self.repair_aux_read_dir_rec(&self.data_dir, must_exit).await?;
+ self.repair_aux_read_dir_rec(&self.data_dir, must_exit)
+ .await?;
Ok(())
}
- fn repair_aux_read_dir_rec<'a>(&'a self, path: &'a PathBuf, must_exit: &'a watch::Receiver<bool>) -> BoxFuture<'a, Result<(), Error>> {
+ fn repair_aux_read_dir_rec<'a>(
+ &'a self,
+ path: &'a PathBuf,
+ must_exit: &'a watch::Receiver<bool>,
+ ) -> BoxFuture<'a, Result<(), Error>> {
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
@@ -441,7 +457,8 @@ impl BlockManager {
let ent_type = data_dir_ent.file_type().await?;
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
- self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit).await?;
+ self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit)
+ .await?;
} else if name.len() == 64 {
let hash_bytes = match hex::decode(&name) {
Ok(h) => h,
@@ -457,7 +474,8 @@ impl BlockManager {
}
}
Ok(())
- }.boxed()
+ }
+ .boxed()
}
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 46e0d02f..467d0aec 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -65,10 +65,7 @@ impl Garage {
read_quorum: (config.meta_replication_factor + 1) / 2,
};
- let control_rep_param = TableFullReplication::new(
- config.meta_epidemic_fanout,
- (config.meta_epidemic_fanout + 1) / 2,
- );
+ let control_rep_param = TableFullReplication::new(config.control_write_max_faults);
info!("Initialize block manager...");
let block_manager = BlockManager::new(
diff --git a/src/table/table.rs b/src/table/table.rs
index 8b16173e..1f6b7d25 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -61,9 +61,8 @@ pub trait TableReplication: Send + Sync {
// Which nodes to send writes to
fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
- fn write_quorum(&self) -> usize;
+ fn write_quorum(&self, system: &System) -> usize;
fn max_write_errors(&self) -> usize;
- fn epidemic_writes(&self) -> bool;
// Which are the nodes that do actually replicate the data
fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
@@ -119,7 +118,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.write_quorum())
+ RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@@ -382,7 +381,6 @@ where
pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
let syncer = self.syncer.load_full().unwrap();
- let mut epidemic_propagate = vec![];
for update_bytes in entries.iter() {
let update = self.decode_entry(update_bytes.as_slice())?;
@@ -410,22 +408,11 @@ where
})?;
if old_entry.as_ref() != Some(&new_entry) {
- if self.replication.epidemic_writes() {
- epidemic_propagate.push(new_entry.clone());
- }
-
self.instance.updated(old_entry, Some(new_entry));
syncer.invalidate(&tree_key[..]);
}
}
- if epidemic_propagate.len() > 0 {
- let self2 = self.clone();
- self.system
- .background
- .spawn_cancellable(async move { self2.insert_many(&epidemic_propagate[..]).await });
- }
-
Ok(())
}
diff --git a/src/table/table_fullcopy.rs b/src/table/table_fullcopy.rs
index 5dd0ebb1..c55879d8 100644
--- a/src/table/table_fullcopy.rs
+++ b/src/table/table_fullcopy.rs
@@ -1,4 +1,3 @@
-use arc_swap::ArcSwapOption;
use std::sync::Arc;
use garage_rpc::membership::System;
@@ -9,10 +8,7 @@ use crate::*;
#[derive(Clone)]
pub struct TableFullReplication {
- pub write_factor: usize,
- pub write_quorum: usize,
-
- neighbors: ArcSwapOption<Neighbors>,
+ pub max_faults: usize,
}
#[derive(Clone)]
@@ -22,45 +18,8 @@ struct Neighbors {
}
impl TableFullReplication {
- pub fn new(write_factor: usize, write_quorum: usize) -> Self {
- TableFullReplication {
- write_factor,
- write_quorum,
- neighbors: ArcSwapOption::from(None),
- }
- }
-
- fn get_neighbors(&self, system: &System) -> Vec<UUID> {
- let neighbors = self.neighbors.load_full();
- if let Some(n) = neighbors {
- if Arc::ptr_eq(&n.ring, &system.ring.borrow()) {
- return n.neighbors.clone();
- }
- }
-
- // Recalculate neighbors
- let ring = system.ring.borrow().clone();
- let my_id = system.id;
-
- let mut nodes = vec![];
- for (node, _) in ring.config.members.iter() {
- let node_ranking = fasthash(&[node.as_slice(), my_id.as_slice()].concat());
- nodes.push((*node, node_ranking));
- }
- nodes.sort_by(|(_, rank1), (_, rank2)| rank1.cmp(rank2));
- let mut neighbors = nodes
- .drain(..)
- .map(|(node, _)| node)
- .filter(|node| *node != my_id)
- .take(self.write_factor)
- .collect::<Vec<_>>();
-
- neighbors.push(my_id);
- self.neighbors.swap(Some(Arc::new(Neighbors {
- ring,
- neighbors: neighbors.clone(),
- })));
- neighbors
+ pub fn new(max_faults: usize) -> Self {
+ TableFullReplication { max_faults }
}
}
@@ -78,17 +37,14 @@ impl TableReplication for TableFullReplication {
1
}
- fn write_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> {
- self.get_neighbors(system)
+ fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
+ self.replication_nodes(hash, system.ring.borrow().as_ref())
}
- fn write_quorum(&self) -> usize {
- self.write_quorum
+ fn write_quorum(&self, system: &System) -> usize {
+ system.ring.borrow().config.members.len() - self.max_faults
}
fn max_write_errors(&self) -> usize {
- self.write_factor - self.write_quorum
- }
- fn epidemic_writes(&self) -> bool {
- true
+ self.max_faults
}
fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec<UUID> {
diff --git a/src/table/table_sharded.rs b/src/table/table_sharded.rs
index cbb1bc01..47bdfeaf 100644
--- a/src/table/table_sharded.rs
+++ b/src/table/table_sharded.rs
@@ -31,15 +31,12 @@ impl TableReplication for TableShardedReplication {
let ring = system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor)
}
- fn write_quorum(&self) -> usize {
+ fn write_quorum(&self, _system: &System) -> usize {
self.write_quorum
}
fn max_write_errors(&self) -> usize {
self.replication_factor - self.write_quorum
}
- fn epidemic_writes(&self) -> bool {
- false
- }
fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID> {
ring.walk_ring(&hash, self.replication_factor)
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
index 2c984226..5fa6793b 100644
--- a/src/table/table_sync.rs
+++ b/src/table/table_sync.rs
@@ -319,7 +319,13 @@ where
}
counter += 1;
- debug!("Offloading {} items from {:?}..{:?} ({})", items.len(), begin, end, counter);
+ debug!(
+ "Offloading {} items from {:?}..{:?} ({})",
+ items.len(),
+ begin,
+ end,
+ counter
+ );
self.offload_items(&items, &nodes[..]).await?;
} else {
break;
@@ -408,7 +414,11 @@ where
.iter()
.all(|x| *x == 0u8)
{
- trace!("range_checksum {:?} returning {} items", range, children.len());
+ trace!(
+ "range_checksum {:?} returning {} items",
+ range,
+ children.len()
+ );
return Ok(RangeChecksum {
bounds: range.clone(),
children,
@@ -423,7 +433,11 @@ where
};
children.push((item_range, blake2sum(&value[..])));
}
- trace!("range_checksum {:?} returning {} items", range, children.len());
+ trace!(
+ "range_checksum {:?} returning {} items",
+ range,
+ children.len()
+ );
Ok(RangeChecksum {
bounds: range.clone(),
children,
@@ -449,7 +463,11 @@ where
}
if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
- trace!("range_checksum {:?} returning {} items", range, children.len());
+ trace!(
+ "range_checksum {:?} returning {} items",
+ range,
+ children.len()
+ );
return Ok(RangeChecksum {
bounds: range.clone(),
children,
@@ -464,7 +482,11 @@ where
.iter()
.all(|x| *x == 0u8)
{
- trace!("range_checksum {:?} returning {} items", range, children.len());
+ trace!(
+ "range_checksum {:?} returning {} items",
+ range,
+ children.len()
+ );
return Ok(RangeChecksum {
bounds: range.clone(),
children,
diff --git a/src/util/config.rs b/src/util/config.rs
index f4c841b7..cd65e009 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -23,12 +23,12 @@ pub struct Config {
#[serde(default = "default_block_size")]
pub block_size: usize,
+ #[serde(default = "default_control_write_max_faults")]
+ pub control_write_max_faults: usize,
+
#[serde(default = "default_replication_factor")]
pub meta_replication_factor: usize,
- #[serde(default = "default_epidemic_fanout")]
- pub meta_epidemic_fanout: usize,
-
#[serde(default = "default_replication_factor")]
pub data_replication_factor: usize,
@@ -68,8 +68,8 @@ fn default_block_size() -> usize {
fn default_replication_factor() -> usize {
3
}
-fn default_epidemic_fanout() -> usize {
- 3
+fn default_control_write_max_faults() -> usize {
+ 1
}
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {