author	Alex Auvolat <alex@adnab.me>	2020-04-19 13:22:28 +0200
committer	Alex Auvolat <alex@adnab.me>	2020-04-19 13:22:28 +0200
commit	7131553c53d4414d2da0e9b60e6e3425f1b46ec2 (patch)
tree	22c4f225ebdbc600c9cbe38e11a01dbc228c5c11 /src/table_sync.rs
parent	4ba54ccfca2ff8e56c58d0a652de256428282490 (diff)
Refactor sharding logic; coming next: full replication with epidemic dissemination
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r--	src/table_sync.rs	197
1 file changed, 86 insertions(+), 111 deletions(-)
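
The refactor routes placement decisions through a TableReplication trait instead of walking the ring directly in the syncer. That trait is not defined in this file; the following is only a sketch of the interface implied by the call sites in this diff (write_nodes, write_nodes_from_ring, split_points), reusing the codebase's Hash, UUID, Ring and System types, and may not match the actual definition.

// Sketch only: the TableReplication trait is defined elsewhere in the
// codebase and may differ from this. The signatures below are inferred
// purely from the call sites visible in this diff.
pub trait TableReplication: Send + Sync {
	// Nodes that must receive writes for the partition containing `hash`,
	// resolved against the system's current ring.
	fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;

	// Same lookup, but against an explicitly supplied ring, so that an
	// old and a new ring can be compared (see add_ring_difference below).
	fn write_nodes_from_ring(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;

	// Partition boundaries for this replication strategy; the syncer
	// pairs consecutive points to build its todo list (see add_full_scan).
	fn split_points(&self, ring: &Ring) -> Vec<Hash>;
}
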
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 3ba2fc6a..b4555a77 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -1,5 +1,5 @@
use rand::Rng;
-use std::collections::{BTreeMap, BTreeSet, VecDeque};
+use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -21,10 +21,12 @@ const MAX_DEPTH: usize = 16;
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
-pub struct TableSyncer<F: TableSchema> {
- pub table: Arc<Table<F>>,
- pub todo: Mutex<SyncTodo>,
- pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>,
+const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
+ table: Arc<Table<F, R>>,
+ todo: Mutex<SyncTodo>,
+ cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>,
}
#[derive(Serialize, Deserialize)]
@@ -36,21 +38,21 @@ pub enum SyncRPC {
}
pub struct SyncTodo {
- pub todo: Vec<Partition>,
+ todo: Vec<TodoPartition>,
}
#[derive(Debug, Clone)]
-pub struct Partition {
- pub begin: Hash,
- pub end: Hash,
- pub retain: bool,
+struct TodoPartition {
+ begin: Hash,
+ end: Hash,
+ retain: bool,
}
#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
pub struct SyncRange {
- pub begin: Vec<u8>,
- pub end: Vec<u8>,
- pub level: usize,
+ begin: Vec<u8>,
+ end: Vec<u8>,
+ level: usize,
}
impl std::cmp::PartialOrd for SyncRange {
@@ -66,16 +68,20 @@ impl std::cmp::Ord for SyncRange {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RangeChecksum {
- pub bounds: SyncRange,
- pub children: Vec<(SyncRange, Hash)>,
- pub found_limit: Option<Vec<u8>>,
+ bounds: SyncRange,
+ children: Vec<(SyncRange, Hash)>,
+ found_limit: Option<Vec<u8>>,
#[serde(skip, default = "std::time::Instant::now")]
- pub time: Instant,
+ time: Instant,
}
-impl<F: TableSchema + 'static> TableSyncer<F> {
- pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> {
+impl<F, R> TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ pub async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
let todo = SyncTodo { todo: Vec::new() };
let syncer = Arc::new(TableSyncer {
table: table.clone(),
@@ -166,7 +172,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
async fn sync_partition(
self: Arc<Self>,
- partition: &Partition,
+ partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition);
@@ -175,8 +181,10 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.await?;
let my_id = self.table.system.id.clone();
- let ring = self.table.system.ring.borrow().clone();
- let nodes = ring.walk_ring(&partition.begin, self.table.param.replication_factor);
+ let nodes = self
+ .table
+ .replication
+ .write_nodes(&partition.begin, &self.table.system);
let mut sync_futures = nodes
.iter()
.filter(|node| **node != my_id)
@@ -349,7 +357,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
async fn do_sync_with(
self: Arc<Self>,
- partition: Partition,
+ partition: TodoPartition,
root_ck: RangeChecksum,
who: UUID,
retain: bool,
@@ -367,7 +375,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
partition.begin.clone(),
partition.end.clone(),
)),
- self.table.param.timeout,
+ TABLE_SYNC_RPC_TIMEOUT,
)
.await?;
if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
@@ -398,7 +406,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.call(
&who,
&TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step, retain)),
- self.table.param.timeout,
+ TABLE_SYNC_RPC_TIMEOUT,
)
.await?;
if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) =
@@ -456,11 +464,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
let rpc_resp = self
.table
.rpc_client
- .call(
- &who,
- &TableRPC::<F>::Update(values),
- self.table.param.timeout,
- )
+ .call(&who, &TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
.await?;
if let TableRPC::<F>::Ok = rpc_resp {
Ok(())
@@ -490,7 +494,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
- pub async fn handle_checksums_rpc(
+ async fn handle_checksums_rpc(
self: &Arc<Self>,
checksums: &[RangeChecksum],
retain: bool,
@@ -589,99 +593,80 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
impl SyncTodo {
- fn add_full_scan<F: TableSchema>(&mut self, table: &Table<F>) {
+ fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) {
let my_id = table.system.id.clone();
self.todo.clear();
- let ring: Arc<Ring> = table.system.ring.borrow().clone();
-
- for i in 0..ring.ring.len() {
- let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor);
- let begin = ring.ring[i].location.clone();
-
- if i == 0 {
- self.add_full_scan_aux(table, [0u8; 32].into(), begin.clone(), &nodes[..], &my_id);
+ let ring = table.system.ring.borrow().clone();
+ let split_points = table.replication.split_points(&ring);
+
+ for i in 0..split_points.len() - 1 {
+ let begin = split_points[i].clone();
+ let end = split_points[i + 1].clone();
+ let nodes = table.replication.write_nodes_from_ring(&begin, &ring);
+
+ let retain = nodes.contains(&my_id);
+ if !retain {
+ // Check if we have some data to send, otherwise skip
+ if table
+ .store
+ .range(begin.clone()..end.clone())
+ .next()
+ .is_none()
+ {
+ continue;
+ }
}
- if i == ring.ring.len() - 1 {
- self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id);
- } else {
- let end = ring.ring[i + 1].location.clone();
- self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id);
- }
+ self.todo.push(TodoPartition { begin, end, retain });
}
}
- fn add_full_scan_aux<F: TableSchema>(
+ fn add_ring_difference<F: TableSchema, R: TableReplication>(
&mut self,
- table: &Table<F>,
- begin: Hash,
- end: Hash,
- nodes: &[UUID],
- my_id: &UUID,
+ table: &Table<F, R>,
+ old_ring: &Ring,
+ new_ring: &Ring,
) {
- let retain = nodes.contains(my_id);
- if !retain {
- // Check if we have some data to send, otherwise skip
- if table
- .store
- .range(begin.clone()..end.clone())
- .next()
- .is_none()
- {
- return;
- }
- }
-
- self.todo.push(Partition { begin, end, retain });
- }
-
- fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) {
let my_id = table.system.id.clone();
- let old_ring = ring_points(old);
- let new_ring = ring_points(new);
- let both_ring = old_ring.union(&new_ring).cloned().collect::<BTreeSet<_>>();
-
- let prev_todo_begin = self
- .todo
- .iter()
- .map(|x| x.begin.clone())
- .collect::<BTreeSet<_>>();
- let prev_todo_end = self
- .todo
- .iter()
- .map(|x| x.end.clone())
- .collect::<BTreeSet<_>>();
- let prev_todo = prev_todo_begin
- .union(&prev_todo_end)
- .cloned()
- .collect::<BTreeSet<_>>();
-
- let all_points = both_ring.union(&prev_todo).cloned().collect::<Vec<_>>();
-
- self.todo.sort_by(|x, y| x.begin.cmp(&y.begin));
+ let mut all_points = None
+ .into_iter()
+ .chain(table.replication.split_points(old_ring).drain(..))
+ .chain(table.replication.split_points(new_ring).drain(..))
+ .chain(self.todo.iter().map(|x| x.begin.clone()))
+ .chain(self.todo.iter().map(|x| x.end.clone()))
+ .collect::<Vec<_>>();
+ all_points.sort();
+ all_points.dedup();
+
+ let mut old_todo = std::mem::replace(&mut self.todo, vec![]);
+ old_todo.sort_by(|x, y| x.begin.cmp(&y.begin));
let mut new_todo = vec![];
+
for i in 0..all_points.len() - 1 {
let begin = all_points[i].clone();
let end = all_points[i + 1].clone();
- let was_ours = old
- .walk_ring(&begin, table.param.replication_factor)
+ let was_ours = table
+ .replication
+ .write_nodes_from_ring(&begin, &old_ring)
.contains(&my_id);
- let is_ours = new
- .walk_ring(&begin, table.param.replication_factor)
+ let is_ours = table
+ .replication
+ .write_nodes_from_ring(&begin, &new_ring)
.contains(&my_id);
- let was_todo = match self.todo.binary_search_by(|x| x.begin.cmp(&begin)) {
+
+ let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) {
Ok(_) => true,
Err(j) => {
- (j > 0 && self.todo[j - 1].begin < end && begin < self.todo[j - 1].end)
- || (j < self.todo.len()
- && self.todo[j].begin < end && begin < self.todo[j].end)
+ (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end)
+ || (j < old_todo.len()
+ && old_todo[j].begin < end && begin < old_todo[j].end)
}
};
if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) {
- new_todo.push(Partition {
+ new_todo.push(TodoPartition {
begin,
end,
retain: is_ours,
@@ -692,7 +677,7 @@ impl SyncTodo {
self.todo = new_todo;
}
- fn pop_task(&mut self) -> Option<Partition> {
+ fn pop_task(&mut self) -> Option<TodoPartition> {
if self.todo.is_empty() {
return None;
}
@@ -707,13 +692,3 @@ impl SyncTodo {
}
}
}
-
-fn ring_points(ring: &Ring) -> BTreeSet<Hash> {
- let mut ret = BTreeSet::new();
- ret.insert([0u8; 32].into());
- ret.insert([0xFFu8; 32].into());
- for i in 0..ring.ring.len() {
- ret.insert(ring.ring[i].location.clone());
- }
- ret
-}
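
The deleted ring_points() helper is subsumed by the replication strategy's split_points(). Assuming the sharded strategy keeps the same boundary set (the zero hash, every ring entry's location, and the maximal hash), its method might look like the sketch below; this is an illustration under that assumption, not the actual code from the strategy module.

// Sketch only: a hypothetical split_points() for a sharded replication
// strategy, reproducing the behavior of the removed ring_points() helper.
// The real implementation lives outside this file and may differ.
fn split_points(&self, ring: &Ring) -> Vec<Hash> {
	let mut ret: Vec<Hash> = vec![];
	ret.push([0u8; 32].into());
	for entry in ring.ring.iter() {
		ret.push(entry.location.clone());
	}
	ret.push([0xFFu8; 32].into());
	// add_full_scan() pairs consecutive points, so they must be sorted
	// and deduplicated to form non-overlapping partitions.
	ret.sort();
	ret.dedup();
	ret
}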