From f319a7d3740ba8b83c9c0eae27edfda1c1d14c03 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 10 Mar 2021 16:21:56 +0100 Subject: Refactor model stuff, including cleaner CRDTs --- src/table/table.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 1f6b7d25..366ce925 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -17,6 +17,7 @@ use garage_rpc::ring::Ring; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; +use crate::crdt::CRDT; use crate::schema::*; use crate::table_sync::*; -- cgit v1.2.3 From 8d63738cb062e816fc01c6aa2b32936ad31ff65b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 13:47:21 +0100 Subject: Checkpoint: add merkle tree in data table --- src/table/table.rs | 91 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 29 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 366ce925..0e75754c 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -8,6 +8,7 @@ use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; +use sled::Transactional; use garage_util::data::*; use garage_util::error::Error; @@ -18,6 +19,7 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::crdt::CRDT; +use crate::merkle::*; use crate::schema::*; use crate::table_sync::*; @@ -33,6 +35,7 @@ pub struct Table { pub system: Arc, pub store: sled::Tree, pub syncer: ArcSwapOption>, + merkle_updater: Arc, } #[derive(Serialize, Deserialize)] @@ -77,7 +80,7 @@ where { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - pub async fn new( + pub fn new( instance: F, replication: R, system: Arc, @@ -85,11 +88,27 @@ where name: String, rpc_server: &mut RpcServer, ) -> Arc { - let store = db.open_tree(&name).expect("Unable to open DB tree"); + let store = db + .open_tree(&format!("{}:table", name)) + .expect("Unable to open DB tree"); + + let merkle_todo_store = db + .open_tree(&format!("{}:merkle_todo", name)) + .expect("Unable to open DB Merkle TODO tree"); + let merkle_tree_store = db + .open_tree(&format!("{}:merkle_tree", name)) + .expect("Unable to open DB Merkle tree tree"); let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); + let merkle_updater = MerkleUpdater::new( + name.clone(), + system.background.clone(), + merkle_todo_store, + merkle_tree_store, + ); + let table = Arc::new(Self { instance, replication, @@ -98,12 +117,15 @@ where system, store, syncer: ArcSwapOption::from(None), + merkle_updater, }); table.clone().register_handler(rpc_server, rpc_path); - let syncer = TableSyncer::launch(table.clone()).await; + let syncer = TableSyncer::launch(table.clone()); table.syncer.swap(Some(syncer)); + table.merkle_updater.launch(); + table } @@ -322,7 +344,7 @@ where Ok(TableRPC::Update(values)) } TableRPC::Update(pairs) => { - self.handle_update(pairs).await?; + self.handle_update(pairs)?; Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { @@ -380,53 +402,64 @@ where Ok(ret) } - pub async fn handle_update(self: &Arc, entries: &[Arc]) -> Result<(), Error> { - let syncer = self.syncer.load_full().unwrap(); + // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================ + pub fn handle_update(self: &Arc, entries: &[Arc]) -> Result<(), Error> { for update_bytes in entries.iter() { - let update = self.decode_entry(update_bytes.as_slice())?; - - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - - let (old_entry, new_entry) = self.store.transaction(|db| { - let (old_entry, new_entry) = match db.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; + self.update_entry(update_bytes.as_slice())?; + } + Ok(()) + } + pub(crate) fn update_entry(self: &Arc, update_bytes: &[u8]) -> Result<(), Error> { + let update = self.decode_entry(update_bytes)?; + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { + let (old_entry, new_entry) = match db.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RMPEncode) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; db.insert(tree_key.clone(), new_bytes)?; - Ok((old_entry, new_entry)) - })?; - - if old_entry.as_ref() != Some(&new_entry) { - self.instance.updated(old_entry, Some(new_entry)); - syncer.invalidate(&tree_key[..]); + Ok(Some((old_entry, new_entry))) + } else { + Ok(None) } + })?; + + if let Some((old_entry, new_entry)) = changed { + self.instance.updated(old_entry, Some(new_entry)); + self.syncer.load_full().unwrap().invalidate(&tree_key[..]); } Ok(()) } pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { - let removed = self.store.transaction(|txn| { + let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { if let Some(cur_v) = txn.get(k)? { if cur_v == v { txn.remove(k)?; + mkl_todo.insert(k, vec![])?; return Ok(true); } } Ok(false) })?; + if removed { let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); -- cgit v1.2.3 From 94f3d287742ff90f179f528421c690b00b71a912 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 16:54:15 +0100 Subject: WIP big refactoring --- src/table/table.rs | 265 +++++++++++------------------------------------------ 1 file changed, 51 insertions(+), 214 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 0e75754c..a4cb4b24 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,40 +2,35 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; -use log::warn; - -use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use sled::Transactional; use garage_util::data::*; use garage_util::error::Error; use garage_rpc::membership::System; -use garage_rpc::ring::Ring; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::crdt::CRDT; -use crate::merkle::*; +use crate::data::*; use crate::schema::*; use crate::table_sync::*; +use crate::replication::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct Table { - pub instance: F, +pub struct TableAux { + pub system: Arc, pub replication: R, - - pub name: String, pub(crate) rpc_client: Arc>>, +} - pub system: Arc, - pub store: sled::Tree, - pub syncer: ArcSwapOption>, - merkle_updater: Arc, +pub struct Table { + pub data: Arc>, + pub aux: Arc>, + pub syncer: Arc>, } #[derive(Serialize, Deserialize)] @@ -55,23 +50,6 @@ pub(crate) enum TableRPC { impl RpcMessage for TableRPC {} -pub trait TableReplication: Send + Sync { - // See examples in table_sharded.rs and table_fullcopy.rs - // To understand various replication methods - - // Which nodes to send reads from - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec; - fn read_quorum(&self) -> usize; - - // Which nodes to send writes to - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec; - fn write_quorum(&self, system: &System) -> usize; - fn max_write_errors(&self) -> usize; - - // Which are the nodes that do actually replicate the data - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec; - fn split_points(&self, ring: &Ring) -> Vec; -} impl Table where @@ -88,60 +66,51 @@ where name: String, rpc_server: &mut RpcServer, ) -> Arc { - let store = db - .open_tree(&format!("{}:table", name)) - .expect("Unable to open DB tree"); - - let merkle_todo_store = db - .open_tree(&format!("{}:merkle_todo", name)) - .expect("Unable to open DB Merkle TODO tree"); - let merkle_tree_store = db - .open_tree(&format!("{}:merkle_tree", name)) - .expect("Unable to open DB Merkle tree tree"); - let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); - let merkle_updater = MerkleUpdater::new( - name.clone(), + let data = TableData::new( + name, + instance, + db, system.background.clone(), - merkle_todo_store, - merkle_tree_store, ); - let table = Arc::new(Self { - instance, + let aux = Arc::new(TableAux{ + system, replication, - name, rpc_client, - system, - store, - syncer: ArcSwapOption::from(None), - merkle_updater, }); - table.clone().register_handler(rpc_server, rpc_path); - let syncer = TableSyncer::launch(table.clone()); - table.syncer.swap(Some(syncer)); + let syncer = TableSyncer::launch( + data.clone(), + aux.clone(), + ); - table.merkle_updater.launch(); + let table = Arc::new(Self { + data, + aux, + syncer, + }); + + table.clone().register_handler(rpc_server, rpc_path); table } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.aux.replication.write_nodes(&hash, &self.aux.system); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::::Update(vec![e_enc]); - self.rpc_client + self.aux.rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) + RequestStrategy::with_quorum(self.aux.replication.write_quorum(&self.aux.system)) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -153,7 +122,7 @@ where for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.aux.replication.write_nodes(&hash, &self.aux.system); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -166,7 +135,7 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::::Update(entries); - let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::>(); @@ -177,7 +146,7 @@ where errors.push(e); } } - if errors.len() > self.replication.max_write_errors() { + if errors.len() > self.aux.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { Ok(()) @@ -190,16 +159,17 @@ where sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.aux.replication.read_nodes(&hash, &self.aux.system); //eprintln!("get who: {:?}", who); let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self + .aux .rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) + RequestStrategy::with_quorum(self.aux.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -210,7 +180,7 @@ where for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { - let v = self.decode_entry(v_bytes.as_slice())?; + let v = self.data.decode_entry(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -230,7 +200,7 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.system + self.aux.system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } @@ -246,16 +216,16 @@ where limit: usize, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.aux.replication.read_nodes(&hash, &self.aux.system); let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .rpc_client + .aux.rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) + RequestStrategy::with_quorum(self.aux.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -266,8 +236,8 @@ where for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = self.decode_entry(entry_bytes.as_slice())?; - let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); + let entry = self.data.decode_entry(entry_bytes.as_slice())?; + let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { ret.insert(entry_key, Some(entry)); @@ -287,7 +257,7 @@ where } if !to_repair.is_empty() { let self2 = self.clone(); - self.system.background.spawn_cancellable(async move { + self.aux.system.background.spawn_cancellable(async move { for (_, v) in to_repair.iter_mut() { self2.repair_on_read(&who[..], v.take().unwrap()).await?; } @@ -306,7 +276,7 @@ where async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.rpc_client + self.aux.rpc_client .try_call_many( &who[..], TableRPC::::Update(vec![what_enc]), @@ -326,8 +296,8 @@ where }); let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { + self.aux.rpc_client + .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } }); @@ -336,157 +306,24 @@ where async fn handle(self: &Arc, msg: &TableRPC) -> Result, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { - let value = self.handle_read_entry(key, sort_key)?; + let value = self.data.read_entry(key, sort_key)?; Ok(TableRPC::ReadEntryResponse(value)) } TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { - let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?; + let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; Ok(TableRPC::Update(values)) } TableRPC::Update(pairs) => { - self.handle_update(pairs)?; + self.data.update_many(pairs)?; Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { - let syncer = self.syncer.load_full().unwrap(); - let response = syncer - .handle_rpc(rpc, self.system.background.stop_signal.clone()) + let response = self.syncer + .handle_rpc(rpc, self.aux.system.background.stop_signal.clone()) .await?; Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), } } - - fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result, Error> { - let tree_key = self.tree_key(p, s); - if let Some(bytes) = self.store.get(&tree_key)? { - Ok(Some(ByteBuf::from(bytes.to_vec()))) - } else { - Ok(None) - } - } - - fn handle_read_range( - &self, - p: &F::P, - s: &Option, - filter: &Option, - limit: usize, - ) -> Result>, Error> { - let partition_hash = p.hash(); - let first_key = match s { - None => partition_hash.to_vec(), - Some(sk) => self.tree_key(p, sk), - }; - let mut ret = vec![]; - for item in self.store.range(first_key..) { - let (key, value) = item?; - if &key[..32] != partition_hash.as_slice() { - break; - } - let keep = match filter { - None => true, - Some(f) => { - let entry = self.decode_entry(value.as_ref())?; - F::matches_filter(&entry, f) - } - }; - if keep { - ret.push(Arc::new(ByteBuf::from(value.as_ref()))); - } - if ret.len() >= limit { - break; - } - } - Ok(ret) - } - - // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================ - - pub fn handle_update(self: &Arc, entries: &[Arc]) -> Result<(), Error> { - for update_bytes in entries.iter() { - self.update_entry(update_bytes.as_slice())?; - } - Ok(()) - } - - pub(crate) fn update_entry(self: &Arc, update_bytes: &[u8]) -> Result<(), Error> { - let update = self.decode_entry(update_bytes)?; - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - - let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { - let (old_entry, new_entry) = match db.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; - - if Some(&new_entry) != old_entry.as_ref() { - let new_bytes = rmp_to_vec_all_named(&new_entry) - .map_err(Error::RMPEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; - db.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry))) - } else { - Ok(None) - } - })?; - - if let Some((old_entry, new_entry)) = changed { - self.instance.updated(old_entry, Some(new_entry)); - self.syncer.load_full().unwrap().invalidate(&tree_key[..]); - } - - Ok(()) - } - - pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { - let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { - if let Some(cur_v) = txn.get(k)? { - if cur_v == v { - txn.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); - } - } - Ok(false) - })?; - - if removed { - let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None); - self.syncer.load_full().unwrap().invalidate(k); - } - Ok(removed) - } - - fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { - let mut ret = p.hash().to_vec(); - ret.extend(s.sort_key()); - ret - } - - fn decode_entry(&self, bytes: &[u8]) -> Result { - match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { - Ok(x) => Ok(x), - Err(e) => match F::try_migrate(bytes) { - Some(x) => Ok(x), - None => { - warn!("Unable to decode entry of {}: {}", self.name, e); - for line in hexdump::hexdump_iter(bytes) { - debug!("{}", line); - } - Err(e.into()) - } - }, - } - } } -- cgit v1.2.3 From 046b649bcc3b147140fc2b0af0e071d3dd1b2c8d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 18:28:03 +0100 Subject: (not well tested) use merkle tree for sync --- src/table/table.rs | 50 ++++++++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 28 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index a4cb4b24..516c9358 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -15,9 +15,9 @@ use garage_rpc::rpc_server::*; use crate::crdt::CRDT; use crate::data::*; -use crate::schema::*; -use crate::table_sync::*; use crate::replication::*; +use crate::schema::*; +use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); @@ -50,7 +50,6 @@ pub(crate) enum TableRPC { impl RpcMessage for TableRPC {} - impl Table where F: TableSchema + 'static, @@ -69,29 +68,17 @@ where let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); - let data = TableData::new( - name, - instance, - db, - system.background.clone(), - ); + let data = TableData::new(name, instance, db, system.background.clone()); - let aux = Arc::new(TableAux{ + let aux = Arc::new(TableAux { system, replication, rpc_client, }); - let syncer = TableSyncer::launch( - data.clone(), - aux.clone(), - ); + let syncer = TableSyncer::launch(data.clone(), aux.clone()); - let table = Arc::new(Self { - data, - aux, - syncer, - }); + let table = Arc::new(Self { data, aux, syncer }); table.clone().register_handler(rpc_server, rpc_path); @@ -106,7 +93,8 @@ where let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::::Update(vec![e_enc]); - self.aux.rpc_client + self.aux + .rpc_client .try_call_many( &who[..], rpc, @@ -135,7 +123,11 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::::Update(entries); - let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + let resp = self + .aux + .rpc_client + .call(node, rpc, TABLE_RPC_TIMEOUT) + .await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::>(); @@ -200,7 +192,8 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.aux.system + self.aux + .system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } @@ -221,7 +214,8 @@ where let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .aux.rpc_client + .aux + .rpc_client .try_call_many( &who[..], rpc, @@ -276,7 +270,8 @@ where async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.aux.rpc_client + self.aux + .rpc_client .try_call_many( &who[..], TableRPC::::Update(vec![what_enc]), @@ -296,7 +291,8 @@ where }); let self2 = self.clone(); - self.aux.rpc_client + self.aux + .rpc_client .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } @@ -318,9 +314,7 @@ where Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { - let response = self.syncer - .handle_rpc(rpc, self.aux.system.background.stop_signal.clone()) - .await?; + let response = self.syncer.handle_rpc(rpc).await?; Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), -- cgit v1.2.3 From 8860aa19b867183b83ee48efd9990cd34e567f53 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 15:05:26 +0100 Subject: Make syncer have its own rpc client/server --- src/table/table.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 516c9358..edb1be3f 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -24,7 +24,7 @@ const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); pub struct TableAux { pub system: Arc, pub replication: R, - pub(crate) rpc_client: Arc>>, + rpc_client: Arc>>, } pub struct Table { @@ -76,7 +76,7 @@ where rpc_client, }); - let syncer = TableSyncer::launch(data.clone(), aux.clone()); + let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); let table = Arc::new(Self { data, aux, syncer }); -- cgit v1.2.3 From cbe7e1a66a9dceaaeae0467b4eefe51afd5b297c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 15:07:23 +0100 Subject: Move table rpc client out of tableaux --- src/table/table.rs | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index edb1be3f..dd3394bd 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -21,16 +21,16 @@ use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct TableAux { +pub struct TableAux { pub system: Arc, pub replication: R, - rpc_client: Arc>>, } pub struct Table { pub data: Arc>, - pub aux: Arc>, + pub aux: Arc>, pub syncer: Arc>, + rpc_client: Arc>>, } #[derive(Serialize, Deserialize)] @@ -73,12 +73,16 @@ where let aux = Arc::new(TableAux { system, replication, - rpc_client, }); let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); - let table = Arc::new(Self { data, aux, syncer }); + let table = Arc::new(Self { + data, + aux, + syncer, + rpc_client, + }); table.clone().register_handler(rpc_server, rpc_path); @@ -93,8 +97,7 @@ where let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::::Update(vec![e_enc]); - self.aux - .rpc_client + self.rpc_client .try_call_many( &who[..], rpc, @@ -123,11 +126,7 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::::Update(entries); - let resp = self - .aux - .rpc_client - .call(node, rpc, TABLE_RPC_TIMEOUT) - .await?; + let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::>(); @@ -156,7 +155,6 @@ where let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self - .aux .rpc_client .try_call_many( &who[..], @@ -214,7 +212,6 @@ where let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .aux .rpc_client .try_call_many( &who[..], @@ -270,8 +267,7 @@ where async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.aux - .rpc_client + self.rpc_client .try_call_many( &who[..], TableRPC::::Update(vec![what_enc]), @@ -291,8 +287,7 @@ where }); let self2 = self.clone(); - self.aux - .rpc_client + self.rpc_client .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } -- cgit v1.2.3 From c475471e7a8e7544f2be490898f4249cf27a17e9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 19:57:37 +0100 Subject: Implement table gc, currently for block_ref and version only --- src/table/table.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index dd3394bd..7b0d9a24 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -18,6 +18,7 @@ use crate::data::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; +use crate::gc::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); @@ -44,8 +45,6 @@ pub(crate) enum TableRPC { ReadRange(F::P, Option, Option, usize), Update(Vec>), - - SyncRPC(SyncRPC), } impl RpcMessage for TableRPC {} @@ -76,6 +75,7 @@ where }); let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); + TableGC::launch(data.clone(), aux.clone(), rpc_server); let table = Arc::new(Self { data, @@ -308,10 +308,6 @@ where self.data.update_many(pairs)?; Ok(TableRPC::Ok) } - TableRPC::SyncRPC(rpc) => { - let response = self.syncer.handle_rpc(rpc).await?; - Ok(TableRPC::SyncRPC(response)) - } _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), } } -- cgit v1.2.3 From 831eb35763fdaeecb7b6d6aa13ebd78da14db04e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 21:52:19 +0100 Subject: cargo fmt --- src/table/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 7b0d9a24..2d3c5fe9 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -15,10 +15,10 @@ use garage_rpc::rpc_server::*; use crate::crdt::CRDT; use crate::data::*; +use crate::gc::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; -use crate::gc::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -- cgit v1.2.3 From 1d9961e4118af0e26068e1d6c5c6c009a1292a88 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 11:14:27 +0100 Subject: Simplify replication logic --- src/table/table.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 2d3c5fe9..2ce5868f 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -91,7 +91,7 @@ where pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash, &self.aux.system); + let who = self.aux.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); @@ -101,7 +101,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.write_quorum(&self.aux.system)) + RequestStrategy::with_quorum(self.aux.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -113,7 +113,7 @@ where for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash, &self.aux.system); + let who = self.aux.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -150,7 +150,7 @@ where sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash, &self.aux.system); + let who = self.aux.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); @@ -207,7 +207,7 @@ where limit: usize, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash, &self.aux.system); + let who = self.aux.replication.read_nodes(&hash); let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); -- cgit v1.2.3 From 515029d026937d29395379c76188f509984b8ace Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 11:43:58 +0100 Subject: Refactor code --- src/table/table.rs | 53 ++++++++++++++++++++++++++--------------------------- 1 file changed, 26 insertions(+), 27 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 2ce5868f..f00b4239 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -16,20 +16,17 @@ use garage_rpc::rpc_server::*; use crate::crdt::CRDT; use crate::data::*; use crate::gc::*; +use crate::merkle::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct TableAux { - pub system: Arc, - pub replication: R, -} - pub struct Table { - pub data: Arc>, - pub aux: Arc>, + pub system: Arc, + pub data: Arc>, + pub merkle_updater: Arc>, pub syncer: Arc>, rpc_client: Arc>>, } @@ -67,19 +64,22 @@ where let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); - let data = TableData::new(name, instance, db, system.background.clone()); + let data = TableData::new(name, instance, replication, db); - let aux = Arc::new(TableAux { - system, - replication, - }); + let merkle_updater = MerkleUpdater::launch(data.clone(), system.background.clone()); - let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); - TableGC::launch(data.clone(), aux.clone(), rpc_server); + let syncer = TableSyncer::launch( + system.clone(), + data.clone(), + merkle_updater.clone(), + rpc_server, + ); + TableGC::launch(data.clone(), system.clone(), rpc_server); let table = Arc::new(Self { + system, data, - aux, + merkle_updater, syncer, rpc_client, }); @@ -91,7 +91,7 @@ where pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash); + let who = self.data.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); @@ -101,7 +101,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.write_quorum()) + RequestStrategy::with_quorum(self.data.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -113,7 +113,7 @@ where for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash); + let who = self.data.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -137,7 +137,7 @@ where errors.push(e); } } - if errors.len() > self.aux.replication.max_write_errors() { + if errors.len() > self.data.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { Ok(()) @@ -150,7 +150,7 @@ where sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash); + let who = self.data.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); @@ -159,7 +159,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.read_quorum()) + RequestStrategy::with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -190,8 +190,7 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.aux - .system + self.system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } @@ -207,7 +206,7 @@ where limit: usize, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash); + let who = self.data.replication.read_nodes(&hash); let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); @@ -216,7 +215,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.read_quorum()) + RequestStrategy::with_quorum(self.data.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -248,7 +247,7 @@ where } if !to_repair.is_empty() { let self2 = self.clone(); - self.aux.system.background.spawn_cancellable(async move { + self.system.background.spawn_cancellable(async move { for (_, v) in to_repair.iter_mut() { self2.repair_on_read(&who[..], v.take().unwrap()).await?; } @@ -288,7 +287,7 @@ where let self2 = self.clone(); self.rpc_client - .set_local_handler(self.aux.system.id, move |msg| { + .set_local_handler(self.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } }); -- cgit v1.2.3 From 0aad2f2e066b5914ac94bb319e7679e2e7761b2b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 11:47:39 +0100 Subject: some reordering --- src/table/table.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index f00b4239..421c8bf5 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -66,7 +66,7 @@ where let data = TableData::new(name, instance, replication, db); - let merkle_updater = MerkleUpdater::launch(data.clone(), system.background.clone()); + let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); let syncer = TableSyncer::launch( system.clone(), @@ -74,7 +74,7 @@ where merkle_updater.clone(), rpc_server, ); - TableGC::launch(data.clone(), system.clone(), rpc_server); + TableGC::launch(system.clone(), data.clone(), rpc_server); let table = Arc::new(Self { system, -- cgit v1.2.3 From 7b10245dfb741b7f801d1f3eaa56c6cb4f385d65 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 18:42:33 +0100 Subject: Leader-based GC --- src/table/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/table/table.rs') diff --git a/src/table/table.rs b/src/table/table.rs index 421c8bf5..e203b178 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -64,7 +64,7 @@ where let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); - let data = TableData::new(name, instance, replication, db); + let data = TableData::new(system.clone(), name, instance, replication, db); let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); -- cgit v1.2.3