aboutsummaryrefslogtreecommitdiff
path: root/src/table/table_sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/table_sync.rs')
-rw-r--r--src/table/table_sync.rs129
1 files changed, 68 insertions, 61 deletions
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
index 51f8cd6f..7394be1b 100644
--- a/src/table/table_sync.rs
+++ b/src/table/table_sync.rs
@@ -16,18 +16,22 @@ use garage_util::data::*;
use garage_util::error::Error;
use crate::*;
+use crate::data::*;
+use crate::replication::*;
const MAX_DEPTH: usize = 16;
+
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-// Scan & sync every 12 hours
-const SCAN_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60);
+// Do anti-entropy every 10 minutes
+const SCAN_INTERVAL: Duration = Duration::from_secs(10 * 60);
-// Expire cache after 30 minutes
-const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(30 * 60);
+const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
- table: Arc<Table<F, R>>,
+ data: Arc<TableData<F>>,
+ aux: Arc<TableAux<F, R>>,
+
todo: Mutex<SyncTodo>,
cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>,
}
@@ -106,10 +110,13 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
- let todo = SyncTodo { todo: Vec::new() };
- let syncer = Arc::new(TableSyncer {
- table: table.clone(),
+ pub(crate) fn launch(data: Arc<TableData<F>>,
+ aux: Arc<TableAux<F, R>>) -> Arc<Self> {
+ let todo = SyncTodo{ todo: vec![] };
+
+ let syncer = Arc::new(Self {
+ data: data.clone(),
+ aux: aux.clone(),
todo: Mutex::new(todo),
cache: (0..MAX_DEPTH)
.map(|_| Mutex::new(BTreeMap::new()))
@@ -119,21 +126,21 @@ where
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone();
- table.system.background.spawn_worker(
- format!("table sync watcher for {}", table.name),
+ aux.system.background.spawn_worker(
+ format!("table sync watcher for {}", data.name),
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
);
let s2 = syncer.clone();
- table.system.background.spawn_worker(
- format!("table syncer for {}", table.name),
+ aux.system.background.spawn_worker(
+ format!("table syncer for {}", data.name),
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
);
let s3 = syncer.clone();
tokio::spawn(async move {
tokio::time::delay_for(Duration::from_secs(20)).await;
- s3.add_full_scan().await;
+ s3.add_full_scan();
});
syncer
@@ -144,8 +151,8 @@ where
mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>,
) -> Result<(), Error> {
- let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
+ let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
+ let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
@@ -158,8 +165,8 @@ where
select! {
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
- debug!("({}) Adding ring difference to syncer todo list", self.table.name);
- self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring);
+ debug!("({}) Adding ring difference to syncer todo list", self.data.name);
+ self.todo.lock().unwrap().add_ring_difference(&prev_ring, &new_ring, &self.data, &self.aux);
prev_ring = new_ring;
}
}
@@ -182,8 +189,8 @@ where
_ = s_timeout => {
if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
- debug!("({}) Adding full scan to syncer todo list", self.table.name);
- self.add_full_scan().await;
+ debug!("({}) Adding full scan to syncer todo list", self.data.name);
+ self.add_full_scan();
}
}
}
@@ -191,8 +198,8 @@ where
Ok(())
}
- pub async fn add_full_scan(&self) {
- self.todo.lock().unwrap().add_full_scan(&self.table);
+ pub fn add_full_scan(&self) {
+ self.todo.lock().unwrap().add_full_scan(&self.data, &self.aux);
}
async fn syncer_task(
@@ -211,7 +218,7 @@ where
if let Err(e) = res {
warn!(
"({}) Error while syncing {:?}: {}",
- self.table.name, partition, e
+ self.data.name, partition, e
);
}
} else {
@@ -228,18 +235,18 @@ where
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
if partition.retain {
- let my_id = self.table.system.id;
+ let my_id = self.aux.system.id;
let nodes = self
- .table
+ .aux
.replication
- .write_nodes(&partition.begin, &self.table.system)
+ .write_nodes(&partition.begin, &self.aux.system)
.into_iter()
.filter(|node| *node != my_id)
.collect::<Vec<_>>();
debug!(
"({}) Preparing to sync {:?} with {:?}...",
- self.table.name, partition, nodes
+ self.data.name, partition, nodes
);
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?;
@@ -259,10 +266,10 @@ where
while let Some(r) = sync_futures.next().await {
if let Err(e) = r {
n_errors += 1;
- warn!("({}) Sync error: {}", self.table.name, e);
+ warn!("({}) Sync error: {}", self.data.name, e);
}
}
- if n_errors > self.table.replication.max_write_errors() {
+ if n_errors > self.aux.replication.max_write_errors() {
return Err(Error::Message(format!(
"Sync failed with too many nodes (should have been: {:?}).",
nodes
@@ -293,7 +300,7 @@ where
while !*must_exit.borrow() {
let mut items = Vec::new();
- for item in self.table.store.range(begin.to_vec()..end.to_vec()) {
+ for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
let (key, value) = item?;
items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
@@ -304,12 +311,12 @@ where
if items.len() > 0 {
let nodes = self
- .table
+ .aux
.replication
- .write_nodes(&begin, &self.table.system)
+ .write_nodes(&begin, &self.aux.system)
.into_iter()
.collect::<Vec<_>>();
- if nodes.contains(&self.table.system.id) {
+ if nodes.contains(&self.aux.system.id) {
warn!("Interrupting offload as partitions seem to have changed");
break;
}
@@ -340,7 +347,7 @@ where
let update_msg = Arc::new(TableRPC::<F>::Update(values));
for res in join_all(nodes.iter().map(|to| {
- self.table
+ self.aux
.rpc_client
.call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
}))
@@ -352,7 +359,7 @@ where
// All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0;
for (k, v) in items.iter() {
- if !self.table.delete_if_equal(&k[..], &v[..])? {
+ if !self.data.delete_if_equal(&k[..], &v[..])? {
not_removed += 1;
}
}
@@ -399,7 +406,7 @@ where
if range.level == 1 {
let mut children = vec![];
for item in self
- .table
+ .data
.store
.range(range.begin.clone()..range.end.clone())
{
@@ -516,7 +523,7 @@ where
let v = self.range_checksum(&range, must_exit)?;
trace!(
"({}) New checksum calculated for {}-{}/{}, {} children",
- self.table.name,
+ self.data.name,
hex::encode(&range.begin)
.chars()
.take(16)
@@ -553,7 +560,7 @@ where
// If their root checksum has level > than us, use that as a reference
let root_cks_resp = self
- .table
+ .aux
.rpc_client
.call(
who,
@@ -582,7 +589,7 @@ where
let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
trace!(
"({}) Sync with {:?}: {} ({}) remaining",
- self.table.name,
+ self.data.name,
who,
todo.len(),
total_children
@@ -592,7 +599,7 @@ where
let step = todo.drain(..step_size).collect::<Vec<_>>();
let rpc_resp = self
- .table
+ .aux
.rpc_client
.call(
who,
@@ -606,7 +613,7 @@ where
if diff_ranges.len() > 0 || diff_items.len() > 0 {
info!(
"({}) Sync with {:?}: difference {} ranges, {} items",
- self.table.name,
+ self.data.name,
who,
diff_ranges.len(),
diff_items.len()
@@ -622,7 +629,7 @@ where
}
}
if diff_items.len() > 0 {
- self.table.handle_update(&diff_items[..])?;
+ self.data.update_many(&diff_items[..])?;
}
if items_to_send.len() > 0 {
self.send_items(who, items_to_send).await?;
@@ -640,19 +647,19 @@ where
async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
- self.table.name,
+ self.data.name,
item_list.len(),
who
);
let mut values = vec![];
for item in item_list.iter() {
- if let Some(v) = self.table.store.get(&item[..])? {
+ if let Some(v) = self.data.store.get(&item[..])? {
values.push(Arc::new(ByteBuf::from(v.as_ref())));
}
}
let rpc_resp = self
- .table
+ .aux
.rpc_client
.call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
.await?;
@@ -714,7 +721,7 @@ where
ret_ranges.push(their_range.clone());
if their_range.level == 0 {
if let Some(item_bytes) =
- self.table.store.get(their_range.begin.as_slice())?
+ self.data.store.get(their_range.begin.as_slice())?
{
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
@@ -738,7 +745,7 @@ where
}
if our_range.level == 0 {
if let Some(item_bytes) =
- self.table.store.get(our_range.begin.as_slice())?
+ self.data.store.get(our_range.begin.as_slice())?
{
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
@@ -753,7 +760,7 @@ where
if ret_ranges.len() > 0 || ret_items.len() > 0 {
trace!(
"({}) Checksum comparison RPC: {} different + {} items for {} received",
- self.table.name,
+ self.data.name,
ret_ranges.len(),
ret_items.len(),
n_checksums
@@ -782,13 +789,13 @@ where
}
impl SyncTodo {
- fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) {
- let my_id = table.system.id;
+ fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, data: &TableData<F>, aux: &TableAux<F, R>) {
+ let my_id = aux.system.id;
self.todo.clear();
- let ring = table.system.ring.borrow().clone();
- let split_points = table.replication.split_points(&ring);
+ let ring = aux.system.ring.borrow().clone();
+ let split_points = aux.replication.split_points(&ring);
for i in 0..split_points.len() - 1 {
let begin = split_points[i];
@@ -797,12 +804,12 @@ impl SyncTodo {
continue;
}
- let nodes = table.replication.replication_nodes(&begin, &ring);
+ let nodes = aux.replication.replication_nodes(&begin, &ring);
let retain = nodes.contains(&my_id);
if !retain {
// Check if we have some data to send, otherwise skip
- if table.store.range(begin..end).next().is_none() {
+ if data.store.range(begin..end).next().is_none() {
continue;
}
}
@@ -813,25 +820,25 @@ impl SyncTodo {
fn add_ring_difference<F: TableSchema, R: TableReplication>(
&mut self,
- table: &Table<F, R>,
old_ring: &Ring,
new_ring: &Ring,
+ data: &TableData<F>, aux: &TableAux<F, R>,
) {
- let my_id = table.system.id;
+ let my_id = aux.system.id;
// If it is us who are entering or leaving the system,
// initiate a full sync instead of incremental sync
if old_ring.config.members.contains_key(&my_id)
!= new_ring.config.members.contains_key(&my_id)
{
- self.add_full_scan(table);
+ self.add_full_scan(data, aux);
return;
}
let mut all_points = None
.into_iter()
- .chain(table.replication.split_points(old_ring).drain(..))
- .chain(table.replication.split_points(new_ring).drain(..))
+ .chain(aux.replication.split_points(old_ring).drain(..))
+ .chain(aux.replication.split_points(new_ring).drain(..))
.chain(self.todo.iter().map(|x| x.begin))
.chain(self.todo.iter().map(|x| x.end))
.collect::<Vec<_>>();
@@ -845,11 +852,11 @@ impl SyncTodo {
for i in 0..all_points.len() - 1 {
let begin = all_points[i];
let end = all_points[i + 1];
- let was_ours = table
+ let was_ours = aux
.replication
.replication_nodes(&begin, &old_ring)
.contains(&my_id);
- let is_ours = table
+ let is_ours = aux
.replication
.replication_nodes(&begin, &new_ring)
.contains(&my_id);