diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-19 13:22:28 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-19 13:22:28 +0200 |
commit | 7131553c53d4414d2da0e9b60e6e3425f1b46ec2 (patch) | |
tree | 22c4f225ebdbc600c9cbe38e11a01dbc228c5c11 | |
parent | 4ba54ccfca2ff8e56c58d0a652de256428282490 (diff) | |
download | garage-7131553c53d4414d2da0e9b60e6e3425f1b46ec2.tar.gz garage-7131553c53d4414d2da0e9b60e6e3425f1b46ec2.zip |
Refactor sharding logic; coming next: full replication with epidemic dissemination
-rw-r--r-- | src/main.rs | 17 | ||||
-rw-r--r-- | src/membership.rs | 2 | ||||
-rw-r--r-- | src/object_table.rs | 3 | ||||
-rw-r--r-- | src/server.rs | 22 | ||||
-rw-r--r-- | src/table.rs | 87 | ||||
-rw-r--r-- | src/table_sharded.rs | 55 | ||||
-rw-r--r-- | src/table_sync.rs | 197 | ||||
-rw-r--r-- | src/version_table.rs | 3 |
8 files changed, 221 insertions, 165 deletions
diff --git a/src/main.rs b/src/main.rs index 3e91b21d..cc9da8e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ mod error; mod background; mod membership; mod table; +mod table_sharded; mod table_sync; mod block; @@ -22,12 +23,15 @@ use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; use structopt::StructOpt; use error::Error; use membership::*; use rpc_client::*; -use server::{TlsConfig, DEFAULT_TIMEOUT}; +use server::TlsConfig; + +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); #[derive(StructOpt, Debug)] #[structopt(name = "garage")] @@ -158,11 +162,8 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re for adv in status.iter() { if let Some(cfg) = config.members.get(&adv.id) { println!( - "{}\t{}\t{}\t{}", - hex::encode(&adv.id), - cfg.datacenter, - cfg.n_tokens, - adv.addr + "{:?}\t{}\t{}\t{}", + adv.id, cfg.datacenter, cfg.n_tokens, adv.addr ); } } @@ -176,7 +177,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re println!("\nFailed nodes:"); for (id, cfg) in config.members.iter() { if !status.iter().any(|x| x.id == *id) { - println!("{}\t{}\t{}", hex::encode(&id), cfg.datacenter, cfg.n_tokens); + println!("{:?}\t{}\t{}", id, cfg.datacenter, cfg.n_tokens); } } } @@ -188,7 +189,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re println!("\nUnconfigured nodes:"); for adv in status.iter() { if !config.members.contains_key(&adv.id) { - println!("{}\t{}", hex::encode(&adv.id), adv.addr); + println!("{:?}\t{}", adv.id, adv.addr); } } } diff --git a/src/membership.rs b/src/membership.rs index e2f4bb90..08dd5f2f 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -198,7 +198,7 @@ impl Ring { self.walk_ring_from_pos(start, n) } - pub fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> { + fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> { if n >= self.config.members.len() { return self.config.members.keys().cloned().collect::<Vec<_>>(); } diff --git a/src/object_table.rs b/src/object_table.rs index 82a64cd1..59ce3b7f 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use crate::background::BackgroundRunner; use crate::data::*; use crate::table::*; +use crate::table_sharded::*; use crate::version_table::*; @@ -90,7 +91,7 @@ impl Entry<String, String> for Object { pub struct ObjectTable { pub background: Arc<BackgroundRunner>, - pub version_table: Arc<Table<VersionTable>>, + pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, } #[async_trait] diff --git a/src/server.rs b/src/server.rs index e728c667..6b4b5b6b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,7 +2,6 @@ use std::io::{Read, Write}; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; -use std::time::Duration; pub use futures_util::future::FutureExt; use serde::Deserialize; @@ -14,6 +13,7 @@ use crate::error::Error; use crate::membership::System; use crate::rpc_server::RpcServer; use crate::table::*; +use crate::table_sharded::*; use crate::block::*; use crate::block_ref_table::*; @@ -22,8 +22,6 @@ use crate::version_table::*; use crate::api_server; -pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); - #[derive(Deserialize, Debug, Clone)] pub struct Config { pub metadata_dir: PathBuf, @@ -59,9 +57,9 @@ pub struct Garage { pub system: Arc<System>, pub block_manager: Arc<BlockManager>, - pub object_table: Arc<Table<ObjectTable>>, - pub version_table: Arc<Table<VersionTable>>, - pub block_ref_table: Arc<Table<BlockRefTable>>, + pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>, + pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, + pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, } impl Garage { @@ -79,18 +77,16 @@ impl Garage { let block_manager = BlockManager::new(&db, config.data_dir.clone(), system.clone(), rpc_server); - let data_rep_param = TableReplicationParams { + let data_rep_param = TableShardedReplication { replication_factor: system.config.data_replication_factor, write_quorum: (system.config.data_replication_factor + 1) / 2, read_quorum: 1, - timeout: DEFAULT_TIMEOUT, }; - let meta_rep_param = TableReplicationParams { + let meta_rep_param = TableShardedReplication { replication_factor: system.config.meta_replication_factor, write_quorum: (system.config.meta_replication_factor + 1) / 2, read_quorum: (system.config.meta_replication_factor + 1) / 2, - timeout: DEFAULT_TIMEOUT, }; println!("Initialize block_ref_table..."); @@ -99,10 +95,10 @@ impl Garage { background: background.clone(), block_manager: block_manager.clone(), }, + data_rep_param.clone(), system.clone(), &db, "block_ref".to_string(), - data_rep_param.clone(), rpc_server, ) .await; @@ -113,10 +109,10 @@ impl Garage { background: background.clone(), block_ref_table: block_ref_table.clone(), }, + meta_rep_param.clone(), system.clone(), &db, "version".to_string(), - meta_rep_param.clone(), rpc_server, ) .await; @@ -127,10 +123,10 @@ impl Garage { background: background.clone(), version_table: version_table.clone(), }, + meta_rep_param.clone(), system.clone(), &db, "object".to_string(), - meta_rep_param.clone(), rpc_server, ) .await; diff --git a/src/table.rs b/src/table.rs index f7354376..d5357277 100644 --- a/src/table.rs +++ b/src/table.rs @@ -10,30 +10,23 @@ use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; -use crate::membership::System; +use crate::membership::{Ring, System}; use crate::rpc_client::*; use crate::rpc_server::*; use crate::table_sync::*; -pub struct Table<F: TableSchema> { +const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); + +pub struct Table<F: TableSchema, R: TableReplication> { pub instance: F, + pub replication: R, pub name: String, pub rpc_client: Arc<RpcClient<TableRPC<F>>>, pub system: Arc<System>, pub store: sled::Tree, - pub syncer: ArcSwapOption<TableSyncer<F>>, - - pub param: TableReplicationParams, -} - -#[derive(Clone)] -pub struct TableReplicationParams { - pub replication_factor: usize, - pub read_quorum: usize, - pub write_quorum: usize, - pub timeout: Duration, + pub syncer: ArcSwapOption<TableSyncer<F, R>>, } #[derive(Serialize, Deserialize)] @@ -112,15 +105,38 @@ pub trait TableSchema: Send + Sync { } } -impl<F: TableSchema + 'static> Table<F> { +pub trait TableReplication: Send + Sync { + // See examples in table_sharded.rs and table_fullcopy.rs + // To understand various replication methods + + // Which nodes to send reads from + fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; + fn read_quorum(&self) -> usize; + + // Which nodes to send writes to + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; + fn write_quorum(&self) -> usize; + fn max_write_errors(&self) -> usize; + fn epidemic_writes(&self) -> bool; + + // Which are the nodes that do actually replicate the data + fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>; + fn split_points(&self, ring: &Ring) -> Vec<Hash>; +} + +impl<F, R> Table<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== pub async fn new( instance: F, + replication: R, system: Arc<System>, db: &sled::Db, name: String, - param: TableReplicationParams, rpc_server: &mut RpcServer, ) -> Arc<Self> { let store = db.open_tree(&name).expect("Unable to open DB tree"); @@ -130,11 +146,11 @@ impl<F: TableSchema + 'static> Table<F> { let table = Arc::new(Self { instance, + replication, name, rpc_client, system, store, - param, syncer: ArcSwapOption::from(None), }); table.clone().register_handler(rpc_server, rpc_path); @@ -147,15 +163,19 @@ impl<F: TableSchema + 'static> Table<F> { pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let ring = self.system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, self.param.replication_factor); + let who = self.replication.write_nodes(&hash, &self.system); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::<F>::Update(vec![e_enc]); self.rpc_client - .try_call_many(&who[..], rpc, self.param.write_quorum, self.param.timeout) + .try_call_many( + &who[..], + rpc, + self.replication.write_quorum(), + TABLE_RPC_TIMEOUT, + ) .await?; Ok(()) } @@ -165,8 +185,7 @@ impl<F: TableSchema + 'static> Table<F> { for entry in entries.iter() { let hash = entry.partition_key().hash(); - let ring = self.system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, self.param.replication_factor); + let who = self.replication.write_nodes(&hash, &self.system); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -179,7 +198,7 @@ impl<F: TableSchema + 'static> Table<F> { let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::<F>::Update(entries); - let resp = self.rpc_client.call(&node, rpc, self.param.timeout).await?; + let resp = self.rpc_client.call(&node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::<FuturesUnordered<_>>(); @@ -190,7 +209,7 @@ impl<F: TableSchema + 'static> Table<F> { errors.push(e); } } - if errors.len() > self.param.replication_factor - self.param.write_quorum { + if errors.len() > self.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { Ok(()) @@ -203,14 +222,18 @@ impl<F: TableSchema + 'static> Table<F> { sort_key: &F::S, ) -> Result<Option<F::E>, Error> { let hash = partition_key.hash(); - let ring = self.system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, self.param.replication_factor); + let who = self.replication.read_nodes(&hash, &self.system); //eprintln!("get who: {:?}", who); let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self .rpc_client - .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout) + .try_call_many( + &who[..], + rpc, + self.replication.read_quorum(), + TABLE_RPC_TIMEOUT, + ) .await?; let mut ret = None; @@ -254,14 +277,18 @@ impl<F: TableSchema + 'static> Table<F> { limit: usize, ) -> Result<Vec<F::E>, Error> { let hash = partition_key.hash(); - let ring = self.system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, self.param.replication_factor); + let who = self.replication.read_nodes(&hash, &self.system); let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); let resps = self .rpc_client - .try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout) + .try_call_many( + &who[..], + rpc, + self.replication.read_quorum(), + TABLE_RPC_TIMEOUT, + ) .await?; let mut ret = BTreeMap::new(); @@ -315,7 +342,7 @@ impl<F: TableSchema + 'static> Table<F> { &who[..], TableRPC::<F>::Update(vec![what_enc]), who.len(), - self.param.timeout, + TABLE_RPC_TIMEOUT, ) .await?; Ok(()) diff --git a/src/table_sharded.rs b/src/table_sharded.rs new file mode 100644 index 00000000..485a9212 --- /dev/null +++ b/src/table_sharded.rs @@ -0,0 +1,55 @@ +use crate::data::*; +use crate::membership::{System, Ring}; +use crate::table::*; + +#[derive(Clone)] +pub struct TableShardedReplication { + pub replication_factor: usize, + pub read_quorum: usize, + pub write_quorum: usize, +} + +impl TableReplication for TableShardedReplication { + // Sharded replication schema: + // - based on the ring of nodes, a certain set of neighbors + // store entries, given as a function of the position of the + // entry's hash in the ring + // - reads are done on all of the nodes that replicate the data + // - writes as well + + fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> { + let ring = system.ring.borrow().clone(); + ring.walk_ring(&hash, self.replication_factor) + } + fn read_quorum(&self) -> usize { + self.read_quorum + } + + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> { + let ring = system.ring.borrow().clone(); + ring.walk_ring(&hash, self.replication_factor) + } + fn write_quorum(&self) -> usize { + self.write_quorum + } + fn max_write_errors(&self) -> usize { + self.replication_factor - self.write_quorum + } + fn epidemic_writes(&self) -> bool { + false + } + + fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID> { + ring.walk_ring(&hash, self.replication_factor) + } + fn split_points(&self, ring: &Ring) -> Vec<Hash> { + let mut ret = vec![]; + + ret.push([0u8; 32].into()); + for entry in ring.ring.iter() { + ret.push(entry.location.clone()); + } + ret.push([0xFFu8; 32].into()); + ret + } +} diff --git a/src/table_sync.rs b/src/table_sync.rs index 3ba2fc6a..b4555a77 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,5 +1,5 @@ use rand::Rng; -use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -21,10 +21,12 @@ const MAX_DEPTH: usize = 16; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); -pub struct TableSyncer<F: TableSchema> { - pub table: Arc<Table<F>>, - pub todo: Mutex<SyncTodo>, - pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>, +const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10); + +pub struct TableSyncer<F: TableSchema, R: TableReplication> { + table: Arc<Table<F, R>>, + todo: Mutex<SyncTodo>, + cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>, } #[derive(Serialize, Deserialize)] @@ -36,21 +38,21 @@ pub enum SyncRPC { } pub struct SyncTodo { - pub todo: Vec<Partition>, + todo: Vec<TodoPartition>, } #[derive(Debug, Clone)] -pub struct Partition { - pub begin: Hash, - pub end: Hash, - pub retain: bool, +struct TodoPartition { + begin: Hash, + end: Hash, + retain: bool, } #[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] pub struct SyncRange { - pub begin: Vec<u8>, - pub end: Vec<u8>, - pub level: usize, + begin: Vec<u8>, + end: Vec<u8>, + level: usize, } impl std::cmp::PartialOrd for SyncRange { @@ -66,16 +68,20 @@ impl std::cmp::Ord for SyncRange { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RangeChecksum { - pub bounds: SyncRange, - pub children: Vec<(SyncRange, Hash)>, - pub found_limit: Option<Vec<u8>>, + bounds: SyncRange, + children: Vec<(SyncRange, Hash)>, + found_limit: Option<Vec<u8>>, #[serde(skip, default = "std::time::Instant::now")] - pub time: Instant, + time: Instant, } -impl<F: TableSchema + 'static> TableSyncer<F> { - pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> { +impl<F, R> TableSyncer<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + pub async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> { let todo = SyncTodo { todo: Vec::new() }; let syncer = Arc::new(TableSyncer { table: table.clone(), @@ -166,7 +172,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { async fn sync_partition( self: Arc<Self>, - partition: &Partition, + partition: &TodoPartition, must_exit: &mut watch::Receiver<bool>, ) -> Result<(), Error> { eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition); @@ -175,8 +181,10 @@ impl<F: TableSchema + 'static> TableSyncer<F> { .await?; let my_id = self.table.system.id.clone(); - let ring = self.table.system.ring.borrow().clone(); - let nodes = ring.walk_ring(&partition.begin, self.table.param.replication_factor); + let nodes = self + .table + .replication + .write_nodes(&partition.begin, &self.table.system); let mut sync_futures = nodes .iter() .filter(|node| **node != my_id) @@ -349,7 +357,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { async fn do_sync_with( self: Arc<Self>, - partition: Partition, + partition: TodoPartition, root_ck: RangeChecksum, who: UUID, retain: bool, @@ -367,7 +375,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { partition.begin.clone(), partition.end.clone(), )), - self.table.param.timeout, + TABLE_SYNC_RPC_TIMEOUT, ) .await?; if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp { @@ -398,7 +406,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { .call( &who, &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step, retain)), - self.table.param.timeout, + TABLE_SYNC_RPC_TIMEOUT, ) .await?; if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = @@ -456,11 +464,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let rpc_resp = self .table .rpc_client - .call( - &who, - &TableRPC::<F>::Update(values), - self.table.param.timeout, - ) + .call(&who, &TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT) .await?; if let TableRPC::<F>::Ok = rpc_resp { Ok(()) @@ -490,7 +494,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } - pub async fn handle_checksums_rpc( + async fn handle_checksums_rpc( self: &Arc<Self>, checksums: &[RangeChecksum], retain: bool, @@ -589,99 +593,80 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } impl SyncTodo { - fn add_full_scan<F: TableSchema>(&mut self, table: &Table<F>) { + fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) { let my_id = table.system.id.clone(); self.todo.clear(); - let ring: Arc<Ring> = table.system.ring.borrow().clone(); - - for i in 0..ring.ring.len() { - let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor); - let begin = ring.ring[i].location.clone(); - - if i == 0 { - self.add_full_scan_aux(table, [0u8; 32].into(), begin.clone(), &nodes[..], &my_id); + let ring = table.system.ring.borrow().clone(); + let split_points = table.replication.split_points(&ring); + + for i in 0..split_points.len() - 1 { + let begin = split_points[i].clone(); + let end = split_points[i + 1].clone(); + let nodes = table.replication.write_nodes_from_ring(&begin, &ring); + + let retain = nodes.contains(&my_id); + if !retain { + // Check if we have some data to send, otherwise skip + if table + .store + .range(begin.clone()..end.clone()) + .next() + .is_none() + { + continue; + } } - if i == ring.ring.len() - 1 { - self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id); - } else { - let end = ring.ring[i + 1].location.clone(); - self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id); - } + self.todo.push(TodoPartition { begin, end, retain }); } } - fn add_full_scan_aux<F: TableSchema>( + fn add_ring_difference<F: TableSchema, R: TableReplication>( &mut self, - table: &Table<F>, - begin: Hash, - end: Hash, - nodes: &[UUID], - my_id: &UUID, + table: &Table<F, R>, + old_ring: &Ring, + new_ring: &Ring, ) { - let retain = nodes.contains(my_id); - if !retain { - // Check if we have some data to send, otherwise skip - if table - .store - .range(begin.clone()..end.clone()) - .next() - .is_none() - { - return; - } - } - - self.todo.push(Partition { begin, end, retain }); - } - - fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) { let my_id = table.system.id.clone(); - let old_ring = ring_points(old); - let new_ring = ring_points(new); - let both_ring = old_ring.union(&new_ring).cloned().collect::<BTreeSet<_>>(); - - let prev_todo_begin = self - .todo - .iter() - .map(|x| x.begin.clone()) - .collect::<BTreeSet<_>>(); - let prev_todo_end = self - .todo - .iter() - .map(|x| x.end.clone()) - .collect::<BTreeSet<_>>(); - let prev_todo = prev_todo_begin - .union(&prev_todo_end) - .cloned() - .collect::<BTreeSet<_>>(); - - let all_points = both_ring.union(&prev_todo).cloned().collect::<Vec<_>>(); - - self.todo.sort_by(|x, y| x.begin.cmp(&y.begin)); + let mut all_points = None + .into_iter() + .chain(table.replication.split_points(old_ring).drain(..)) + .chain(table.replication.split_points(new_ring).drain(..)) + .chain(self.todo.iter().map(|x| x.begin.clone())) + .chain(self.todo.iter().map(|x| x.end.clone())) + .collect::<Vec<_>>(); + all_points.sort(); + all_points.dedup(); + + let mut old_todo = std::mem::replace(&mut self.todo, vec![]); + old_todo.sort_by(|x, y| x.begin.cmp(&y.begin)); let mut new_todo = vec![]; + for i in 0..all_points.len() - 1 { let begin = all_points[i].clone(); let end = all_points[i + 1].clone(); - let was_ours = old - .walk_ring(&begin, table.param.replication_factor) + let was_ours = table + .replication + .write_nodes_from_ring(&begin, &old_ring) .contains(&my_id); - let is_ours = new - .walk_ring(&begin, table.param.replication_factor) + let is_ours = table + .replication + .write_nodes_from_ring(&begin, &new_ring) .contains(&my_id); - let was_todo = match self.todo.binary_search_by(|x| x.begin.cmp(&begin)) { + + let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) { Ok(_) => true, Err(j) => { - (j > 0 && self.todo[j - 1].begin < end && begin < self.todo[j - 1].end) - || (j < self.todo.len() - && self.todo[j].begin < end && begin < self.todo[j].end) + (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end) + || (j < old_todo.len() + && old_todo[j].begin < end && begin < old_todo[j].end) } }; if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) { - new_todo.push(Partition { + new_todo.push(TodoPartition { begin, end, retain: is_ours, @@ -692,7 +677,7 @@ impl SyncTodo { self.todo = new_todo; } - fn pop_task(&mut self) -> Option<Partition> { + fn pop_task(&mut self) -> Option<TodoPartition> { if self.todo.is_empty() { return None; } @@ -707,13 +692,3 @@ impl SyncTodo { } } } - -fn ring_points(ring: &Ring) -> BTreeSet<Hash> { - let mut ret = BTreeSet::new(); - ret.insert([0u8; 32].into()); - ret.insert([0xFFu8; 32].into()); - for i in 0..ring.ring.len() { - ret.insert(ring.ring[i].location.clone()); - } - ret -} diff --git a/src/version_table.rs b/src/version_table.rs index 7e7623fc..24109981 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use crate::background::BackgroundRunner; use crate::data::*; use crate::table::*; +use crate::table_sharded::*; use crate::block_ref_table::*; @@ -56,7 +57,7 @@ impl Entry<Hash, EmptySortKey> for Version { pub struct VersionTable { pub background: Arc<BackgroundRunner>, - pub block_ref_table: Arc<Table<BlockRefTable>>, + pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, } #[async_trait] |