diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/data.rs | 98 | ||||
-rw-r--r-- | src/table/schema.rs | 2 | ||||
-rw-r--r-- | src/table/table.rs | 126 | ||||
-rw-r--r-- | src/table/util.rs | 18 |
4 files changed, 186 insertions, 58 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index ff7965f5..5cb10066 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -1,8 +1,9 @@ use core::borrow::Borrow; +use std::convert::TryInto; use std::sync::Arc; use serde_bytes::ByteBuf; -use sled::Transactional; +use sled::{IVec, Transactional}; use tokio::sync::Notify; use garage_util::data::*; @@ -16,12 +17,13 @@ use crate::gc::GcTodoEntry; use crate::metrics::*; use crate::replication::*; use crate::schema::*; +use crate::util::*; pub struct TableData<F: TableSchema, R: TableReplication> { system: Arc<System>, - pub(crate) instance: F, - pub(crate) replication: R, + pub instance: F, + pub replication: R, pub store: sled::Tree, @@ -83,18 +85,48 @@ where pub fn read_range( &self, - p: &F::P, - s: &Option<F::S>, + partition_key: &F::P, + start: &Option<F::S>, + filter: &Option<F::Filter>, + limit: usize, + enumeration_order: EnumerationOrder, + ) -> Result<Vec<Arc<ByteBuf>>, Error> { + let partition_hash = partition_key.hash(); + match enumeration_order { + EnumerationOrder::Forward => { + let first_key = match start { + None => partition_hash.to_vec(), + Some(sk) => self.tree_key(partition_key, sk), + }; + let range = self.store.range(first_key..); + self.read_range_aux(partition_hash, range, filter, limit) + } + EnumerationOrder::Reverse => match start { + Some(sk) => { + let last_key = self.tree_key(partition_key, sk); + let range = self.store.range(..=last_key).rev(); + self.read_range_aux(partition_hash, range, filter, limit) + } + None => { + let mut last_key = partition_hash.to_vec(); + let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap()); + last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1)); + let range = self.store.range(..last_key).rev(); + self.read_range_aux(partition_hash, range, filter, limit) + } + }, + } + } + + fn read_range_aux( + &self, + partition_hash: Hash, + range: impl Iterator<Item = sled::Result<(IVec, IVec)>>, filter: &Option<F::Filter>, limit: usize, ) -> Result<Vec<Arc<ByteBuf>>, Error> { - let partition_hash = p.hash(); - let first_key = match s { - None => partition_hash.to_vec(), - Some(sk) => self.tree_key(p, sk), - }; let mut ret = vec![]; - for item in self.store.range(first_key..) { + for item in range { let (key, value) = item?; if &key[..32] != partition_hash.as_slice() { break; @@ -136,17 +168,31 @@ where let update = self.decode_entry(update_bytes)?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + self.update_entry_with(&tree_key[..], |ent| match ent { + Some(mut ent) => { + ent.merge(&update); + ent + } + None => update.clone(), + })?; + Ok(()) + } + + pub fn update_entry_with( + &self, + tree_key: &[u8], + f: impl Fn(Option<F::E>) -> F::E, + ) -> Result<Option<F::E>, Error> { let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - let (old_entry, old_bytes, new_entry) = match store.get(&tree_key)? { + let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? { Some(old_bytes) => { let old_entry = self .decode_entry(&old_bytes) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); + let new_entry = f(Some(old_entry.clone())); (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, update.clone()), + None => (None, None, f(None)), }; // Scenario 1: the value changed, so of course there is a change @@ -163,8 +209,8 @@ where if value_changed || encoding_changed { let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; - store.insert(tree_key.clone(), new_bytes)?; + mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?; + store.insert(tree_key.to_vec(), new_bytes)?; Ok(Some((old_entry, new_entry, new_bytes_hash))) } else { Ok(None) @@ -175,7 +221,7 @@ where self.metrics.internal_update_counter.add(1); let is_tombstone = new_entry.is_tombstone(); - self.instance.updated(old_entry, Some(new_entry)); + self.instance.updated(old_entry.as_ref(), Some(&new_entry)); self.merkle_todo_notify.notify_one(); if is_tombstone { // We are only responsible for GC'ing this item if we are the @@ -187,12 +233,14 @@ where let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); let nodes = self.replication.write_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { - GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; + GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?; } } - } - Ok(()) + Ok(Some(new_entry)) + } else { + Ok(None) + } } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { @@ -211,7 +259,7 @@ where self.metrics.internal_delete_counter.add(1); let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None); + self.instance.updated(Some(&old_entry), None); self.merkle_todo_notify.notify_one(); } Ok(removed) @@ -235,7 +283,7 @@ where if let Some(old_v) = removed { let old_entry = self.decode_entry(&old_v[..])?; - self.instance.updated(Some(old_entry), None); + self.instance.updated(Some(&old_entry), None); self.merkle_todo_notify.notify_one(); Ok(true) } else { @@ -245,13 +293,13 @@ where // ---- Utility functions ---- - pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { + pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { let mut ret = p.hash().to_vec(); ret.extend(s.sort_key()); ret } - pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { + pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { Ok(x) => Ok(x), Err(e) => match F::try_migrate(bytes) { diff --git a/src/table/schema.rs b/src/table/schema.rs index eba918a2..37327037 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -86,7 +86,7 @@ pub trait TableSchema: Send + Sync { // as the update itself is an unchangeable fact that will never go back // due to CRDT logic. Typically errors in propagation of info should be logged // to stderr. - fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {} + fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {} fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool; } diff --git a/src/table/table.rs b/src/table/table.rs index 7f87a449..2a167604 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,4 +1,5 @@ -use std::collections::{BTreeMap, HashMap}; +use std::borrow::Borrow; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; use std::time::Duration; @@ -26,8 +27,9 @@ use crate::merkle::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; +use crate::util::*; -const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); +pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { pub system: Arc<System>, @@ -45,7 +47,13 @@ pub(crate) enum TableRpc<F: TableSchema> { ReadEntryResponse(Option<ByteBuf>), // Read range: read all keys in partition P, possibly starting at a certain sort key offset - ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize), + ReadRange { + partition: F::P, + begin_sort_key: Option<F::S>, + filter: Option<F::Filter>, + limit: usize, + enumeration_order: EnumerationOrder, + }, Update(Vec<Arc<ByteBuf>>), } @@ -123,9 +131,13 @@ where Ok(()) } - pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error> + where + I: IntoIterator<Item = IE> + Send + Sync, + IE: Borrow<F::E> + Send + Sync, + { let tracer = opentelemetry::global::tracer("garage_table"); - let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len())); + let span = tracer.start(format!("{} insert_many", F::TABLE_NAME)); self.insert_many_internal(entries) .bound_record_duration(&self.data.metrics.put_request_duration) @@ -137,10 +149,15 @@ where Ok(()) } - async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> { + async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error> + where + I: IntoIterator<Item = IE> + Send + Sync, + IE: Borrow<F::E> + Send + Sync, + { let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); - for entry in entries.iter() { + for entry in entries.into_iter() { + let entry = entry.borrow(); let hash = entry.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); @@ -261,12 +278,19 @@ where begin_sort_key: Option<F::S>, filter: Option<F::Filter>, limit: usize, + enumeration_order: EnumerationOrder, ) -> Result<Vec<F::E>, Error> { let tracer = opentelemetry::global::tracer("garage_table"); let span = tracer.start(format!("{} get_range", F::TABLE_NAME)); let res = self - .get_range_internal(partition_key, begin_sort_key, filter, limit) + .get_range_internal( + partition_key, + begin_sort_key, + filter, + limit, + enumeration_order, + ) .bound_record_duration(&self.data.metrics.get_request_duration) .with_context(Context::current_with_span(span)) .await?; @@ -282,11 +306,18 @@ where begin_sort_key: Option<F::S>, filter: Option<F::Filter>, limit: usize, + enumeration_order: EnumerationOrder, ) -> Result<Vec<F::E>, Error> { let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); - let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); + let rpc = TableRpc::<F>::ReadRange { + partition: partition_key.clone(), + begin_sort_key, + filter, + limit, + enumeration_order, + }; let resps = self .system @@ -302,44 +333,65 @@ where ) .await?; - let mut ret = BTreeMap::new(); - let mut to_repair = BTreeMap::new(); + let mut ret: BTreeMap<Vec<u8>, F::E> = BTreeMap::new(); + let mut to_repair = BTreeSet::new(); for resp in resps { if let TableRpc::Update(entries) = resp { for entry_bytes in entries.iter() { let entry = self.data.decode_entry(entry_bytes.as_slice())?; let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); - match ret.remove(&entry_key) { - None => { - ret.insert(entry_key, Some(entry)); - } - Some(Some(mut prev)) => { - let must_repair = prev != entry; - prev.merge(&entry); - if must_repair { - to_repair.insert(entry_key.clone(), Some(prev.clone())); + match ret.get_mut(&entry_key) { + Some(e) => { + if *e != entry { + e.merge(&entry); + to_repair.insert(entry_key.clone()); } - ret.insert(entry_key, Some(prev)); } - Some(None) => unreachable!(), + None => { + ret.insert(entry_key, entry); + } } } + } else { + return Err(Error::unexpected_rpc_message(resp)); } } + if !to_repair.is_empty() { let self2 = self.clone(); + let to_repair = to_repair + .into_iter() + .map(|k| ret.get(&k).unwrap().clone()) + .collect::<Vec<_>>(); self.system.background.spawn_cancellable(async move { - for (_, v) in to_repair.iter_mut() { - self2.repair_on_read(&who[..], v.take().unwrap()).await?; + for v in to_repair { + self2.repair_on_read(&who[..], v).await?; } Ok(()) }); } - let ret_vec = ret - .iter_mut() - .take(limit) - .map(|(_k, v)| v.take().unwrap()) - .collect::<Vec<_>>(); + + // At this point, the `ret` btreemap might contain more than `limit` + // items, because nodes might have returned us each `limit` items + // but for different keys. We have to take only the first `limit` items + // in this map, in the specified enumeration order, for two reasons: + // 1. To return to the user no more than the number of items that they requested + // 2. To return only items for which we have a read quorum: we do not know + // that we have a read quorum for the items after the first `limit` + // of them + let ret_vec = match enumeration_order { + EnumerationOrder::Forward => ret + .into_iter() + .take(limit) + .map(|(_k, v)| v) + .collect::<Vec<_>>(), + EnumerationOrder::Reverse => ret + .into_iter() + .rev() + .take(limit) + .map(|(_k, v)| v) + .collect::<Vec<_>>(), + }; Ok(ret_vec) } @@ -378,8 +430,20 @@ where let value = self.data.read_entry(key, sort_key)?; Ok(TableRpc::ReadEntryResponse(value)) } - TableRpc::ReadRange(key, begin_sort_key, filter, limit) => { - let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; + TableRpc::ReadRange { + partition, + begin_sort_key, + filter, + limit, + enumeration_order, + } => { + let values = self.data.read_range( + partition, + begin_sort_key, + filter, + *limit, + *enumeration_order, + )?; Ok(TableRpc::Update(values)) } TableRpc::Update(pairs) => { diff --git a/src/table/util.rs b/src/table/util.rs index 2a5c3afe..20595a94 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -17,7 +17,7 @@ impl PartitionKey for EmptyKey { } } -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum DeletedFilter { Any, Deleted, @@ -33,3 +33,19 @@ impl DeletedFilter { } } } + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum EnumerationOrder { + Forward, + Reverse, +} + +impl EnumerationOrder { + pub fn from_reverse(reverse: bool) -> Self { + if reverse { + Self::Reverse + } else { + Self::Forward + } + } +} |