aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/table/table.rs2
-rw-r--r--src/table/table_sync.rs302
2 files changed, 188 insertions, 116 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 300e400f..31241530 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -435,7 +435,7 @@ where
let syncer = self.syncer.load_full().unwrap();
debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
- let mut count = 0;
+ let mut count: usize = 0;
while let Some((key, _value)) = self.store.get_lt(end.as_slice())? {
if key.as_ref() < begin.as_slice() {
break;
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
index 073540d4..1a1b328b 100644
--- a/src/table/table_sync.rs
+++ b/src/table/table_sync.rs
@@ -1,15 +1,14 @@
use rand::Rng;
use std::collections::{BTreeMap, VecDeque};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
-use futures::future::BoxFuture;
+use futures::future::join_all;
use futures::{pin_mut, select};
use futures_util::future::*;
use futures_util::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
-use tokio::sync::Mutex;
use tokio::sync::{mpsc, watch};
use garage_rpc::ring::Ring;
@@ -33,7 +32,7 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
pub enum SyncRPC {
GetRootChecksumRange(Hash, Hash),
RootChecksumRange(SyncRange),
- Checksums(Vec<RangeChecksum>, bool),
+ Checksums(Vec<RangeChecksum>),
Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
}
@@ -43,8 +42,11 @@ pub struct SyncTodo {
#[derive(Debug, Clone)]
struct TodoPartition {
+ // Partition consists in hashes between begin included and end excluded
begin: Hash,
end: Hash,
+
+ // Are we a node that stores this partition or not?
retain: bool,
}
@@ -161,7 +163,7 @@ where
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
debug!("({}) Adding ring difference to syncer todo list", self.table.name);
- self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
+ self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring);
prev_ring = new_ring;
}
}
@@ -194,7 +196,7 @@ where
}
pub async fn add_full_scan(&self) {
- self.todo.lock().await.add_full_scan(&self.table);
+ self.todo.lock().unwrap().add_full_scan(&self.table);
}
async fn syncer_task(
@@ -203,7 +205,8 @@ where
busy_tx: mpsc::UnboundedSender<bool>,
) -> Result<(), Error> {
while !*must_exit.borrow() {
- if let Some(partition) = self.todo.lock().await.pop_task() {
+ let task = self.todo.lock().unwrap().pop_task();
+ if let Some(partition) = task {
busy_tx.send(true)?;
let res = self
.clone()
@@ -228,76 +231,152 @@ where
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
- let my_id = self.table.system.id;
- let nodes = self
- .table
- .replication
- .write_nodes(&partition.begin, &self.table.system)
- .into_iter()
- .filter(|node| *node != my_id)
- .collect::<Vec<_>>();
+ if partition.retain {
+ let my_id = self.table.system.id;
+ let nodes = self
+ .table
+ .replication
+ .write_nodes(&partition.begin, &self.table.system)
+ .into_iter()
+ .filter(|node| *node != my_id)
+ .collect::<Vec<_>>();
+
+ debug!(
+ "({}) Preparing to sync {:?} with {:?}...",
+ self.table.name, partition, nodes
+ );
+ let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?;
+
+ let mut sync_futures = nodes
+ .iter()
+ .map(|node| {
+ self.clone().do_sync_with(
+ partition.clone(),
+ root_cks.clone(),
+ *node,
+ must_exit.clone(),
+ )
+ })
+ .collect::<FuturesUnordered<_>>();
+
+ let mut n_errors = 0;
+ while let Some(r) = sync_futures.next().await {
+ if let Err(e) = r {
+ n_errors += 1;
+ warn!("({}) Sync error: {}", self.table.name, e);
+ }
+ }
+ if n_errors > self.table.replication.max_write_errors() {
+ return Err(Error::Message(format!(
+ "Sync failed with too many nodes (should have been: {:?}).",
+ nodes
+ )));
+ }
+ } else {
+ self.offload_partition(&partition.begin, &partition.end, must_exit)
+ .await?;
+ }
- debug!(
- "({}) Preparing to sync {:?} with {:?}...",
- self.table.name, partition, nodes
- );
- let root_cks = self
- .root_checksum(&partition.begin, &partition.end, must_exit)
- .await?;
+ Ok(())
+ }
- let mut sync_futures = nodes
- .iter()
- .map(|node| {
- self.clone().do_sync_with(
- partition.clone(),
- root_cks.clone(),
- *node,
- partition.retain,
- must_exit.clone(),
- )
- })
- .collect::<FuturesUnordered<_>>();
+ // Offload partition: this partition is not something we are storing,
+ // so send it out to all other nodes that store it and delete items locally.
+ // We don't bother checking if the remote nodes already have the items,
+ // we just batch-send everything. Offloading isn't supposed to happen very often.
+ // If any of the nodes that are supposed to store the items is unable to
+ // save them, we interrupt the process.
+ async fn offload_partition(
+ self: &Arc<Self>,
+ begin: &Hash,
+ end: &Hash,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let mut counter: usize = 0;
- let mut n_errors = 0;
- while let Some(r) = sync_futures.next().await {
- if let Err(e) = r {
- n_errors += 1;
- warn!("({}) Sync error: {}", self.table.name, e);
+ while !*must_exit.borrow() {
+ let mut items = Vec::new();
+
+ for item in self.table.store.range(begin.to_vec()..end.to_vec()) {
+ let (key, value) = item?;
+ items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
+
+ if items.len() >= 1024 {
+ break;
+ }
+ }
+
+ if items.len() > 0 {
+ let nodes = self
+ .table
+ .replication
+ .write_nodes(&begin, &self.table.system)
+ .into_iter()
+ .collect::<Vec<_>>();
+ if nodes.contains(&self.table.system.id) {
+ warn!("Interrupting offload as partitions seem to have changed");
+ break;
+ }
+
+ counter += 1;
+ debug!("Offloading items from {:?}..{:?} ({})", begin, end, counter);
+ self.offload_items(&items, &nodes[..]).await?;
+ } else {
+ break;
}
}
- if n_errors > self.table.replication.max_write_errors() {
- return Err(Error::Message(format!(
- "Sync failed with too many nodes (should have been: {:?}).",
- nodes
- )));
- }
- if !partition.retain {
+ Ok(())
+ }
+
+ async fn offload_items(
+ self: &Arc<Self>,
+ items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
+ nodes: &[UUID],
+ ) -> Result<(), Error> {
+ let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
+ let update_msg = Arc::new(TableRPC::<F>::Update(values));
+
+ for res in join_all(nodes.iter().map(|to| {
self.table
- .delete_range(&partition.begin, &partition.end)
- .await?;
+ .rpc_client
+ .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
+ }))
+ .await
+ {
+ res?;
+ }
+
+ // All remote nodes have written those items, now we can delete them locally
+ for (k, v) in items.iter() {
+ self.table.store.transaction(|tx_db| {
+ if let Some(curv) = tx_db.get(k)? {
+ if curv == &v[..] {
+ tx_db.remove(&k[..])?;
+ }
+ }
+ Ok(())
+ })?;
}
Ok(())
}
- async fn root_checksum(
+ fn root_checksum(
self: &Arc<Self>,
begin: &Hash,
end: &Hash,
must_exit: &mut watch::Receiver<bool>,
) -> Result<RangeChecksum, Error> {
for i in 1..MAX_DEPTH {
- let rc = self
- .range_checksum(
- &SyncRange {
- begin: begin.to_vec(),
- end: end.to_vec(),
- level: i,
- },
- must_exit,
- )
- .await?;
+ let rc = self.range_checksum(
+ &SyncRange {
+ begin: begin.to_vec(),
+ end: end.to_vec(),
+ level: i,
+ },
+ must_exit,
+ )?;
if rc.found_limit.is_none() {
return Ok(rc);
}
@@ -307,7 +386,7 @@ where
)))
}
- async fn range_checksum(
+ fn range_checksum(
self: &Arc<Self>,
range: &SyncRange,
must_exit: &mut watch::Receiver<bool>,
@@ -357,9 +436,7 @@ where
};
let mut time = Instant::now();
while !*must_exit.borrow() {
- let sub_ck = self
- .range_checksum_cached_hash(&sub_range, must_exit)
- .await?;
+ let sub_ck = self.range_checksum_cached_hash(&sub_range, must_exit)?;
if let Some(hash) = sub_ck.hash {
children.push((sub_range.clone(), hash));
@@ -397,50 +474,48 @@ where
}
}
- fn range_checksum_cached_hash<'a>(
- self: &'a Arc<Self>,
- range: &'a SyncRange,
- must_exit: &'a mut watch::Receiver<bool>,
- ) -> BoxFuture<'a, Result<RangeChecksumCache, Error>> {
- async move {
- let mut cache = self.cache[range.level].lock().await;
+ fn range_checksum_cached_hash(
+ self: &Arc<Self>,
+ range: &SyncRange,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<RangeChecksumCache, Error> {
+ {
+ let mut cache = self.cache[range.level].lock().unwrap();
if let Some(v) = cache.get(&range) {
if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
return Ok(v.clone());
}
}
cache.remove(&range);
- drop(cache);
-
- let v = self.range_checksum(&range, must_exit).await?;
- trace!(
- "({}) New checksum calculated for {}-{}/{}, {} children",
- self.table.name,
- hex::encode(&range.begin)
- .chars()
- .take(16)
- .collect::<String>(),
- hex::encode(&range.end).chars().take(16).collect::<String>(),
- range.level,
- v.children.len()
- );
+ }
- let hash = if v.children.len() > 0 {
- Some(blake2sum(&rmp_to_vec_all_named(&v)?[..]))
- } else {
- None
- };
- let cache_entry = RangeChecksumCache {
- hash,
- found_limit: v.found_limit,
- time: v.time,
- };
+ let v = self.range_checksum(&range, must_exit)?;
+ trace!(
+ "({}) New checksum calculated for {}-{}/{}, {} children",
+ self.table.name,
+ hex::encode(&range.begin)
+ .chars()
+ .take(16)
+ .collect::<String>(),
+ hex::encode(&range.end).chars().take(16).collect::<String>(),
+ range.level,
+ v.children.len()
+ );
- let mut cache = self.cache[range.level].lock().await;
- cache.insert(range.clone(), cache_entry.clone());
- Ok(cache_entry)
- }
- .boxed()
+ let hash = if v.children.len() > 0 {
+ Some(blake2sum(&rmp_to_vec_all_named(&v)?[..]))
+ } else {
+ None
+ };
+ let cache_entry = RangeChecksumCache {
+ hash,
+ found_limit: v.found_limit,
+ time: v.time,
+ };
+
+ let mut cache = self.cache[range.level].lock().unwrap();
+ cache.insert(range.clone(), cache_entry.clone());
+ Ok(cache_entry)
}
async fn do_sync_with(
@@ -448,7 +523,6 @@ where
partition: TodoPartition,
root_ck: RangeChecksum,
who: UUID,
- retain: bool,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let mut todo = VecDeque::new();
@@ -468,7 +542,7 @@ where
.await?;
if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
if range.level > root_ck.bounds.level {
- let their_root_range_ck = self.range_checksum(&range, &mut must_exit).await?;
+ let their_root_range_ck = self.range_checksum(&range, &mut must_exit)?;
todo.push_back(their_root_range_ck);
} else {
todo.push_back(root_ck);
@@ -498,7 +572,7 @@ where
.rpc_client
.call(
who,
- TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step, retain)),
+ TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)),
TABLE_SYNC_RPC_TIMEOUT,
)
.await?;
@@ -519,11 +593,11 @@ where
if differing.level == 0 {
items_to_send.push(differing.begin);
} else {
- let checksum = self.range_checksum(&differing, &mut must_exit).await?;
+ let checksum = self.range_checksum(&differing, &mut must_exit)?;
todo.push_back(checksum);
}
}
- if retain && diff_items.len() > 0 {
+ if diff_items.len() > 0 {
self.table.handle_update(&diff_items[..]).await?;
}
if items_to_send.len() > 0 {
@@ -575,11 +649,11 @@ where
) -> Result<SyncRPC, Error> {
match message {
SyncRPC::GetRootChecksumRange(begin, end) => {
- let root_cks = self.root_checksum(&begin, &end, &mut must_exit).await?;
+ let root_cks = self.root_checksum(&begin, &end, &mut must_exit)?;
Ok(SyncRPC::RootChecksumRange(root_cks.bounds))
}
- SyncRPC::Checksums(checksums, retain) => {
- self.handle_checksums_rpc(&checksums[..], *retain, &mut must_exit)
+ SyncRPC::Checksums(checksums) => {
+ self.handle_checksums_rpc(&checksums[..], &mut must_exit)
.await
}
_ => Err(Error::Message(format!("Unexpected sync RPC"))),
@@ -589,14 +663,13 @@ where
async fn handle_checksums_rpc(
self: &Arc<Self>,
checksums: &[RangeChecksum],
- retain: bool,
must_exit: &mut watch::Receiver<bool>,
) -> Result<SyncRPC, Error> {
let mut ret_ranges = vec![];
let mut ret_items = vec![];
for their_ckr in checksums.iter() {
- let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit).await?;
+ let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit)?;
for (their_range, their_hash) in their_ckr.children.iter() {
let differs = match our_ckr
.children
@@ -604,9 +677,8 @@ where
{
Err(_) => {
if their_range.level >= 1 {
- let cached_hash = self
- .range_checksum_cached_hash(&their_range, must_exit)
- .await?;
+ let cached_hash =
+ self.range_checksum_cached_hash(&their_range, must_exit)?;
cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true)
} else {
true
@@ -616,7 +688,7 @@ where
};
if differs {
ret_ranges.push(their_range.clone());
- if retain && their_range.level == 0 {
+ if their_range.level == 0 {
if let Some(item_bytes) =
self.table.store.get(their_range.begin.as_slice())?
{
@@ -640,7 +712,7 @@ where
if our_range.level > 0 {
ret_ranges.push(our_range.clone());
}
- if retain && our_range.level == 0 {
+ if our_range.level == 0 {
if let Some(item_bytes) =
self.table.store.get(our_range.begin.as_slice())?
{
@@ -673,7 +745,7 @@ where
end: vec![],
level: i,
};
- let mut cache = self.cache[i].lock().await;
+ let mut cache = self.cache[i].lock().unwrap();
if let Some(cache_entry) = cache.range(..=needle).rev().next() {
if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key {
let index = cache_entry.0.clone();