Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 229
1 file changed, 42 insertions, 187 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 1f6b7d25..e203b178 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,9 +2,6 @@ use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
 use std::time::Duration;
 
-use log::warn;
-
-use arc_swap::ArcSwapOption;
 use futures::stream::*;
 use serde::{Deserialize, Serialize};
 use serde_bytes::ByteBuf;
@@ -13,25 +10,25 @@ use garage_util::data::*;
 use garage_util::error::Error;
 
 use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
 use garage_rpc::rpc_client::*;
 use garage_rpc::rpc_server::*;
 
+use crate::crdt::CRDT;
+use crate::data::*;
+use crate::gc::*;
+use crate::merkle::*;
+use crate::replication::*;
 use crate::schema::*;
-use crate::table_sync::*;
+use crate::sync::*;
 
 const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
 
 pub struct Table<F: TableSchema, R: TableReplication> {
-	pub instance: F,
-	pub replication: R,
-
-	pub name: String,
-	pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>,
-
 	pub system: Arc<System>,
-	pub store: sled::Tree,
-	pub syncer: ArcSwapOption<TableSyncer<F, R>>,
+	pub data: Arc<TableData<F, R>>,
+	pub merkle_updater: Arc<MerkleUpdater<F, R>>,
+	pub syncer: Arc<TableSyncer<F, R>>,
+	rpc_client: Arc<RpcClient<TableRPC<F>>>,
 }
 
 #[derive(Serialize, Deserialize)]
@@ -45,30 +42,10 @@ pub(crate) enum TableRPC<F: TableSchema> {
 	ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
 
 	Update(Vec<Arc<ByteBuf>>),
-
-	SyncRPC(SyncRPC),
 }
 
 impl<F: TableSchema> RpcMessage for TableRPC<F> {}
 
-pub trait TableReplication: Send + Sync {
-	// See examples in table_sharded.rs and table_fullcopy.rs
-	// To understand various replication methods
-
-	// Which nodes to send reads from
-	fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
-	fn read_quorum(&self) -> usize;
-
-	// Which nodes to send writes to
-	fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
-	fn write_quorum(&self, system: &System) -> usize;
-	fn max_write_errors(&self) -> usize;
-
-	// Which are the nodes that do actually replicate the data
-	fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
-	fn split_points(&self, ring: &Ring) -> Vec<Hash>;
-}
-
 impl<F, R> Table<F, R>
 where
 	F: TableSchema + 'static,
@@ -76,7 +53,7 @@ where
 {
 	// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
 
-	pub async fn new(
+	pub fn new(
 		instance: F,
 		replication: R,
 		system: Arc<System>,
@@ -84,31 +61,37 @@ where
 		name: String,
 		rpc_server: &mut RpcServer,
 	) -> Arc<Self> {
-		let store = db.open_tree(&name).expect("Unable to open DB tree");
-
 		let rpc_path = format!("table_{}", name);
 		let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
 
+		let data = TableData::new(system.clone(), name, instance, replication, db);
+
+		let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
+
+		let syncer = TableSyncer::launch(
+			system.clone(),
+			data.clone(),
+			merkle_updater.clone(),
+			rpc_server,
+		);
+		TableGC::launch(system.clone(), data.clone(), rpc_server);
+
 		let table = Arc::new(Self {
-			instance,
-			replication,
-			name,
-			rpc_client,
 			system,
-			store,
-			syncer: ArcSwapOption::from(None),
+			data,
+			merkle_updater,
+			syncer,
+			rpc_client,
 		});
-		table.clone().register_handler(rpc_server, rpc_path);
 
-		let syncer = TableSyncer::launch(table.clone()).await;
-		table.syncer.swap(Some(syncer));
+		table.clone().register_handler(rpc_server, rpc_path);
 
 		table
 	}
 
 	pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
 		let hash = e.partition_key().hash();
-		let who = self.replication.write_nodes(&hash, &self.system);
+		let who = self.data.replication.write_nodes(&hash);
 		//eprintln!("insert who: {:?}", who);
 
 		let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
@@ -118,7 +101,7 @@ where
 			.try_call_many(
 				&who[..],
 				rpc,
-				RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
+				RequestStrategy::with_quorum(self.data.replication.write_quorum())
 					.with_timeout(TABLE_RPC_TIMEOUT),
 			)
 			.await?;
@@ -130,7 +113,7 @@ where
 
 		for entry in entries.iter() {
 			let hash = entry.partition_key().hash();
-			let who = self.replication.write_nodes(&hash, &self.system);
+			let who = self.data.replication.write_nodes(&hash);
 			let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
 			for node in who {
 				if !call_list.contains_key(&node) {
@@ -154,7 +137,7 @@ where
 				errors.push(e);
 			}
 		}
-		if errors.len() > self.replication.max_write_errors() {
+		if errors.len() > self.data.replication.max_write_errors() {
 			Err(Error::Message("Too many errors".into()))
 		} else {
 			Ok(())
@@ -167,7 +150,7 @@ where
 		sort_key: &F::S,
 	) -> Result<Option<F::E>, Error> {
 		let hash = partition_key.hash();
-		let who = self.replication.read_nodes(&hash, &self.system);
+		let who = self.data.replication.read_nodes(&hash);
 		//eprintln!("get who: {:?}", who);
 
 		let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
@@ -176,7 +159,7 @@ where
 			.try_call_many(
 				&who[..],
 				rpc,
-				RequestStrategy::with_quorum(self.replication.read_quorum())
+				RequestStrategy::with_quorum(self.data.replication.read_quorum())
 					.with_timeout(TABLE_RPC_TIMEOUT)
 					.interrupt_after_quorum(true),
 			)
 			.await?;
@@ -187,7 +170,7 @@ where
 		for resp in resps {
 			if let TableRPC::ReadEntryResponse(value) = resp {
 				if let Some(v_bytes) = value {
-					let v = self.decode_entry(v_bytes.as_slice())?;
+					let v = self.data.decode_entry(v_bytes.as_slice())?;
 					ret = match ret {
 						None => Some(v),
 						Some(mut x) => {
@@ -223,7 +206,7 @@ where
 		limit: usize,
 	) -> Result<Vec<F::E>, Error> {
 		let hash = partition_key.hash();
-		let who = self.replication.read_nodes(&hash, &self.system);
+		let who = self.data.replication.read_nodes(&hash);
 
 		let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
@@ -232,7 +215,7 @@ where
 			.try_call_many(
 				&who[..],
 				rpc,
-				RequestStrategy::with_quorum(self.replication.read_quorum())
+				RequestStrategy::with_quorum(self.data.replication.read_quorum())
 					.with_timeout(TABLE_RPC_TIMEOUT)
 					.interrupt_after_quorum(true),
 			)
 			.await?;
@@ -243,8 +226,8 @@ where
 		for resp in resps {
 			if let TableRPC::Update(entries) = resp {
 				for entry_bytes in entries.iter() {
-					let entry = self.decode_entry(entry_bytes.as_slice())?;
-					let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
+					let entry = self.data.decode_entry(entry_bytes.as_slice())?;
+					let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
 					match ret.remove(&entry_key) {
 						None => {
 							ret.insert(entry_key, Some(entry));
@@ -313,146 +296,18 @@ where
 	async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
 		match msg {
 			TableRPC::ReadEntry(key, sort_key) => {
-				let value = self.handle_read_entry(key, sort_key)?;
+				let value = self.data.read_entry(key, sort_key)?;
 				Ok(TableRPC::ReadEntryResponse(value))
 			}
 			TableRPC::ReadRange(key, begin_sort_key, filter, limit) => {
-				let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?;
+				let values = self.data.read_range(key, begin_sort_key, filter, *limit)?;
 				Ok(TableRPC::Update(values))
 			}
 			TableRPC::Update(pairs) => {
-				self.handle_update(pairs).await?;
+				self.data.update_many(pairs)?;
 				Ok(TableRPC::Ok)
 			}
-			TableRPC::SyncRPC(rpc) => {
-				let syncer = self.syncer.load_full().unwrap();
-				let response = syncer
-					.handle_rpc(rpc, self.system.background.stop_signal.clone())
-					.await?;
-				Ok(TableRPC::SyncRPC(response))
-			}
 			_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
 		}
 	}
-
-	fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
-		let tree_key = self.tree_key(p, s);
-		if let Some(bytes) = self.store.get(&tree_key)? {
-			Ok(Some(ByteBuf::from(bytes.to_vec())))
-		} else {
-			Ok(None)
-		}
-	}
-
-	fn handle_read_range(
-		&self,
-		p: &F::P,
-		s: &Option<F::S>,
-		filter: &Option<F::Filter>,
-		limit: usize,
-	) -> Result<Vec<Arc<ByteBuf>>, Error> {
-		let partition_hash = p.hash();
-		let first_key = match s {
-			None => partition_hash.to_vec(),
-			Some(sk) => self.tree_key(p, sk),
-		};
-		let mut ret = vec![];
-		for item in self.store.range(first_key..) {
-			let (key, value) = item?;
-			if &key[..32] != partition_hash.as_slice() {
-				break;
-			}
-			let keep = match filter {
-				None => true,
-				Some(f) => {
-					let entry = self.decode_entry(value.as_ref())?;
-					F::matches_filter(&entry, f)
-				}
-			};
-			if keep {
-				ret.push(Arc::new(ByteBuf::from(value.as_ref())));
-			}
-			if ret.len() >= limit {
-				break;
-			}
-		}
-		Ok(ret)
-	}
-
-	pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
-		let syncer = self.syncer.load_full().unwrap();
-
-		for update_bytes in entries.iter() {
-			let update = self.decode_entry(update_bytes.as_slice())?;
-
-			let tree_key = self.tree_key(update.partition_key(), update.sort_key());
-
-			let (old_entry, new_entry) = self.store.transaction(|db| {
-				let (old_entry, new_entry) = match db.get(&tree_key)? {
-					Some(prev_bytes) => {
-						let old_entry = self
-							.decode_entry(&prev_bytes)
-							.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
-						let mut new_entry = old_entry.clone();
-						new_entry.merge(&update);
-						(Some(old_entry), new_entry)
-					}
-					None => (None, update.clone()),
-				};
-
-				let new_bytes = rmp_to_vec_all_named(&new_entry)
-					.map_err(Error::RMPEncode)
-					.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
-				db.insert(tree_key.clone(), new_bytes)?;
-				Ok((old_entry, new_entry))
-			})?;
-
-			if old_entry.as_ref() != Some(&new_entry) {
-				self.instance.updated(old_entry, Some(new_entry));
-				syncer.invalidate(&tree_key[..]);
-			}
-		}
-
-		Ok(())
-	}
-
-	pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
-		let removed = self.store.transaction(|txn| {
-			if let Some(cur_v) = txn.get(k)? {
-				if cur_v == v {
-					txn.remove(k)?;
-					return Ok(true);
-				}
-			}
-			Ok(false)
-		})?;
-		if removed {
-			let old_entry = self.decode_entry(v)?;
-			self.instance.updated(Some(old_entry), None);
-			self.syncer.load_full().unwrap().invalidate(k);
-		}
-		Ok(removed)
-	}
-
-	fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
-		let mut ret = p.hash().to_vec();
-		ret.extend(s.sort_key());
-		ret
-	}
-
-	fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
-		match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
-			Ok(x) => Ok(x),
-			Err(e) => match F::try_migrate(bytes) {
-				Some(x) => Ok(x),
-				None => {
-					warn!("Unable to decode entry of {}: {}", self.name, e);
-					for line in hexdump::hexdump_iter(bytes) {
-						debug!("{}", line);
-					}
-					Err(e.into())
-				}
-			},
-		}
-	}
 }
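The TableReplication trait removed above is not dropped: the new `use crate::replication::*;` import and the updated call sites (`self.data.replication.write_nodes(&hash)`, `.write_quorum()`, `.read_nodes(&hash)`, `.read_quorum()`, `.max_write_errors()`) show that it now lives in its own module and no longer receives `&System` or `&Ring` at call time. A minimal sketch of the trait shape those call sites imply, inferred from this diff and not taken from the commit itself (the actual definition in the new `replication` module may differ), assuming the `Hash` and `UUID` types from `garage_util::data` are unchanged:

use garage_util::data::*;

// Hypothetical sketch inferred from the call sites in this diff;
// not the actual definition from the new replication module.
pub trait TableReplication: Send + Sync {
	// Which nodes to read from, and how many replies make a read quorum
	fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
	fn read_quorum(&self) -> usize;

	// Which nodes to write to, the write quorum, and tolerated write errors
	fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
	fn write_quorum(&self) -> usize;
	fn max_write_errors(&self) -> usize;
}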