diff options
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 126 |
1 files changed, 95 insertions, 31 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 7f87a449..2a167604 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,4 +1,5 @@ -use std::collections::{BTreeMap, HashMap}; +use std::borrow::Borrow; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; use std::time::Duration; @@ -26,8 +27,9 @@ use crate::merkle::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; +use crate::util::*; -const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); +pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { pub system: Arc<System>, @@ -45,7 +47,13 @@ pub(crate) enum TableRpc<F: TableSchema> { ReadEntryResponse(Option<ByteBuf>), // Read range: read all keys in partition P, possibly starting at a certain sort key offset - ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize), + ReadRange { + partition: F::P, + begin_sort_key: Option<F::S>, + filter: Option<F::Filter>, + limit: usize, + enumeration_order: EnumerationOrder, + }, Update(Vec<Arc<ByteBuf>>), } @@ -123,9 +131,13 @@ where Ok(()) } - pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error> + where + I: IntoIterator<Item = IE> + Send + Sync, + IE: Borrow<F::E> + Send + Sync, + { let tracer = opentelemetry::global::tracer("garage_table"); - let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len())); + let span = tracer.start(format!("{} insert_many", F::TABLE_NAME)); self.insert_many_internal(entries) .bound_record_duration(&self.data.metrics.put_request_duration) @@ -137,10 +149,15 @@ where Ok(()) } - async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> { + async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error> + where + I: IntoIterator<Item = IE> + Send + Sync, + IE: Borrow<F::E> + Send + Sync, + { let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); - for entry in entries.iter() { + for entry in entries.into_iter() { + let entry = entry.borrow(); let hash = entry.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); @@ -261,12 +278,19 @@ where begin_sort_key: Option<F::S>, filter: Option<F::Filter>, limit: usize, + enumeration_order: EnumerationOrder, ) -> Result<Vec<F::E>, Error> { let tracer = opentelemetry::global::tracer("garage_table"); let span = tracer.start(format!("{} get_range", F::TABLE_NAME)); let res = self - .get_range_internal(partition_key, begin_sort_key, filter, limit) + .get_range_internal( + partition_key, + begin_sort_key, + filter, + limit, + enumeration_order, + ) .bound_record_duration(&self.data.metrics.get_request_duration) .with_context(Context::current_with_span(span)) .await?; @@ -282,11 +306,18 @@ where begin_sort_key: Option<F::S>, filter: Option<F::Filter>, limit: usize, + enumeration_order: EnumerationOrder, ) -> Result<Vec<F::E>, Error> { let hash = partition_key.hash(); let who = self.data.replication.read_nodes(&hash); - let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); + let rpc = TableRpc::<F>::ReadRange { + partition: partition_key.clone(), + begin_sort_key, + filter, + limit, + enumeration_order, + }; let resps = self .system @@ -302,44 +333,65 @@ where ) .await?; - let mut ret = BTreeMap::new(); - let mut to_repair = BTreeMap::new(); + let mut ret: BTreeMap<Vec<u8>, F::E> = BTreeMap::new(); + let mut to_repair = BTreeSet::new(); for resp in resps { if let TableRpc::Update(entries) = resp { for entry_bytes in entries.iter() { let entry = self.data.decode_entry(entry_bytes.as_slice())?; let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); - match ret.remove(&entry_key) { - None => { - ret.insert(entry_key, Some(entry)); - } - Some(Some(mut prev)) => { - let must_repair = prev != entry; - prev.merge(&entry); - if must_repair { - to_repair.insert(entry_key.clone(), Some(prev.clone())); + match ret.get_mut(&entry_key) { + Some(e) => { + if *e != entry { + e.merge(&entry); + to_repair.insert(entry_key.clone()); } - ret.insert(entry_key, Some(prev)); } - Some(None) => unreachable!(), + None => { + ret.insert(entry_key, entry); + } } } + } else { + return Err(Error::unexpected_rpc_message(resp)); } } + if !to_repair.is_empty() { let self2 = self.clone(); + let to_repair = to_repair + .into_iter() + .map(|k| ret.get(&k).unwrap().clone()) + .collect::<Vec<_>>(); self.system.background.spawn_cancellable(async move { - for (_, v) in to_repair.iter_mut() { - self2.repair_on_read(&who[..], v.take().unwrap()).await?; + for v in to_repair { + self2.repair_on_read(&who[..], v).await?; } Ok(()) }); } - let ret_vec = ret - .iter_mut() - .take(limit) - .map(|(_k, v)| v.take().unwrap()) - .collect::<Vec<_>>(); + + // At this point, the `ret` btreemap might contain more than `limit` + // items, because nodes might have returned us each `limit` items + // but for different keys. We have to take only the first `limit` items + // in this map, in the specified enumeration order, for two reasons: + // 1. To return to the user no more than the number of items that they requested + // 2. To return only items for which we have a read quorum: we do not know + // that we have a read quorum for the items after the first `limit` + // of them + let ret_vec = match enumeration_order { + EnumerationOrder::Forward => ret + .into_iter() + .take(limit) + .map(|(_k, v)| v) + .collect::<Vec<_>>(), + EnumerationOrder::Reverse => ret + .into_iter() + .rev() + .take(limit) + .map(|(_k, v)| v) + .collect::<Vec<_>>(), + }; Ok(ret_vec) } @@ -378,8 +430,20 @@ where let value = self.data.read_entry(key, sort_key)?; Ok(TableRpc::ReadEntryResponse(value)) } - TableRpc::ReadRange(key, begin_sort_key, filter, limit) => { - let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; + TableRpc::ReadRange { + partition, + begin_sort_key, + filter, + limit, + enumeration_order, + } => { + let values = self.data.read_range( + partition, + begin_sort_key, + filter, + *limit, + *enumeration_order, + )?; Ok(TableRpc::Update(values)) } TableRpc::Update(pairs) => { |