From 5768bf362262f78376af14517c4921941986192e Mon Sep 17 00:00:00 2001
From: Alex
Date: Tue, 10 May 2022 13:16:57 +0200
Subject: First implementation of K2V (#293)

**Specification:**

View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)

- [x] Specify the structure of K2V triples
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)

**Implementation:**

- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch

**Testing:**

- [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values)
- [x] Actual tests:
  - [x] Adapt testing framework
  - [x] Simple test with InsertItem + ReadItem
  - [x] Test with several Insert/Read/DeleteItem + ReadIndex
  - [x] Test all combinations of return formats for ReadItem
  - [x] Test with ReadBatch, InsertBatch, DeleteBatch
  - [x] Test with PollItem
  - [x] Test error codes
- [ ] Fix most broken stuff
  - [x] test PollItem broken randomly
  - [x] when invalid causality tokens are given, errors should be 4xx not 5xx

**Improvements:**

- [x] Descending range queries
  - [x] Specify
  - [x] Implement
  - [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag

Co-authored-by: Alex Auvolat
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex
Co-committed-by: Alex
---
 src/model/Cargo.toml            |   5 +
 src/model/block_ref_table.rs    |  74 ---------
 src/model/garage.rs             |  97 +++++++++---
 src/model/helper/bucket.rs      |   3 +-
 src/model/index_counter.rs      | 305 +++++++++++++++++++++++++++++++++++
 src/model/k2v/causality.rs      |  96 +++++++++++
 src/model/k2v/counter_table.rs  |  20 +++
 src/model/k2v/item_table.rs     | 291 ++++++++++++++++++++++++++++++++++
 src/model/k2v/mod.rs            |   7 +
 src/model/k2v/poll.rs           |  50 ++++++
 src/model/k2v/rpc.rs            | 343 ++++++++++++++++++++++++++++++++++++++++
 src/model/lib.rs                |   9 +-
 src/model/object_table.rs       | 334 --------------------------------------
 src/model/s3/block_ref_table.rs |  74 +++++++++
 src/model/s3/mod.rs             |   3 +
 src/model/s3/object_table.rs    | 337 +++++++++++++++++++++++++++++++++++++++
 src/model/s3/version_table.rs   | 207 ++++++++++++++++++
 src/model/version_table.rs      | 204 ------------------------
 18 files changed, 1823 insertions(+), 636 deletions(-)
 delete mode 100644 src/model/block_ref_table.rs
 create mode 100644 src/model/index_counter.rs
 create mode 100644 src/model/k2v/causality.rs
 create mode 100644 src/model/k2v/counter_table.rs
 create mode 100644 src/model/k2v/item_table.rs
 create mode 100644 src/model/k2v/mod.rs
 create mode 100644 src/model/k2v/poll.rs
 create mode 100644 src/model/k2v/rpc.rs
 delete mode 100644 src/model/object_table.rs
 create mode 100644 src/model/s3/block_ref_table.rs
 create mode 100644 src/model/s3/mod.rs
 create mode 100644 src/model/s3/object_table.rs
 create mode 100644
src/model/s3/version_table.rs delete mode 100644 src/model/version_table.rs (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 007cec89..133fe44e 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -22,8 +22,10 @@ garage_model_050 = { package = "garage_model", version = "0.5.1" } async-trait = "0.1.7" arc-swap = "1.0" +blake2 = "0.9" err-derive = "0.3" hex = "0.4" +base64 = "0.13" tracing = "0.1.30" rand = "0.8" zstd = { version = "0.9", default-features = false } @@ -42,3 +44,6 @@ opentelemetry = "0.17" #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = { version = "0.4", path = "../../../netapp" } netapp = "0.4" + +[features] +k2v = [ "garage_util/k2v" ] diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs deleted file mode 100644 index b6945403..00000000 --- a/src/model/block_ref_table.rs +++ /dev/null @@ -1,74 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -use garage_util::data::*; - -use garage_table::crdt::Crdt; -use garage_table::*; - -use garage_block::manager::*; - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct BlockRef { - /// Hash (blake2 sum) of the block, used as partition key - pub block: Hash, - - /// Id of the Version for the object containing this block, used as sorting key - pub version: Uuid, - - // Keep track of deleted status - /// Is the Version that contains this block deleted - pub deleted: crdt::Bool, -} - -impl Entry for BlockRef { - fn partition_key(&self) -> &Hash { - &self.block - } - fn sort_key(&self) -> &Uuid { - &self.version - } - fn is_tombstone(&self) -> bool { - self.deleted.get() - } -} - -impl Crdt for BlockRef { - fn merge(&mut self, other: &Self) { - self.deleted.merge(&other.deleted); - } -} - -pub struct BlockRefTable { - pub block_manager: Arc, -} - -impl TableSchema for BlockRefTable { - const TABLE_NAME: &'static str = "block_ref"; - - type P = Hash; - type S = Uuid; - type E = BlockRef; - type Filter = DeletedFilter; - - fn updated(&self, old: Option, new: Option) { - #[allow(clippy::or_fun_call)] - let block = &old.as_ref().or(new.as_ref()).unwrap().block; - let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); - let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); - if is_after && !was_before { - if let Err(e) = self.block_manager.block_incref(block) { - warn!("block_incref failed for block {:?}: {}", block, e); - } - } - if was_before && !is_after { - if let Err(e) = self.block_manager.block_decref(block) { - warn!("block_decref failed for block {:?}: {}", block, e); - } - } - } - - fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted.get()) - } -} diff --git a/src/model/garage.rs b/src/model/garage.rs index abdb920a..03e21f8a 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -13,13 +13,19 @@ use garage_table::replication::TableFullReplication; use garage_table::replication::TableShardedReplication; use garage_table::*; -use crate::block_ref_table::*; +use crate::s3::block_ref_table::*; +use crate::s3::object_table::*; +use crate::s3::version_table::*; + use crate::bucket_alias_table::*; use crate::bucket_table::*; use crate::helper; use crate::key_table::*; -use crate::object_table::*; -use crate::version_table::*; + +#[cfg(feature = "k2v")] +use crate::index_counter::*; +#[cfg(feature = "k2v")] +use crate::k2v::{counter_table::*, item_table::*, poll::*, rpc::*}; /// An entire Garage full of data 
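An aside on the new `blake2` dependency added above: K2V hashes the pair (bucket id, partition key) into the 32-byte partition hash used for data placement. Here is a minimal standalone sketch of that hashing, mirroring the `PartitionKey` impl for `K2VItemPartition` further down in this patch; the function name and the example key are illustrative only.

```rust
// Sketch only: mirrors K2VItemPartition::hash() from src/model/k2v/item_table.rs.
// Assumes blake2 = "0.9" and hex = "0.4", as in src/model/Cargo.toml above.
use blake2::{Blake2b, Digest};

fn partition_hash(bucket_id: &[u8; 32], partition_key: &str) -> [u8; 32] {
    let mut hasher = Blake2b::new(); // Blake2b has a 64-byte digest by default
    hasher.update(bucket_id);
    hasher.update(partition_key.as_bytes());
    let mut hash = [0u8; 32];
    hash.copy_from_slice(&hasher.finalize()[..32]); // keep the first 32 bytes
    hash
}

fn main() {
    let bucket_id = [0x42u8; 32]; // made-up bucket id
    println!("{}", hex::encode(partition_hash(&bucket_id, "mailbox:INBOX")));
}
```

Hashing the bucket id together with the partition key means two buckets using the same partition key names still land on unrelated partitions.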
pub struct Garage { @@ -35,16 +41,32 @@ pub struct Garage { /// The block manager pub block_manager: Arc, - /// Table containing informations about buckets + /// Table containing buckets pub bucket_table: Arc>, - /// Table containing informations about bucket aliases + /// Table containing bucket aliases pub bucket_alias_table: Arc>, - /// Table containing informations about api keys + /// Table containing api keys pub key_table: Arc>, + /// Table containing S3 objects pub object_table: Arc>, + /// Table containing S3 object versions pub version_table: Arc>, + /// Table containing S3 block references (not blocks themselves) pub block_ref_table: Arc>, + + #[cfg(feature = "k2v")] + pub k2v: GarageK2V, +} + +#[cfg(feature = "k2v")] +pub struct GarageK2V { + /// Table containing K2V items + pub item_table: Arc>, + /// Indexing table containing K2V item counters + pub counter_table: Arc>, + /// K2V RPC handler + pub rpc: Arc, } impl Garage { @@ -95,6 +117,21 @@ impl Garage { system.clone(), ); + // ---- admin tables ---- + info!("Initialize bucket_table..."); + let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db); + + info!("Initialize bucket_alias_table..."); + let bucket_alias_table = Table::new( + BucketAliasTable, + control_rep_param.clone(), + system.clone(), + &db, + ); + info!("Initialize key_table_table..."); + let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db); + + // ---- S3 tables ---- info!("Initialize block_ref_table..."); let block_ref_table = Table::new( BlockRefTable { @@ -117,29 +154,20 @@ impl Garage { ); info!("Initialize object_table..."); + #[allow(clippy::redundant_clone)] let object_table = Table::new( ObjectTable { background: background.clone(), version_table: version_table.clone(), }, - meta_rep_param, - system.clone(), - &db, - ); - - info!("Initialize bucket_table..."); - let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db); - - info!("Initialize bucket_alias_table..."); - let bucket_alias_table = Table::new( - BucketAliasTable, - control_rep_param.clone(), + meta_rep_param.clone(), system.clone(), &db, ); - info!("Initialize key_table_table..."); - let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db); + // ---- K2V ---- + #[cfg(feature = "k2v")] + let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); info!("Initialize Garage..."); @@ -155,6 +183,8 @@ impl Garage { object_table, version_table, block_ref_table, + #[cfg(feature = "k2v")] + k2v, }) } @@ -162,3 +192,30 @@ impl Garage { helper::bucket::BucketHelper(self) } } + +#[cfg(feature = "k2v")] +impl GarageK2V { + fn new(system: Arc, db: &sled::Db, meta_rep_param: TableShardedReplication) -> Self { + info!("Initialize K2V counter table..."); + let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); + info!("Initialize K2V subscription manager..."); + let subscriptions = Arc::new(SubscriptionManager::new()); + info!("Initialize K2V item table..."); + let item_table = Table::new( + K2VItemTable { + counter_table: counter_table.clone(), + subscriptions: subscriptions.clone(), + }, + meta_rep_param, + system.clone(), + db, + ); + let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions); + + Self { + item_table, + counter_table, + rpc, + } + } +} diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 706faf26..54d2f97b 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -1,4 +1,4 @@ -use 
garage_table::util::EmptyKey; +use garage_table::util::*; use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::{Error as GarageError, OkOrMessage}; @@ -116,6 +116,7 @@ impl<'a> BucketHelper<'a> { None, Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())), 10, + EnumerationOrder::Forward, ) .await? .into_iter() diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs new file mode 100644 index 00000000..123154d4 --- /dev/null +++ b/src/model/index_counter.rs @@ -0,0 +1,305 @@ +use std::collections::{hash_map, BTreeMap, HashMap}; +use std::marker::PhantomData; +use std::sync::Arc; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, watch}; + +use garage_rpc::ring::Ring; +use garage_rpc::system::System; +use garage_util::data::*; +use garage_util::error::*; + +use garage_table::crdt::*; +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static { + const NAME: &'static str; + type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; +} + +/// A counter entry in the global table +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct CounterEntry { + pub pk: T::P, + pub sk: T::S, + pub values: BTreeMap, +} + +impl Entry for CounterEntry { + fn partition_key(&self) -> &T::P { + &self.pk + } + fn sort_key(&self) -> &T::S { + &self.sk + } + fn is_tombstone(&self) -> bool { + self.values + .iter() + .all(|(_, v)| v.node_values.iter().all(|(_, (_, v))| *v == 0)) + } +} + +impl CounterEntry { + pub fn filtered_values(&self, ring: &Ring) -> HashMap { + let nodes = &ring.layout.node_id_vec[..]; + self.filtered_values_with_nodes(nodes) + } + + pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap { + let mut ret = HashMap::new(); + for (name, vals) in self.values.iter() { + let new_vals = vals + .node_values + .iter() + .filter(|(n, _)| nodes.contains(n)) + .map(|(_, (_, v))| *v) + .collect::>(); + if !new_vals.is_empty() { + ret.insert( + name.clone(), + new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)), + ); + } + } + + ret + } +} + +/// A counter entry in the global table +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct CounterValue { + pub node_values: BTreeMap, +} + +impl Crdt for CounterEntry { + fn merge(&mut self, other: &Self) { + for (name, e2) in other.values.iter() { + if let Some(e) = self.values.get_mut(name) { + e.merge(e2); + } else { + self.values.insert(name.clone(), e2.clone()); + } + } + } +} + +impl Crdt for CounterValue { + fn merge(&mut self, other: &Self) { + for (node, (t2, e2)) in other.node_values.iter() { + if let Some((t, e)) = self.node_values.get_mut(node) { + if t2 > t { + *e = *e2; + } + } else { + self.node_values.insert(*node, (*t2, *e2)); + } + } + } +} + +pub struct CounterTable { + _phantom_t: PhantomData, +} + +impl TableSchema for CounterTable { + const TABLE_NAME: &'static str = T::NAME; + + type P = T::P; + type S = T::S; + type E = CounterEntry; + type Filter = (DeletedFilter, Vec); + + fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) { + // nothing for now + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + if filter.0 == DeletedFilter::Any { + return true; + } + + let is_tombstone = entry + .filtered_values_with_nodes(&filter.1[..]) + .iter() + .all(|(_, v)| 
*v == 0); + filter.0.apply(is_tombstone) + } +} + +// ---- + +pub struct IndexCounter { + this_node: Uuid, + local_counter: sled::Tree, + propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, + pub table: Arc, TableShardedReplication>>, +} + +impl IndexCounter { + pub fn new( + system: Arc, + replication: TableShardedReplication, + db: &sled::Db, + ) -> Arc { + let background = system.background.clone(); + + let (propagate_tx, propagate_rx) = mpsc::unbounded_channel(); + + let this = Arc::new(Self { + this_node: system.id, + local_counter: db + .open_tree(format!("local_counter:{}", T::NAME)) + .expect("Unable to open local counter tree"), + propagate_tx, + table: Table::new( + CounterTable { + _phantom_t: Default::default(), + }, + replication, + system, + db, + ), + }); + + let this2 = this.clone(); + background.spawn_worker( + format!("{} index counter propagator", T::NAME), + move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit), + ); + this + } + + pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { + let tree_key = self.table.data.tree_key(pk, sk); + + let new_entry = self.local_counter.transaction(|tx| { + let mut entry = match tx.get(&tree_key[..])? { + Some(old_bytes) => { + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) + .map_err(Error::RmpDecode) + .map_err(sled::transaction::ConflictableTransactionError::Abort)? + } + None => LocalCounterEntry { + values: BTreeMap::new(), + }, + }; + + for (s, inc) in counts.iter() { + let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); + ent.0 += 1; + ent.1 += *inc; + } + + let new_entry_bytes = rmp_to_vec_all_named(&entry) + .map_err(Error::RmpEncode) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + tx.insert(&tree_key[..], new_entry_bytes)?; + + Ok(entry) + })?; + + if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) { + error!( + "Could not propagate updated counter values, failed to send to channel: {}", + e + ); + } + + Ok(()) + } + + async fn propagate_loop( + self: Arc, + mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>, + must_exit: watch::Receiver, + ) { + // This loop batches updates to counters to be sent all at once. + // They are sent once the propagate_rx channel has been emptied (or is closed). 
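The entries this loop batches are `CounterEntry` values: per counter name, a `CounterValue` holding one `(timestamp, value)` pair per node, merged last-writer-wins per node (see the `Crdt` impls above). A minimal self-contained model of that merge rule, with simplified types rather than the actual `garage_table` CRDT trait:

```rust
// Simplified model of the per-node last-writer-wins merge; NodeCounter is a
// hypothetical stand-in for CounterValue, not the type used in this patch.
use std::collections::{btree_map::Entry, BTreeMap};

#[derive(Debug)]
struct NodeCounter {
    // node id -> (timestamp, value)
    node_values: BTreeMap<u64, (u64, i64)>,
}

impl NodeCounter {
    /// Per node, keep whichever (timestamp, value) pair is newer.
    fn merge(&mut self, other: &Self) {
        for (node, (t2, v2)) in other.node_values.iter() {
            match self.node_values.entry(*node) {
                Entry::Occupied(mut e) if *t2 > e.get().0 => {
                    e.insert((*t2, *v2));
                }
                Entry::Occupied(_) => {} // ours is at least as recent
                Entry::Vacant(e) => {
                    e.insert((*t2, *v2));
                }
            }
        }
    }
}

fn main() {
    let mut a = NodeCounter { node_values: BTreeMap::from([(1, (10, 5))]) };
    let b = NodeCounter { node_values: BTreeMap::from([(1, (12, 7)), (2, (3, 1))]) };
    a.merge(&b);
    // Node 1 takes the newer (12, 7); node 2 is learned from b.
    assert_eq!(a.node_values, BTreeMap::from([(1, (12, 7)), (2, (3, 1))]));
}
```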
+ let mut buf = HashMap::new(); + let mut errors = 0; + + loop { + let (ent, closed) = match propagate_rx.try_recv() { + Ok(ent) => (Some(ent), false), + Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => { + match propagate_rx.recv().await { + Some(ent) => (Some(ent), false), + None => (None, true), + } + } + Err(mpsc::error::TryRecvError::Empty) => (None, false), + Err(mpsc::error::TryRecvError::Disconnected) => (None, true), + }; + + if let Some((pk, sk, counters)) = ent { + let tree_key = self.table.data.tree_key(&pk, &sk); + let dist_entry = counters.into_counter_entry::(self.this_node, pk, sk); + match buf.entry(tree_key) { + hash_map::Entry::Vacant(e) => { + e.insert(dist_entry); + } + hash_map::Entry::Occupied(mut e) => { + e.get_mut().merge(&dist_entry); + } + } + // As long as we can add entries, loop back and add them to batch + // before sending batch to other nodes + continue; + } + + if !buf.is_empty() { + let entries = buf.iter().map(|(_k, v)| v); + if let Err(e) = self.table.insert_many(entries).await { + errors += 1; + if errors >= 2 && *must_exit.borrow() { + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e); + break; + } + warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + + buf.clear(); + errors = 0; + } + + if closed || *must_exit.borrow() { + break; + } + } + } +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +struct LocalCounterEntry { + values: BTreeMap, +} + +impl LocalCounterEntry { + fn into_counter_entry( + self, + this_node: Uuid, + pk: T::P, + sk: T::S, + ) -> CounterEntry { + CounterEntry { + pk, + sk, + values: self + .values + .into_iter() + .map(|(name, (ts, v))| { + let mut node_values = BTreeMap::new(); + node_values.insert(this_node, (ts, v)); + (name, CounterValue { node_values }) + }) + .collect(), + } + } +} diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs new file mode 100644 index 00000000..8c76a32b --- /dev/null +++ b/src/model/k2v/causality.rs @@ -0,0 +1,96 @@ +use std::collections::BTreeMap; +use std::convert::TryInto; + +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +/// Node IDs used in K2V are u64 integers that are the abbreviation +/// of full Garage node IDs which are 256-bit UUIDs. 
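The causality-token wire format implemented below is compact: the vector clock is flattened to big-endian u64s, an XOR checksum of all of them is prepended, and the result is base64-encoded (URL-safe alphabet, no padding). A standalone sketch of the encoder, mirroring `CausalContext::serialize` below (it assumes the `base64 = "0.13"` crate added in this patch):

```rust
use std::collections::BTreeMap;

// Token layout: [ xor-checksum: u64 BE ][ node_id: u64 BE, time: u64 BE ]...
// so a token is 8 + 16 * n bytes before base64, which is what
// CausalContext::parse checks with its `len % 16 == 8` test.
fn encode_token(vector_clock: &BTreeMap<u64, u64>) -> String {
    let mut ints = Vec::with_capacity(2 * vector_clock.len());
    for (node, time) in vector_clock.iter() {
        ints.push(*node);
        ints.push(*time);
    }
    let checksum = ints.iter().fold(0u64, |acc, v| acc ^ *v);

    let mut bytes = checksum.to_be_bytes().to_vec();
    for i in ints {
        bytes.extend(i.to_be_bytes());
    }
    base64::encode_config(bytes, base64::URL_SAFE_NO_PAD)
}

fn main() {
    let vc = BTreeMap::from([(4u64, 42u64), (7, 3)]);
    println!("{}", encode_token(&vc)); // 40 bytes -> a 54-character token
}
```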
+pub type K2VNodeId = u64; + +pub fn make_node_id(node_id: Uuid) -> K2VNodeId { + let mut tmp = [0u8; 8]; + tmp.copy_from_slice(&node_id.as_slice()[..8]); + u64::from_be_bytes(tmp) +} + +#[derive(PartialEq, Debug, Serialize, Deserialize)] +pub struct CausalContext { + pub vector_clock: BTreeMap, +} + +impl CausalContext { + /// Empty causality context + pub fn new_empty() -> Self { + Self { + vector_clock: BTreeMap::new(), + } + } + /// Make binary representation and encode in base64 + pub fn serialize(&self) -> String { + let mut ints = Vec::with_capacity(2 * self.vector_clock.len()); + for (node, time) in self.vector_clock.iter() { + ints.push(*node); + ints.push(*time); + } + let checksum = ints.iter().fold(0, |acc, v| acc ^ *v); + + let mut bytes = u64::to_be_bytes(checksum).to_vec(); + for i in ints { + bytes.extend(u64::to_be_bytes(i)); + } + + base64::encode_config(bytes, base64::URL_SAFE_NO_PAD) + } + /// Parse from base64-encoded binary representation + pub fn parse(s: &str) -> Result { + let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD) + .map_err(|e| format!("bad causality token base64: {}", e))?; + if bytes.len() % 16 != 8 || bytes.len() < 8 { + return Err("bad causality token length".into()); + } + + let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap()); + let mut ret = CausalContext { + vector_clock: BTreeMap::new(), + }; + + for i in 0..(bytes.len() / 16) { + let node_id = u64::from_be_bytes(bytes[8 + i * 16..16 + i * 16].try_into().unwrap()); + let time = u64::from_be_bytes(bytes[16 + i * 16..24 + i * 16].try_into().unwrap()); + ret.vector_clock.insert(node_id, time); + } + + let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t); + + if check != checksum { + return Err("bad causality token checksum".into()); + } + + Ok(ret) + } + /// Check if this causal context contains newer items than another one + pub fn is_newer_than(&self, other: &Self) -> bool { + self.vector_clock + .iter() + .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_causality_token_serialization() { + let ct = CausalContext { + vector_clock: [(4, 42), (1928131023, 76), (0xefc0c1c47f9de433, 2)] + .iter() + .cloned() + .collect(), + }; + + assert_eq!(CausalContext::parse(&ct.serialize()).unwrap(), ct); + } +} diff --git a/src/model/k2v/counter_table.rs b/src/model/k2v/counter_table.rs new file mode 100644 index 00000000..4856eb2b --- /dev/null +++ b/src/model/k2v/counter_table.rs @@ -0,0 +1,20 @@ +use garage_util::data::*; + +use crate::index_counter::*; + +pub const ENTRIES: &str = "entries"; +pub const CONFLICTS: &str = "conflicts"; +pub const VALUES: &str = "values"; +pub const BYTES: &str = "bytes"; + +#[derive(PartialEq, Clone)] +pub struct K2VCounterTable; + +impl CounterSchema for K2VCounterTable { + const NAME: &'static str = "k2v_index_counter"; + + // Partition key = bucket id + type P = Uuid; + // Sort key = K2V item's partition key + type S = String; +} diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs new file mode 100644 index 00000000..8b7cc08a --- /dev/null +++ b/src/model/k2v/item_table.rs @@ -0,0 +1,291 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::sync::Arc; + +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::*; + +use crate::index_counter::*; +use crate::k2v::causality::*; +use crate::k2v::counter_table::*; +use crate::k2v::poll::*; + +#[derive(PartialEq, Clone, Debug, Serialize, 
Deserialize)] +pub struct K2VItem { + pub partition: K2VItemPartition, + pub sort_key: String, + + items: BTreeMap, +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)] +pub struct K2VItemPartition { + pub bucket_id: Uuid, + pub partition_key: String, +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +struct DvvsEntry { + t_discard: u64, + values: Vec<(u64, DvvsValue)>, +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum DvvsValue { + Value(#[serde(with = "serde_bytes")] Vec), + Deleted, +} + +impl K2VItem { + /// Creates a new K2VItem when no previous entry existed in the db + pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self { + Self { + partition: K2VItemPartition { + bucket_id, + partition_key, + }, + sort_key, + items: BTreeMap::new(), + } + } + /// Updates a K2VItem with a new value or a deletion event + pub fn update( + &mut self, + this_node: Uuid, + context: &Option, + new_value: DvvsValue, + ) { + if let Some(context) = context { + for (node, t_discard) in context.vector_clock.iter() { + if let Some(e) = self.items.get_mut(node) { + e.t_discard = std::cmp::max(e.t_discard, *t_discard); + } else { + self.items.insert( + *node, + DvvsEntry { + t_discard: *t_discard, + values: vec![], + }, + ); + } + } + } + + self.discard(); + + let node_id = make_node_id(this_node); + let e = self.items.entry(node_id).or_insert(DvvsEntry { + t_discard: 0, + values: vec![], + }); + let t_prev = e.max_time(); + e.values.push((t_prev + 1, new_value)); + } + + /// Extract the causality context of a K2V Item + pub fn causal_context(&self) -> CausalContext { + let mut cc = CausalContext::new_empty(); + for (node, ent) in self.items.iter() { + cc.vector_clock.insert(*node, ent.max_time()); + } + cc + } + + /// Extract the list of values + pub fn values(&'_ self) -> Vec<&'_ DvvsValue> { + let mut ret = vec![]; + for (_, ent) in self.items.iter() { + for (_, v) in ent.values.iter() { + if !ret.contains(&v) { + ret.push(v); + } + } + } + ret + } + + fn discard(&mut self) { + for (_, ent) in self.items.iter_mut() { + ent.discard(); + } + } + + // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used) + fn stats(&self) -> (i64, i64, i64, i64) { + let values = self.values(); + + let n_entries = if self.is_tombstone() { 0 } else { 1 }; + let n_conflicts = if values.len() > 1 { 1 } else { 0 }; + let n_values = values + .iter() + .filter(|v| matches!(v, DvvsValue::Value(_))) + .count() as i64; + let n_bytes = values + .iter() + .map(|v| match v { + DvvsValue::Deleted => 0, + DvvsValue::Value(v) => v.len() as i64, + }) + .sum(); + + (n_entries, n_conflicts, n_values, n_bytes) + } +} + +impl DvvsEntry { + fn max_time(&self) -> u64 { + self.values + .iter() + .fold(self.t_discard, |acc, (vts, _)| std::cmp::max(acc, *vts)) + } + + fn discard(&mut self) { + self.values = std::mem::take(&mut self.values) + .into_iter() + .filter(|(t, _)| *t > self.t_discard) + .collect::>(); + } +} + +impl Crdt for K2VItem { + fn merge(&mut self, other: &Self) { + for (node, e2) in other.items.iter() { + if let Some(e) = self.items.get_mut(node) { + e.merge(e2); + } else { + self.items.insert(*node, e2.clone()); + } + } + } +} + +impl Crdt for DvvsEntry { + fn merge(&mut self, other: &Self) { + self.t_discard = std::cmp::max(self.t_discard, other.t_discard); + self.discard(); + + let t_max = self.max_time(); + for (vt, vv) in other.values.iter() { + if *vt > t_max { + self.values.push((*vt, vv.clone())); + } + } + 
} +} + +impl PartitionKey for K2VItemPartition { + fn hash(&self) -> Hash { + use blake2::{Blake2b, Digest}; + + let mut hasher = Blake2b::new(); + hasher.update(self.bucket_id.as_slice()); + hasher.update(self.partition_key.as_bytes()); + let mut hash = [0u8; 32]; + hash.copy_from_slice(&hasher.finalize()[..32]); + hash.into() + } +} + +impl Entry for K2VItem { + fn partition_key(&self) -> &K2VItemPartition { + &self.partition + } + fn sort_key(&self) -> &String { + &self.sort_key + } + fn is_tombstone(&self) -> bool { + self.values() + .iter() + .all(|v| matches!(v, DvvsValue::Deleted)) + } +} + +pub struct K2VItemTable { + pub(crate) counter_table: Arc>, + pub(crate) subscriptions: Arc, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct ItemFilter { + pub exclude_only_tombstones: bool, + pub conflicts_only: bool, +} + +impl TableSchema for K2VItemTable { + const TABLE_NAME: &'static str = "k2v_item"; + + type P = K2VItemPartition; + type S = String; + type E = K2VItem; + type Filter = ItemFilter; + + fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + // 1. Count + let (old_entries, old_conflicts, old_values, old_bytes) = match old { + None => (0, 0, 0, 0), + Some(e) => e.stats(), + }; + let (new_entries, new_conflicts, new_values, new_bytes) = match new { + None => (0, 0, 0, 0), + Some(e) => e.stats(), + }; + + let count_pk = old + .map(|e| e.partition.bucket_id) + .unwrap_or_else(|| new.unwrap().partition.bucket_id); + let count_sk = old + .map(|e| &e.partition.partition_key) + .unwrap_or_else(|| &new.unwrap().partition.partition_key); + + if let Err(e) = self.counter_table.count( + &count_pk, + count_sk, + &[ + (ENTRIES, new_entries - old_entries), + (CONFLICTS, new_conflicts - old_conflicts), + (VALUES, new_values - old_values), + (BYTES, new_bytes - old_bytes), + ], + ) { + error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e); + } + + // 2. 
Notify + if let Some(new_ent) = new { + self.subscriptions.notify(new_ent); + } + } + + #[allow(clippy::nonminimal_bool)] + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + let v = entry.values(); + !(filter.conflicts_only && v.len() < 2) + && !(filter.exclude_only_tombstones && entry.is_tombstone()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dvvsentry_merge_simple() { + let e1 = DvvsEntry { + t_discard: 4, + values: vec![ + (5, DvvsValue::Value(vec![15])), + (6, DvvsValue::Value(vec![16])), + ], + }; + let e2 = DvvsEntry { + t_discard: 5, + values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)], + }; + + let mut e3 = e1.clone(); + e3.merge(&e2); + assert_eq!(e2, e3); + } +} diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs new file mode 100644 index 00000000..664172a6 --- /dev/null +++ b/src/model/k2v/mod.rs @@ -0,0 +1,7 @@ +pub mod causality; + +pub mod counter_table; +pub mod item_table; + +pub mod poll; +pub mod rpc; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs new file mode 100644 index 00000000..93105207 --- /dev/null +++ b/src/model/k2v/poll.rs @@ -0,0 +1,50 @@ +use std::collections::HashMap; +use std::sync::Mutex; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +use crate::k2v::item_table::*; + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollKey { + pub partition: K2VItemPartition, + pub sort_key: String, +} + +#[derive(Default)] +pub struct SubscriptionManager { + subscriptions: Mutex>>, +} + +impl SubscriptionManager { + pub fn new() -> Self { + Self::default() + } + + pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver { + let mut subs = self.subscriptions.lock().unwrap(); + if let Some(s) = subs.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + subs.insert(key.clone(), tx); + rx + } + } + + pub fn notify(&self, item: &K2VItem) { + let key = PollKey { + partition: item.partition.clone(), + sort_key: item.sort_key.clone(), + }; + let mut subs = self.subscriptions.lock().unwrap(); + if let Some(s) = subs.get(&key) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + subs.remove(&key); + } + } + } +} diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs new file mode 100644 index 00000000..90101d0f --- /dev/null +++ b/src/model/k2v/rpc.rs @@ -0,0 +1,343 @@ +//! Module that implements RPCs specific to K2V. +//! This is necessary for insertions into the K2V store, +//! as they have to be transmitted to one of the nodes responsible +//! for storing the entry to be processed (the API entry +//! node does not process the entry directly, as this would +//! mean the vector clock gets much larger than needed). 
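PollItem combines the `SubscriptionManager` broadcast channels above with a timeout, via the `select!` at the bottom of this file. A hypothetical, stripped-down demo of that long-poll shape (the string payload and delays are made up; the real handler loops on `K2VItem` broadcasts until the stored causal context is newer than the caller's token):

```rust
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // One broadcast channel per watched key, as in SubscriptionManager.
    let (tx, mut rx) = broadcast::channel::<String>(8);

    // Simulate another task updating the item shortly after we start waiting.
    tokio::spawn(async move {
        sleep(Duration::from_millis(50)).await;
        let _ = tx.send("updated value".to_string());
    });

    // The long-poll: the first notification wins, otherwise time out
    // (the RPC answers PollItemResponse(None) in the timeout case).
    tokio::select! {
        v = rx.recv() => println!("update: {:?}", v),
        _ = sleep(Duration::from_secs(1)) => println!("timed out"),
    }
}
```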
+ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use tokio::select; + +use garage_util::crdt::*; +use garage_util::data::*; +use garage_util::error::*; + +use garage_rpc::system::System; +use garage_rpc::*; + +use garage_table::replication::{TableReplication, TableShardedReplication}; +use garage_table::table::TABLE_RPC_TIMEOUT; +use garage_table::{PartitionKey, Table}; + +use crate::k2v::causality::*; +use crate::k2v::item_table::*; +use crate::k2v::poll::*; + +/// RPC messages for K2V +#[derive(Debug, Serialize, Deserialize)] +enum K2VRpc { + Ok, + InsertItem(InsertedItem), + InsertManyItems(Vec), + PollItem { + key: PollKey, + causal_context: CausalContext, + timeout_msec: u64, + }, + PollItemResponse(Option), +} + +#[derive(Debug, Serialize, Deserialize)] +struct InsertedItem { + partition: K2VItemPartition, + sort_key: String, + causal_context: Option, + value: DvvsValue, +} + +impl Rpc for K2VRpc { + type Response = Result; +} + +/// The block manager, handling block exchange between nodes, and block storage on local node +pub struct K2VRpcHandler { + system: Arc, + item_table: Arc>, + endpoint: Arc>, + subscriptions: Arc, +} + +impl K2VRpcHandler { + pub fn new( + system: Arc, + item_table: Arc>, + subscriptions: Arc, + ) -> Arc { + let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); + + let rpc_handler = Arc::new(Self { + system, + item_table, + endpoint, + subscriptions, + }); + rpc_handler.endpoint.set_handler(rpc_handler.clone()); + + rpc_handler + } + + // ---- public interface ---- + + pub async fn insert( + &self, + bucket_id: Uuid, + partition_key: String, + sort_key: String, + causal_context: Option, + value: DvvsValue, + ) -> Result<(), Error> { + let partition = K2VItemPartition { + bucket_id, + partition_key, + }; + let mut who = self + .item_table + .data + .replication + .write_nodes(&partition.hash()); + who.sort(); + + self.system + .rpc + .try_call_many( + &self.endpoint, + &who[..], + K2VRpc::InsertItem(InsertedItem { + partition, + sort_key, + causal_context, + value, + }), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(1) + .with_timeout(TABLE_RPC_TIMEOUT) + .interrupt_after_quorum(true), + ) + .await?; + + Ok(()) + } + + pub async fn insert_batch( + &self, + bucket_id: Uuid, + items: Vec<(String, String, Option, DvvsValue)>, + ) -> Result<(), Error> { + let n_items = items.len(); + + let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); + + for (partition_key, sort_key, causal_context, value) in items { + let partition = K2VItemPartition { + bucket_id, + partition_key, + }; + let mut who = self + .item_table + .data + .replication + .write_nodes(&partition.hash()); + who.sort(); + + call_list.entry(who).or_default().push(InsertedItem { + partition, + sort_key, + causal_context, + value, + }); + } + + debug!( + "K2V insert_batch: {} requests to insert {} items", + call_list.len(), + n_items + ); + let call_futures = call_list.into_iter().map(|(nodes, items)| async move { + let resp = self + .system + .rpc + .try_call_many( + &self.endpoint, + &nodes[..], + K2VRpc::InsertManyItems(items), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(1) + .with_timeout(TABLE_RPC_TIMEOUT) + .interrupt_after_quorum(true), + ) + .await?; + Ok::<_, Error>((nodes, resp)) + }); + + let mut resps = call_futures.collect::>(); + while let Some(resp) = resps.next().await 
{ + resp?; + } + + Ok(()) + } + + pub async fn poll( + &self, + bucket_id: Uuid, + partition_key: String, + sort_key: String, + causal_context: CausalContext, + timeout_msec: u64, + ) -> Result, Error> { + let poll_key = PollKey { + partition: K2VItemPartition { + bucket_id, + partition_key, + }, + sort_key, + }; + let nodes = self + .item_table + .data + .replication + .write_nodes(&poll_key.partition.hash()); + + let resps = self + .system + .rpc + .try_call_many( + &self.endpoint, + &nodes[..], + K2VRpc::PollItem { + key: poll_key, + causal_context, + timeout_msec, + }, + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.item_table.data.replication.read_quorum()) + .with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT), + ) + .await?; + + let mut resp: Option = None; + for v in resps { + match v { + K2VRpc::PollItemResponse(Some(x)) => { + if let Some(y) = &mut resp { + y.merge(&x); + } else { + resp = Some(x); + } + } + K2VRpc::PollItemResponse(None) => { + return Ok(None); + } + v => return Err(Error::unexpected_rpc_message(v)), + } + } + + Ok(resp) + } + + // ---- internal handlers ---- + + async fn handle_insert(&self, item: &InsertedItem) -> Result { + let new = self.local_insert(item)?; + + // Propagate to rest of network + if let Some(updated) = new { + self.item_table.insert(&updated).await?; + } + + Ok(K2VRpc::Ok) + } + + async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result { + let mut updated_vec = vec![]; + + for item in items { + let new = self.local_insert(item)?; + + if let Some(updated) = new { + updated_vec.push(updated); + } + } + + // Propagate to rest of network + if !updated_vec.is_empty() { + self.item_table.insert_many(&updated_vec).await?; + } + + Ok(K2VRpc::Ok) + } + + fn local_insert(&self, item: &InsertedItem) -> Result, Error> { + let tree_key = self + .item_table + .data + .tree_key(&item.partition, &item.sort_key); + + self.item_table + .data + .update_entry_with(&tree_key[..], |ent| { + let mut ent = ent.unwrap_or_else(|| { + K2VItem::new( + item.partition.bucket_id, + item.partition.partition_key.clone(), + item.sort_key.clone(), + ) + }); + ent.update(self.system.id, &item.causal_context, item.value.clone()); + ent + }) + } + + async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result { + let mut chan = self.subscriptions.subscribe(key); + + let mut value = self + .item_table + .data + .read_entry(&key.partition, &key.sort_key)? + .map(|bytes| self.item_table.data.decode_entry(&bytes[..])) + .transpose()? + .unwrap_or_else(|| { + K2VItem::new( + key.partition.bucket_id, + key.partition.partition_key.clone(), + key.sort_key.clone(), + ) + }); + + while !value.causal_context().is_newer_than(ct) { + value = chan.recv().await?; + } + + Ok(value) + } +} + +#[async_trait] +impl EndpointHandler for K2VRpcHandler { + async fn handle(self: &Arc, message: &K2VRpc, _from: NodeID) -> Result { + match message { + K2VRpc::InsertItem(item) => self.handle_insert(item).await, + K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await, + K2VRpc::PollItem { + key, + causal_context, + timeout_msec, + } => { + let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); + select! 
{ + ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), + _ = delay => Ok(K2VRpc::PollItemResponse(None)), + } + } + m => Err(Error::unexpected_rpc_message(m)), + } + } +} diff --git a/src/model/lib.rs b/src/model/lib.rs index 05a4cdc7..7c9d9270 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -3,12 +3,15 @@ extern crate tracing; pub mod permission; -pub mod block_ref_table; +pub mod index_counter; + pub mod bucket_alias_table; pub mod bucket_table; pub mod key_table; -pub mod object_table; -pub mod version_table; + +#[cfg(feature = "k2v")] +pub mod k2v; +pub mod s3; pub mod garage; pub mod helper; diff --git a/src/model/object_table.rs b/src/model/object_table.rs deleted file mode 100644 index da53878e..00000000 --- a/src/model/object_table.rs +++ /dev/null @@ -1,334 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; -use std::sync::Arc; - -use garage_util::background::BackgroundRunner; -use garage_util::data::*; - -use garage_table::crdt::*; -use garage_table::replication::TableShardedReplication; -use garage_table::*; - -use crate::version_table::*; - -use garage_model_050::object_table as old; - -/// An object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct Object { - /// The bucket in which the object is stored, used as partition key - pub bucket_id: Uuid, - - /// The key at which the object is stored in its bucket, used as sorting key - pub key: String, - - /// The list of currenty stored versions of the object - versions: Vec, -} - -impl Object { - /// Initialize an Object struct from parts - pub fn new(bucket_id: Uuid, key: String, versions: Vec) -> Self { - let mut ret = Self { - bucket_id, - key, - versions: vec![], - }; - for v in versions { - ret.add_version(v) - .expect("Twice the same ObjectVersion in Object constructor"); - } - ret - } - - /// Adds a version if it wasn't already present - #[allow(clippy::result_unit_err)] - pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { - match self - .versions - .binary_search_by(|v| v.cmp_key().cmp(&new.cmp_key())) - { - Err(i) => { - self.versions.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } - } - - /// Get a list of currently stored versions of `Object` - pub fn versions(&self) -> &[ObjectVersion] { - &self.versions[..] 
- } -} - -/// Informations about a version of an object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersion { - /// Id of the version - pub uuid: Uuid, - /// Timestamp of when the object was created - pub timestamp: u64, - /// State of the version - pub state: ObjectVersionState, -} - -/// State of an object version -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionState { - /// The version is being received - Uploading(ObjectVersionHeaders), - /// The version is fully received - Complete(ObjectVersionData), - /// The version uploaded containded errors or the upload was explicitly aborted - Aborted, -} - -impl Crdt for ObjectVersionState { - fn merge(&mut self, other: &Self) { - use ObjectVersionState::*; - match other { - Aborted => { - *self = Aborted; - } - Complete(b) => match self { - Aborted => {} - Complete(a) => { - a.merge(b); - } - Uploading(_) => { - *self = Complete(b.clone()); - } - }, - Uploading(_) => {} - } - } -} - -/// Data stored in object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionData { - /// The object was deleted, this Version is a tombstone to mark it as such - DeleteMarker, - /// The object is short, it's stored inlined - Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), - /// The object is not short, Hash of first block is stored here, next segments hashes are - /// stored in the version table - FirstBlock(ObjectVersionMeta, Hash), -} - -impl AutoCrdt for ObjectVersionData { - const WARN_IF_DIFFERENT: bool = true; -} - -/// Metadata about the object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionMeta { - /// Headers to send to the client - pub headers: ObjectVersionHeaders, - /// Size of the object - pub size: u64, - /// etag of the object - pub etag: String, -} - -/// Additional headers for an object -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionHeaders { - /// Content type of the object - pub content_type: String, - /// Any other http headers to send - pub other: BTreeMap, -} - -impl ObjectVersion { - fn cmp_key(&self) -> (u64, Uuid) { - (self.timestamp, self.uuid) - } - - /// Is the object version currently being uploaded - pub fn is_uploading(&self) -> bool { - matches!(self.state, ObjectVersionState::Uploading(_)) - } - - /// Is the object version completely received - pub fn is_complete(&self) -> bool { - matches!(self.state, ObjectVersionState::Complete(_)) - } - - /// Is the object version available (received and not a tombstone) - pub fn is_data(&self) -> bool { - match self.state { - ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, - ObjectVersionState::Complete(_) => true, - _ => false, - } - } -} - -impl Entry for Object { - fn partition_key(&self) -> &Uuid { - &self.bucket_id - } - fn sort_key(&self) -> &String { - &self.key - } - fn is_tombstone(&self) -> bool { - self.versions.len() == 1 - && self.versions[0].state - == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) - } -} - -impl Crdt for Object { - fn merge(&mut self, other: &Self) { - // Merge versions from other into here - for other_v in other.versions.iter() { - match self - .versions - .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) - { - Ok(i) => { - self.versions[i].state.merge(&other_v.state); - } - Err(i) => { - self.versions.insert(i, other_v.clone()); - } - } - } - - 
// Remove versions which are obsolete, i.e. those that come - // before the last version which .is_complete(). - let last_complete = self - .versions - .iter() - .enumerate() - .rev() - .find(|(_, v)| v.is_complete()) - .map(|(vi, _)| vi); - - if let Some(last_vi) = last_complete { - self.versions = self.versions.drain(last_vi..).collect::>(); - } - } -} - -pub struct ObjectTable { - pub background: Arc, - pub version_table: Arc>, -} - -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -pub enum ObjectFilter { - IsData, - IsUploading, -} - -impl TableSchema for ObjectTable { - const TABLE_NAME: &'static str = "object"; - - type P = Uuid; - type S = String; - type E = Object; - type Filter = ObjectFilter; - - fn updated(&self, old: Option, new: Option) { - let version_table = self.version_table.clone(); - self.background.spawn(async move { - if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of old versions - for v in old_v.versions.iter() { - let newly_deleted = match new_v - .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - { - Err(_) => true, - Ok(i) => { - new_v.versions[i].state == ObjectVersionState::Aborted - && v.state != ObjectVersionState::Aborted - } - }; - if newly_deleted { - let deleted_version = - Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); - version_table.insert(&deleted_version).await?; - } - } - } - Ok(()) - }) - } - - fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - match filter { - ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()), - ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), - } - } - - fn try_migrate(bytes: &[u8]) -> Option { - let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?; - Some(migrate_object(old_obj)) - } -} - -// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv -// (we just want to change bucket into bucket_id by hashing it) - -fn migrate_object(o: old::Object) -> Object { - let versions = o - .versions() - .iter() - .cloned() - .map(migrate_object_version) - .collect(); - Object { - bucket_id: blake2sum(o.bucket.as_bytes()), - key: o.key, - versions, - } -} - -fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion { - ObjectVersion { - uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(), - timestamp: v.timestamp, - state: match v.state { - old::ObjectVersionState::Uploading(h) => { - ObjectVersionState::Uploading(migrate_object_version_headers(h)) - } - old::ObjectVersionState::Complete(d) => { - ObjectVersionState::Complete(migrate_object_version_data(d)) - } - old::ObjectVersionState::Aborted => ObjectVersionState::Aborted, - }, - } -} - -fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders { - ObjectVersionHeaders { - content_type: h.content_type, - other: h.other, - } -} - -fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData { - match d { - old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker, - old::ObjectVersionData::Inline(m, b) => { - ObjectVersionData::Inline(migrate_object_version_meta(m), b) - } - old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock( - migrate_object_version_meta(m), - Hash::try_from(h.as_slice()).unwrap(), - ), - } -} - -fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta { - ObjectVersionMeta { - headers: migrate_object_version_headers(m.headers), - size: m.size, - etag: m.etag, - } -} diff --git a/src/model/s3/block_ref_table.rs 
b/src/model/s3/block_ref_table.rs new file mode 100644 index 00000000..9b3991bf --- /dev/null +++ b/src/model/s3/block_ref_table.rs @@ -0,0 +1,74 @@ +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use garage_util::data::*; + +use garage_table::crdt::Crdt; +use garage_table::*; + +use garage_block::manager::*; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct BlockRef { + /// Hash (blake2 sum) of the block, used as partition key + pub block: Hash, + + /// Id of the Version for the object containing this block, used as sorting key + pub version: Uuid, + + // Keep track of deleted status + /// Is the Version that contains this block deleted + pub deleted: crdt::Bool, +} + +impl Entry for BlockRef { + fn partition_key(&self) -> &Hash { + &self.block + } + fn sort_key(&self) -> &Uuid { + &self.version + } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} + +impl Crdt for BlockRef { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + } +} + +pub struct BlockRefTable { + pub block_manager: Arc, +} + +impl TableSchema for BlockRefTable { + const TABLE_NAME: &'static str = "block_ref"; + + type P = Hash; + type S = Uuid; + type E = BlockRef; + type Filter = DeletedFilter; + + fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + #[allow(clippy::or_fun_call)] + let block = &old.or(new).unwrap().block; + let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false); + let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false); + if is_after && !was_before { + if let Err(e) = self.block_manager.block_incref(block) { + warn!("block_incref failed for block {:?}: {}", block, e); + } + } + if was_before && !is_after { + if let Err(e) = self.block_manager.block_decref(block) { + warn!("block_decref failed for block {:?}: {}", block, e); + } + } + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + filter.apply(entry.deleted.get()) + } +} diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs new file mode 100644 index 00000000..4e94337d --- /dev/null +++ b/src/model/s3/mod.rs @@ -0,0 +1,3 @@ +pub mod block_ref_table; +pub mod object_table; +pub mod version_table; diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs new file mode 100644 index 00000000..3d9a89f7 --- /dev/null +++ b/src/model/s3/object_table.rs @@ -0,0 +1,337 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::sync::Arc; + +use garage_util::background::BackgroundRunner; +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +use crate::s3::version_table::*; + +use garage_model_050::object_table as old; + +/// An object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket_id: Uuid, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + versions: Vec, +} + +impl Object { + /// Initialize an Object struct from parts + pub fn new(bucket_id: Uuid, key: String, versions: Vec) -> Self { + let mut ret = Self { + bucket_id, + key, + versions: vec![], + }; + for v in versions { + ret.add_version(v) + .expect("Twice the same ObjectVersion in Object constructor"); + } + ret + } + + /// Adds a version if it wasn't already present + #[allow(clippy::result_unit_err)] + pub fn add_version(&mut 
self, new: ObjectVersion) -> Result<(), ()> { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&new.cmp_key())) + { + Err(i) => { + self.versions.insert(i, new); + Ok(()) + } + Ok(_) => Err(()), + } + } + + /// Get a list of currently stored versions of `Object` + pub fn versions(&self) -> &[ObjectVersion] { + &self.versions[..] + } +} + +/// Informations about a version of an object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, +} + +/// State of an object version +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionState { + /// The version is being received + Uploading(ObjectVersionHeaders), + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, +} + +impl Crdt for ObjectVersionState { + fn merge(&mut self, other: &Self) { + use ObjectVersionState::*; + match other { + Aborted => { + *self = Aborted; + } + Complete(b) => match self { + Aborted => {} + Complete(a) => { + a.merge(b); + } + Uploading(_) => { + *self = Complete(b.clone()); + } + }, + Uploading(_) => {} + } + } +} + +/// Data stored in object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such + DeleteMarker, + /// The object is short, it's stored inlined + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table + FirstBlock(ObjectVersionMeta, Hash), +} + +impl AutoCrdt for ObjectVersionData { + const WARN_IF_DIFFERENT: bool = true; +} + +/// Metadata about the object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionMeta { + /// Headers to send to the client + pub headers: ObjectVersionHeaders, + /// Size of the object + pub size: u64, + /// etag of the object + pub etag: String, +} + +/// Additional headers for an object +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionHeaders { + /// Content type of the object + pub content_type: String, + /// Any other http headers to send + pub other: BTreeMap, +} + +impl ObjectVersion { + fn cmp_key(&self) -> (u64, Uuid) { + (self.timestamp, self.uuid) + } + + /// Is the object version currently being uploaded + pub fn is_uploading(&self) -> bool { + matches!(self.state, ObjectVersionState::Uploading(_)) + } + + /// Is the object version completely received + pub fn is_complete(&self) -> bool { + matches!(self.state, ObjectVersionState::Complete(_)) + } + + /// Is the object version available (received and not a tombstone) + pub fn is_data(&self) -> bool { + match self.state { + ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, + ObjectVersionState::Complete(_) => true, + _ => false, + } + } +} + +impl Entry for Object { + fn partition_key(&self) -> &Uuid { + &self.bucket_id + } + fn sort_key(&self) -> &String { + &self.key + } + fn is_tombstone(&self) -> bool { + self.versions.len() == 1 + && self.versions[0].state + == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) + } +} + +impl Crdt for Object { + 
fn merge(&mut self, other: &Self) { + // Merge versions from other into here + for other_v in other.versions.iter() { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) + { + Ok(i) => { + self.versions[i].state.merge(&other_v.state); + } + Err(i) => { + self.versions.insert(i, other_v.clone()); + } + } + } + + // Remove versions which are obsolete, i.e. those that come + // before the last version which .is_complete(). + let last_complete = self + .versions + .iter() + .enumerate() + .rev() + .find(|(_, v)| v.is_complete()) + .map(|(vi, _)| vi); + + if let Some(last_vi) = last_complete { + self.versions = self.versions.drain(last_vi..).collect::>(); + } + } +} + +pub struct ObjectTable { + pub background: Arc, + pub version_table: Arc>, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub enum ObjectFilter { + IsData, + IsUploading, +} + +impl TableSchema for ObjectTable { + const TABLE_NAME: &'static str = "object"; + + type P = Uuid; + type S = String; + type E = Object; + type Filter = ObjectFilter; + + fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + let version_table = self.version_table.clone(); + let old = old.cloned(); + let new = new.cloned(); + + self.background.spawn(async move { + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of old versions + for v in old_v.versions.iter() { + let newly_deleted = match new_v + .versions + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) + { + Err(_) => true, + Ok(i) => { + new_v.versions[i].state == ObjectVersionState::Aborted + && v.state != ObjectVersionState::Aborted + } + }; + if newly_deleted { + let deleted_version = + Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + version_table.insert(&deleted_version).await?; + } + } + } + Ok(()) + }) + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + match filter { + ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()), + ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), + } + } + + fn try_migrate(bytes: &[u8]) -> Option { + let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?; + Some(migrate_object(old_obj)) + } +} + +// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv +// (we just want to change bucket into bucket_id by hashing it) + +fn migrate_object(o: old::Object) -> Object { + let versions = o + .versions() + .iter() + .cloned() + .map(migrate_object_version) + .collect(); + Object { + bucket_id: blake2sum(o.bucket.as_bytes()), + key: o.key, + versions, + } +} + +fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion { + ObjectVersion { + uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(), + timestamp: v.timestamp, + state: match v.state { + old::ObjectVersionState::Uploading(h) => { + ObjectVersionState::Uploading(migrate_object_version_headers(h)) + } + old::ObjectVersionState::Complete(d) => { + ObjectVersionState::Complete(migrate_object_version_data(d)) + } + old::ObjectVersionState::Aborted => ObjectVersionState::Aborted, + }, + } +} + +fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders { + ObjectVersionHeaders { + content_type: h.content_type, + other: h.other, + } +} + +fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData { + match d { + old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker, + old::ObjectVersionData::Inline(m, b) => { + 
ObjectVersionData::Inline(migrate_object_version_meta(m), b)
+ }
+ old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock(
+ migrate_object_version_meta(m),
+ Hash::try_from(h.as_slice()).unwrap(),
+ ),
+ }
+}
+
+fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta {
+ ObjectVersionMeta {
+ headers: migrate_object_version_headers(m.headers),
+ size: m.size,
+ etag: m.etag,
+ }
+}
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
new file mode 100644
index 00000000..ad096772
--- /dev/null
+++ b/src/model/s3/version_table.rs
@@ -0,0 +1,207 @@
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+
+use garage_util::background::BackgroundRunner;
+use garage_util::data::*;
+
+use garage_table::crdt::*;
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+use crate::s3::block_ref_table::*;
+
+use garage_model_050::version_table as old;
+
+/// A version of an object
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// List of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
+ pub parts_etags: crdt::Map<u64, String>,
+
+ // Back link to bucket+key so that we can figure out if
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket_id: Uuid,
+ /// Key in which the related object is stored
+ pub key: String,
+}
+
+impl Version {
+ pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
+ Self {
+ uuid,
+ deleted: deleted.into(),
+ blocks: crdt::Map::new(),
+ parts_etags: crdt::Map::new(),
+ bucket_id,
+ key,
+ }
+ }
+
+ pub fn has_part_number(&self, part_number: u64) -> bool {
+ let case1 = self
+ .parts_etags
+ .items()
+ .binary_search_by(|(k, _)| k.cmp(&part_number))
+ .is_ok();
+ let case2 = self
+ .blocks
+ .items()
+ .binary_search_by(|(k, _)| k.part_number.cmp(&part_number))
+ .is_ok();
+ case1 || case2
+ }
+}
+
+#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct VersionBlockKey {
+ /// Number of the part
+ pub part_number: u64,
+ /// Offset of this sub-segment in its part
+ pub offset: u64,
+}
+
+impl Ord for VersionBlockKey {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.part_number
+ .cmp(&other.part_number)
+ .then(self.offset.cmp(&other.offset))
+ }
+}
+
+impl PartialOrd for VersionBlockKey {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+/// Information about a single block
+#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct VersionBlock {
+ /// Blake2 sum of the block
+ pub hash: Hash,
+ /// Size of the block
+ pub size: u64,
+}
+
+impl AutoCrdt for VersionBlock {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl Entry<Uuid, EmptyKey> for Version {
+ fn partition_key(&self) -> &Uuid {
+ &self.uuid
+ }
+ fn sort_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+ fn is_tombstone(&self) -> bool {
+ self.deleted.get()
+ }
+}
+
+impl Crdt for Version {
+ fn merge(&mut self, other: &Self) {
+ self.deleted.merge(&other.deleted);
+
+ if self.deleted.get() {
+ self.blocks.clear();
+ self.parts_etags.clear();
+ } else {
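+ // Editor's note (illustrative, not part of the patch): `deleted` is a
+ // monotone false-to-true CRDT boolean, so once any replica records the
+ // deletion, the cleared `blocks` and `parts_etags` maps cannot be
+ // resurrected by merging stale data back in; the branch below only runs
+ // while every merged replica still considers the version live.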
self.blocks.merge(&other.blocks); + self.parts_etags.merge(&other.parts_etags); + } + } +} + +pub struct VersionTable { + pub background: Arc, + pub block_ref_table: Arc>, +} + +impl TableSchema for VersionTable { + const TABLE_NAME: &'static str = "version"; + + type P = Uuid; + type S = EmptyKey; + type E = Version; + type Filter = DeletedFilter; + + fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + let block_ref_table = self.block_ref_table.clone(); + let old = old.cloned(); + let new = new.cloned(); + + self.background.spawn(async move { + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of version blocks + if new_v.deleted.get() && !old_v.deleted.get() { + let deleted_block_refs = old_v + .blocks + .items() + .iter() + .map(|(_k, vb)| BlockRef { + block: vb.hash, + version: old_v.uuid, + deleted: true.into(), + }) + .collect::>(); + block_ref_table.insert_many(&deleted_block_refs[..]).await?; + } + } + Ok(()) + }) + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + filter.apply(entry.deleted.get()) + } + + fn try_migrate(bytes: &[u8]) -> Option { + let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?; + + let blocks = old + .blocks + .items() + .iter() + .map(|(k, v)| { + ( + VersionBlockKey { + part_number: k.part_number, + offset: k.offset, + }, + VersionBlock { + hash: Hash::try_from(v.hash.as_slice()).unwrap(), + size: v.size, + }, + ) + }) + .collect::>(); + + let parts_etags = old + .parts_etags + .items() + .iter() + .map(|(k, v)| (*k, v.clone())) + .collect::>(); + + Some(Version { + uuid: Hash::try_from(old.uuid.as_slice()).unwrap(), + deleted: crdt::Bool::new(old.deleted.get()), + blocks, + parts_etags, + bucket_id: blake2sum(old.bucket.as_bytes()), + key: old.key, + }) + } +} diff --git a/src/model/version_table.rs b/src/model/version_table.rs deleted file mode 100644 index 839b1f4f..00000000 --- a/src/model/version_table.rs +++ /dev/null @@ -1,204 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -use garage_util::background::BackgroundRunner; -use garage_util::data::*; - -use garage_table::crdt::*; -use garage_table::replication::TableShardedReplication; -use garage_table::*; - -use crate::block_ref_table::*; - -use garage_model_050::version_table as old; - -/// A version of an object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct Version { - /// UUID of the version, used as partition key - pub uuid: Uuid, - - // Actual data: the blocks for this version - // In the case of a multipart upload, also store the etags - // of individual parts and check them when doing CompleteMultipartUpload - /// Is this version deleted - pub deleted: crdt::Bool, - /// list of blocks of data composing the version - pub blocks: crdt::Map, - /// Etag of each part in case of a multipart upload, empty otherwise - pub parts_etags: crdt::Map, - - // Back link to bucket+key so that we can figure if - // this was deleted later on - /// Bucket in which the related object is stored - pub bucket_id: Uuid, - /// Key in which the related object is stored - pub key: String, -} - -impl Version { - pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { - Self { - uuid, - deleted: deleted.into(), - blocks: crdt::Map::new(), - parts_etags: crdt::Map::new(), - bucket_id, - key, - } - } - - pub fn has_part_number(&self, part_number: u64) -> bool { - let case1 = self - .parts_etags - .items() - .binary_search_by(|(k, _)| k.cmp(&part_number)) - .is_ok(); - let 
case2 = self - .blocks - .items() - .binary_search_by(|(k, _)| k.part_number.cmp(&part_number)) - .is_ok(); - case1 || case2 - } -} - -#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlockKey { - /// Number of the part - pub part_number: u64, - /// Offset of this sub-segment in its part - pub offset: u64, -} - -impl Ord for VersionBlockKey { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.part_number - .cmp(&other.part_number) - .then(self.offset.cmp(&other.offset)) - } -} - -impl PartialOrd for VersionBlockKey { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -/// Informations about a single block -#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlock { - /// Blake2 sum of the block - pub hash: Hash, - /// Size of the block - pub size: u64, -} - -impl AutoCrdt for VersionBlock { - const WARN_IF_DIFFERENT: bool = true; -} - -impl Entry for Version { - fn partition_key(&self) -> &Uuid { - &self.uuid - } - fn sort_key(&self) -> &EmptyKey { - &EmptyKey - } - fn is_tombstone(&self) -> bool { - self.deleted.get() - } -} - -impl Crdt for Version { - fn merge(&mut self, other: &Self) { - self.deleted.merge(&other.deleted); - - if self.deleted.get() { - self.blocks.clear(); - self.parts_etags.clear(); - } else { - self.blocks.merge(&other.blocks); - self.parts_etags.merge(&other.parts_etags); - } - } -} - -pub struct VersionTable { - pub background: Arc, - pub block_ref_table: Arc>, -} - -impl TableSchema for VersionTable { - const TABLE_NAME: &'static str = "version"; - - type P = Uuid; - type S = EmptyKey; - type E = Version; - type Filter = DeletedFilter; - - fn updated(&self, old: Option, new: Option) { - let block_ref_table = self.block_ref_table.clone(); - self.background.spawn(async move { - if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of version blocks - if new_v.deleted.get() && !old_v.deleted.get() { - let deleted_block_refs = old_v - .blocks - .items() - .iter() - .map(|(_k, vb)| BlockRef { - block: vb.hash, - version: old_v.uuid, - deleted: true.into(), - }) - .collect::>(); - block_ref_table.insert_many(&deleted_block_refs[..]).await?; - } - } - Ok(()) - }) - } - - fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted.get()) - } - - fn try_migrate(bytes: &[u8]) -> Option { - let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?; - - let blocks = old - .blocks - .items() - .iter() - .map(|(k, v)| { - ( - VersionBlockKey { - part_number: k.part_number, - offset: k.offset, - }, - VersionBlock { - hash: Hash::try_from(v.hash.as_slice()).unwrap(), - size: v.size, - }, - ) - }) - .collect::>(); - - let parts_etags = old - .parts_etags - .items() - .iter() - .map(|(k, v)| (*k, v.clone())) - .collect::>(); - - Some(Version { - uuid: Hash::try_from(old.uuid.as_slice()).unwrap(), - deleted: crdt::Bool::new(old.deleted.get()), - blocks, - parts_etags, - bucket_id: blake2sum(old.bucket.as_bytes()), - key: old.key, - }) - } -} -- cgit v1.2.3 From 382e74c798263d042b1c6ca3788c866a8c69c4f4 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 24 May 2022 12:16:39 +0200 Subject: First version of admin API (#298) **Spec:** - [x] Start writing - [x] Specify all layout endpoints - [x] Specify all endpoints for operations on keys - [x] Specify all endpoints for operations on key/bucket permissions - [x] Specify all endpoints for operations on buckets - [x] Specify all endpoints 
for operations on bucket aliases View rendered spec at **Code:** - [x] Refactor code for admin api to use common api code that was created for K2V **General endpoints:** - [x] Metrics - [x] GetClusterStatus - [x] ConnectClusterNodes - [x] GetClusterLayout - [x] UpdateClusterLayout - [x] ApplyClusterLayout - [x] RevertClusterLayout **Key-related endpoints:** - [x] ListKeys - [x] CreateKey - [x] ImportKey - [x] GetKeyInfo - [x] UpdateKey - [x] DeleteKey **Bucket-related endpoints:** - [x] ListBuckets - [x] CreateBucket - [x] GetBucketInfo - [x] DeleteBucket - [x] PutBucketWebsite - [x] DeleteBucketWebsite **Operations on key/bucket permissions:** - [x] BucketAllowKey - [x] BucketDenyKey **Operations on bucket aliases:** - [x] GlobalAliasBucket - [x] GlobalUnaliasBucket - [x] LocalAliasBucket - [x] LocalUnaliasBucket **And also:** - [x] Separate error type for the admin API (this PR includes a quite big refactoring of error handling) - [x] Add management of website access - [ ] Check that nothing is missing wrt what can be done using the CLI - [ ] Improve formatting of the spec - [x] Make sure everyone is cool with the API design Fix #231 Fix #295 Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/298 Co-authored-by: Alex Co-committed-by: Alex --- src/model/garage.rs | 4 ++ src/model/helper/bucket.rs | 152 +++++++++++++++++++++++---------------------- src/model/helper/error.rs | 10 +++ src/model/helper/key.rs | 102 ++++++++++++++++++++++++++++++ src/model/helper/mod.rs | 1 + 5 files changed, 196 insertions(+), 73 deletions(-) create mode 100644 src/model/helper/key.rs (limited to 'src/model') diff --git a/src/model/garage.rs b/src/model/garage.rs index 03e21f8a..2f99bd68 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -191,6 +191,10 @@ impl Garage { pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { helper::bucket::BucketHelper(self) } + + pub fn key_helper(&self) -> helper::key::KeyHelper { + helper::key::KeyHelper(self) + } } #[cfg(feature = "k2v")] diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 54d2f97b..130ba5be 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -1,15 +1,18 @@ -use garage_table::util::*; use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::{Error as GarageError, OkOrMessage}; use garage_util::time::*; +use garage_table::util::*; + use crate::bucket_alias_table::*; use crate::bucket_table::*; use crate::garage::Garage; use crate::helper::error::*; -use crate::key_table::{Key, KeyFilter}; +use crate::helper::key::KeyHelper; +use crate::key_table::*; use crate::permission::BucketKeyPerm; +use crate::s3::object_table::ObjectFilter; pub struct BucketHelper<'a>(pub(crate) &'a Garage); @@ -49,6 +52,23 @@ impl<'a> BucketHelper<'a> { } } + #[allow(clippy::ptr_arg)] + pub async fn resolve_bucket(&self, bucket_name: &String, api_key: &Key) -> Result { + let api_key_params = api_key + .state + .as_option() + .ok_or_message("Key should not be deleted at this point")?; + + if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) { + Ok(*bucket_id) + } else { + Ok(self + .resolve_global_bucket_name(bucket_name) + .await? + .ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))?) + } + } + /// Returns a Bucket if it is present in bucket table, /// even if it is in deleted state. Querying a non-existing /// bucket ID returns an internal error. 
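(Editor's sketch, not part of the patch: the new `resolve_bucket` helper above is what request handlers call to turn a bucket name into a bucket ID. A minimal, hedged usage example; the function name `bucket_id_for_request` is invented for illustration and the imports from `garage_model` are assumed.)

```rust
// Sketch only: resolve a bucket name the way an S3 API handler would.
// `garage` and `api_key` are assumed to come from the request context;
// resolve_bucket() checks the key's local aliases first, then falls back
// to the global alias table, returning Error::NoSuchBucket on a miss.
async fn bucket_id_for_request(
    garage: &Garage,
    api_key: &Key,
    bucket_name: &String,
) -> Result<Uuid, Error> {
    garage
        .bucket_helper()
        .resolve_bucket(bucket_name, api_key)
        .await
}
```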
@@ -71,64 +91,7 @@ impl<'a> BucketHelper<'a> { .get(&EmptyKey, &bucket_id) .await? .filter(|b| !b.is_deleted()) - .ok_or_bad_request(format!( - "Bucket {:?} does not exist or has been deleted", - bucket_id - )) - } - - /// Returns a Key if it is present in key table, - /// even if it is in deleted state. Querying a non-existing - /// key ID returns an internal error. - pub async fn get_internal_key(&self, key_id: &String) -> Result { - Ok(self - .0 - .key_table - .get(&EmptyKey, key_id) - .await? - .ok_or_message(format!("Key {} does not exist", key_id))?) - } - - /// Returns a Key if it is present in key table, - /// only if it is in non-deleted state. - /// Querying a non-existing key ID or a deleted key - /// returns a bad request error. - pub async fn get_existing_key(&self, key_id: &String) -> Result { - self.0 - .key_table - .get(&EmptyKey, key_id) - .await? - .filter(|b| !b.state.is_deleted()) - .ok_or_bad_request(format!("Key {} does not exist or has been deleted", key_id)) - } - - /// Returns a Key if it is present in key table, - /// looking it up by key ID or by a match on its name, - /// only if it is in non-deleted state. - /// Querying a non-existing key ID or a deleted key - /// returns a bad request error. - pub async fn get_existing_matching_key(&self, pattern: &str) -> Result { - let candidates = self - .0 - .key_table - .get_range( - &EmptyKey, - None, - Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())), - 10, - EnumerationOrder::Forward, - ) - .await? - .into_iter() - .collect::>(); - if candidates.len() != 1 { - Err(Error::BadRequest(format!( - "{} matching keys", - candidates.len() - ))) - } else { - Ok(candidates.into_iter().next().unwrap()) - } + .ok_or_else(|| Error::NoSuchBucket(hex::encode(bucket_id))) } /// Sets a new alias for a bucket in global namespace. @@ -142,10 +105,7 @@ impl<'a> BucketHelper<'a> { alias_name: &String, ) -> Result<(), Error> { if !is_valid_bucket_name(alias_name) { - return Err(Error::BadRequest(format!( - "{}: {}", - alias_name, INVALID_BUCKET_NAME_MESSAGE - ))); + return Err(Error::InvalidBucketName(alias_name.to_string())); } let mut bucket = self.get_existing_bucket(bucket_id).await?; @@ -176,7 +136,7 @@ impl<'a> BucketHelper<'a> { let alias = match alias { None => BucketAlias::new(alias_name.clone(), alias_ts, Some(bucket_id)) - .ok_or_bad_request(format!("{}: {}", alias_name, INVALID_BUCKET_NAME_MESSAGE))?, + .ok_or_else(|| Error::InvalidBucketName(alias_name.clone()))?, Some(mut a) => { a.state = Lww::raw(alias_ts, Some(bucket_id)); a @@ -264,7 +224,7 @@ impl<'a> BucketHelper<'a> { .bucket_alias_table .get(&EmptyKey, alias_name) .await? 
- .ok_or_message(format!("Alias {} not found", alias_name))?; + .ok_or_else(|| Error::NoSuchBucket(alias_name.to_string()))?; // Checks ok, remove alias let alias_ts = match bucket.state.as_option() { @@ -303,15 +263,14 @@ impl<'a> BucketHelper<'a> { key_id: &String, alias_name: &String, ) -> Result<(), Error> { + let key_helper = KeyHelper(self.0); + if !is_valid_bucket_name(alias_name) { - return Err(Error::BadRequest(format!( - "{}: {}", - alias_name, INVALID_BUCKET_NAME_MESSAGE - ))); + return Err(Error::InvalidBucketName(alias_name.to_string())); } let mut bucket = self.get_existing_bucket(bucket_id).await?; - let mut key = self.get_existing_key(key_id).await?; + let mut key = key_helper.get_existing_key(key_id).await?; let mut key_param = key.state.as_option_mut().unwrap(); @@ -360,8 +319,10 @@ impl<'a> BucketHelper<'a> { key_id: &String, alias_name: &String, ) -> Result<(), Error> { + let key_helper = KeyHelper(self.0); + let mut bucket = self.get_existing_bucket(bucket_id).await?; - let mut key = self.get_existing_key(key_id).await?; + let mut key = key_helper.get_existing_key(key_id).await?; let mut bucket_p = bucket.state.as_option_mut().unwrap(); @@ -429,8 +390,10 @@ impl<'a> BucketHelper<'a> { key_id: &String, mut perm: BucketKeyPerm, ) -> Result<(), Error> { + let key_helper = KeyHelper(self.0); + let mut bucket = self.get_internal_bucket(bucket_id).await?; - let mut key = self.get_internal_key(key_id).await?; + let mut key = key_helper.get_internal_key(key_id).await?; if let Some(bstate) = bucket.state.as_option() { if let Some(kp) = bstate.authorized_keys.get(key_id) { @@ -466,4 +429,47 @@ impl<'a> BucketHelper<'a> { Ok(()) } + + pub async fn is_bucket_empty(&self, bucket_id: Uuid) -> Result { + let objects = self + .0 + .object_table + .get_range( + &bucket_id, + None, + Some(ObjectFilter::IsData), + 10, + EnumerationOrder::Forward, + ) + .await?; + if !objects.is_empty() { + return Ok(false); + } + + #[cfg(feature = "k2v")] + { + use garage_rpc::ring::Ring; + use std::sync::Arc; + + let ring: Arc = self.0.system.ring.borrow().clone(); + let k2vindexes = self + .0 + .k2v + .counter_table + .table + .get_range( + &bucket_id, + None, + Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())), + 10, + EnumerationOrder::Forward, + ) + .await?; + if !k2vindexes.is_empty() { + return Ok(false); + } + } + + Ok(true) + } } diff --git a/src/model/helper/error.rs b/src/model/helper/error.rs index 30b2ba32..3ca8f55c 100644 --- a/src/model/helper/error.rs +++ b/src/model/helper/error.rs @@ -10,6 +10,16 @@ pub enum Error { #[error(display = "Bad request: {}", _0)] BadRequest(String), + + /// Bucket name is not valid according to AWS S3 specs + #[error(display = "Invalid bucket name: {}", _0)] + InvalidBucketName(String), + + #[error(display = "Access key not found: {}", _0)] + NoSuchAccessKey(String), + + #[error(display = "Bucket not found: {}", _0)] + NoSuchBucket(String), } impl From for Error { diff --git a/src/model/helper/key.rs b/src/model/helper/key.rs new file mode 100644 index 00000000..c1a8e974 --- /dev/null +++ b/src/model/helper/key.rs @@ -0,0 +1,102 @@ +use garage_table::util::*; +use garage_util::crdt::*; +use garage_util::error::OkOrMessage; + +use crate::garage::Garage; +use crate::helper::bucket::BucketHelper; +use crate::helper::error::*; +use crate::key_table::{Key, KeyFilter}; +use crate::permission::BucketKeyPerm; + +pub struct KeyHelper<'a>(pub(crate) &'a Garage); + +#[allow(clippy::ptr_arg)] +impl<'a> KeyHelper<'a> { + /// Returns a Key if it is present 
in key table, + /// even if it is in deleted state. Querying a non-existing + /// key ID returns an internal error. + pub async fn get_internal_key(&self, key_id: &String) -> Result { + Ok(self + .0 + .key_table + .get(&EmptyKey, key_id) + .await? + .ok_or_message(format!("Key {} does not exist", key_id))?) + } + + /// Returns a Key if it is present in key table, + /// only if it is in non-deleted state. + /// Querying a non-existing key ID or a deleted key + /// returns a bad request error. + pub async fn get_existing_key(&self, key_id: &String) -> Result { + self.0 + .key_table + .get(&EmptyKey, key_id) + .await? + .filter(|b| !b.state.is_deleted()) + .ok_or_else(|| Error::NoSuchAccessKey(key_id.to_string())) + } + + /// Returns a Key if it is present in key table, + /// looking it up by key ID or by a match on its name, + /// only if it is in non-deleted state. + /// Querying a non-existing key ID or a deleted key + /// returns a bad request error. + pub async fn get_existing_matching_key(&self, pattern: &str) -> Result { + let candidates = self + .0 + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())), + 10, + EnumerationOrder::Forward, + ) + .await? + .into_iter() + .collect::>(); + if candidates.len() != 1 { + Err(Error::BadRequest(format!( + "{} matching keys", + candidates.len() + ))) + } else { + Ok(candidates.into_iter().next().unwrap()) + } + } + + /// Deletes an API access key + pub async fn delete_key(&self, key: &mut Key) -> Result<(), Error> { + let bucket_helper = BucketHelper(self.0); + + let state = key.state.as_option_mut().unwrap(); + + // --- done checking, now commit --- + // (the step at unset_local_bucket_alias will fail if a bucket + // does not have another alias, the deletion will be + // interrupted in the middle if that happens) + + // 1. Delete local aliases + for (alias, _, to) in state.local_aliases.items().iter() { + if let Some(bucket_id) = to { + bucket_helper + .unset_local_bucket_alias(*bucket_id, &key.key_id, alias) + .await?; + } + } + + // 2. Remove permissions on all authorized buckets + for (ab_id, _auth) in state.authorized_buckets.items().iter() { + bucket_helper + .set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS) + .await?; + } + + // 3. Actually delete key + key.state = Deletable::delete(); + self.0.key_table.insert(key).await?; + + Ok(()) + } +} diff --git a/src/model/helper/mod.rs b/src/model/helper/mod.rs index 2f4e8898..dd947c86 100644 --- a/src/model/helper/mod.rs +++ b/src/model/helper/mod.rs @@ -1,2 +1,3 @@ pub mod bucket; pub mod error; +pub mod key; -- cgit v1.2.3 From b44d3fc796484a50cd6854f20c9b46e5fddedc9d Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 8 Jun 2022 10:01:44 +0200 Subject: Abstract database behind generic interface and implement alternative drivers (#322) - [x] Design interface - [x] Implement Sled backend - [x] Re-implement the SledCountedTree hack ~~on Sled backend~~ on all backends (i.e. over the abstraction) - [x] Convert Garage code to use generic interface - [x] Proof-read converted Garage code - [ ] Test everything well - [x] Implement sqlite backend - [x] Implement LMDB backend - [ ] (Implement Persy backend?) - [ ] (Implement other backends? 
(like RocksDB, ...))
- [x] Implement backend choice in config file and garage server module
- [x] Add CLI for converting between DB formats
- Exploit the new interface to put more things in transactions
  - [x] `.updated()` trigger on Garage tables

Fix #284

**Bugs**

- [x] When exporting sqlite, trees iterate empty??
- [x] LMDB doesn't work

**Known issues for various back-ends**

- Sled:
  - Eats all my RAM and also all my disk space
  - `.len()` has to traverse the whole table
  - Is actually quite slow on some operations
  - And is actually pretty bad code...
- Sqlite:
  - Requires a lock to be taken on all operations. The lock is also taken when iterating on a table with `.iter()`, and the lock isn't released until the iterator is dropped. This means that we must be VERY careful not to do anything else inside a `.iter()` loop or else we will have a deadlock! Most such cases have been eliminated from the Garage codebase, but there might still be some that remain. If your Garage-over-Sqlite seems to hang/freeze, this is the reason.
  - (adapter uses a bunch of unsafe code)
- Heed (LMDB):
  - Not suited for 32-bit machines as it has to map the whole DB in memory.
  - (adapter uses a tiny bit of unsafe code)

**My recommendation:** avoid 32-bit machines and use LMDB as much as possible.

**Converting databases** is actually quite easy. For example from Sled to LMDB:

```bash
cd src/db
cargo run --features cli --bin convert -- -i path/to/garage/meta/db -a sled -o path/to/garage/meta/db.lmdb -b lmdb
```

Then, just add this to your `config.toml`:

```toml
db_engine = "lmdb"
```

Co-authored-by: Alex Auvolat
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/322
Co-authored-by: Alex
Co-committed-by: Alex
---
 src/model/Cargo.toml            |  3 +-
 src/model/garage.rs             |  8 ++++--
 src/model/index_counter.rs      | 62 ++++++++++++++++++++---------------------
 src/model/k2v/item_table.rs     | 24 +++++++++++++---
 src/model/migrate.rs            |  6 +++-
 src/model/s3/block_ref_table.rs | 21 ++++++++------
 src/model/s3/object_table.rs    | 12 ++++++--
 src/model/s3/version_table.rs   | 13 +++++++--
 8 files changed, 94 insertions(+), 55 deletions(-)

(limited to 'src/model')

diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 133fe44e..d908dc01 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
 garage_rpc = { version = "0.7.0", path = "../rpc" }
 garage_table = { version = "0.7.0", path = "../table" }
 garage_block = { version = "0.7.0", path = "../block" }
@@ -30,8 +31,6 @@ tracing = "0.1.30"
 rand = "0.8"
 zstd = { version = "0.9", default-features = false }
 
-sled = "0.34"
-
 rmp-serde = "0.15"
 serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
 serde_bytes = "0.11"
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 2f99bd68..280f3dc7 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -2,6 +2,8 @@ use std::sync::Arc;
 
 use netapp::NetworkKey;
 
+use garage_db as db;
+
 use garage_util::background::*;
 use garage_util::config::*;
 
@@ -33,7 +35,7 @@ pub struct Garage {
 pub config: Config,
 
 /// The local database
- pub db: sled::Db,
+ pub db: db::Db,
 /// A background job runner
 pub background: Arc<BackgroundRunner>,
 /// The membership manager
@@ -71,7 +73,7 @@ pub struct GarageK2V {
 
 impl Garage {
 /// Create and run garage
- pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ pub fn new(config: Config,
db: db::Db, background: Arc) -> Arc { let network_key = NetworkKey::from_slice( &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], ) @@ -199,7 +201,7 @@ impl Garage { #[cfg(feature = "k2v")] impl GarageK2V { - fn new(system: Arc, db: &sled::Db, meta_rep_param: TableShardedReplication) -> Self { + fn new(system: Arc, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { info!("Initialize K2V counter table..."); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); info!("Initialize K2V subscription manager..."); diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 123154d4..2602d5d9 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -6,6 +6,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; +use garage_db as db; + use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::data::*; @@ -114,10 +116,6 @@ impl TableSchema for CounterTable { type E = CounterEntry; type Filter = (DeletedFilter, Vec); - fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) { - // nothing for now - } - fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { if filter.0 == DeletedFilter::Any { return true; @@ -135,7 +133,7 @@ impl TableSchema for CounterTable { pub struct IndexCounter { this_node: Uuid, - local_counter: sled::Tree, + local_counter: db::Tree, propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, pub table: Arc, TableShardedReplication>>, } @@ -144,7 +142,7 @@ impl IndexCounter { pub fn new( system: Arc, replication: TableShardedReplication, - db: &sled::Db, + db: &db::Db, ) -> Arc { let background = system.background.clone(); @@ -174,36 +172,36 @@ impl IndexCounter { this } - pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { + pub fn count( + &self, + tx: &mut db::Transaction, + pk: &T::P, + sk: &T::S, + counts: &[(&str, i64)], + ) -> db::TxResult<(), Error> { let tree_key = self.table.data.tree_key(pk, sk); - let new_entry = self.local_counter.transaction(|tx| { - let mut entry = match tx.get(&tree_key[..])? { - Some(old_bytes) => { - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)? - } - None => LocalCounterEntry { - values: BTreeMap::new(), - }, - }; - - for (s, inc) in counts.iter() { - let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); - ent.0 += 1; - ent.1 += *inc; - } - - let new_entry_bytes = rmp_to_vec_all_named(&entry) - .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - tx.insert(&tree_key[..], new_entry_bytes)?; + let mut entry = match tx.get(&self.local_counter, &tree_key[..])? 
{ + Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) + .map_err(Error::RmpDecode) + .map_err(db::TxError::Abort)?, + None => LocalCounterEntry { + values: BTreeMap::new(), + }, + }; + + for (s, inc) in counts.iter() { + let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); + ent.0 += 1; + ent.1 += *inc; + } - Ok(entry) - })?; + let new_entry_bytes = rmp_to_vec_all_named(&entry) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?; + tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; - if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) { + if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) { error!( "Could not propagate updated counter values, failed to send to channel: {}", e diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 8b7cc08a..991fe66d 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; +use garage_db as db; use garage_util::data::*; use garage_table::crdt::*; @@ -221,7 +222,12 @@ impl TableSchema for K2VItemTable { type E = K2VItem; type Filter = ItemFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { // 1. Count let (old_entries, old_conflicts, old_values, old_bytes) = match old { None => (0, 0, 0, 0), @@ -239,7 +245,8 @@ impl TableSchema for K2VItemTable { .map(|e| &e.partition.partition_key) .unwrap_or_else(|| &new.unwrap().partition.partition_key); - if let Err(e) = self.counter_table.count( + let counter_res = self.counter_table.count( + tx, &count_pk, count_sk, &[ @@ -248,14 +255,23 @@ impl TableSchema for K2VItemTable { (VALUES, new_values - old_values), (BYTES, new_bytes - old_bytes), ], - ) { - error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e); + ); + if let Err(e) = db::unabort(counter_res)? { + // This result can be returned by `counter_table.count()` for instance + // if messagepack serialization or deserialization fails at some step. + // Warn admin but ignore this error for now, that's all we can do. + error!( + "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!", + count_pk, count_sk, e + ); } // 2. Notify if let Some(new_ent) = new { self.subscriptions.notify(new_ent); } + + Ok(()) } #[allow(clippy::nonminimal_bool)] diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 7e61957a..25acb4b0 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -25,11 +25,15 @@ impl Migrate { .open_tree("bucket:table") .map_err(GarageError::from)?; - for res in tree.iter() { + let mut old_buckets = vec![]; + for res in tree.iter().map_err(GarageError::from)? 
{ let (_k, v) = res.map_err(GarageError::from)?; let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) .map_err(GarageError::from)?; + old_buckets.push(bucket); + } + for bucket in old_buckets { if let old_bucket::BucketState::Present(p) = bucket.state.get() { self.migrate_buckets050_do_bucket(&bucket, p).await?; } diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index 9b3991bf..9589b4aa 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use garage_db as db; + use garage_util::data::*; use garage_table::crdt::Crdt; @@ -51,21 +53,22 @@ impl TableSchema for BlockRefTable { type E = BlockRef; type Filter = DeletedFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { - #[allow(clippy::or_fun_call)] - let block = &old.or(new).unwrap().block; + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + let block = old.or(new).unwrap().block; let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false); let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false); if is_after && !was_before { - if let Err(e) = self.block_manager.block_incref(block) { - warn!("block_incref failed for block {:?}: {}", block, e); - } + self.block_manager.block_incref(tx, block)?; } if was_before && !is_after { - if let Err(e) = self.block_manager.block_decref(block) { - warn!("block_decref failed for block {:?}: {}", block, e); - } + self.block_manager.block_decref(tx, block)?; } + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 3d9a89f7..62f5d8d9 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -232,7 +234,12 @@ impl TableSchema for ObjectTable { type E = Object; type Filter = ObjectFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + _tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { let version_table = self.version_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -259,7 +266,8 @@ impl TableSchema for ObjectTable { } } Ok(()) - }) + }); + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index ad096772..881c245a 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -137,7 +139,12 @@ impl TableSchema for VersionTable { type E = Version; type Filter = DeletedFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + _tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { let block_ref_table = self.block_ref_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -160,7 +167,9 @@ impl TableSchema for VersionTable { } } Ok(()) - }) + }); + + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { -- 
cgit v1.2.3 From 77e3fd6db2c9cd3a10889bd071e95ef839cfbefc Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 15 Jun 2022 20:20:28 +0200 Subject: improve internal item counter mechanisms and implement bucket quotas (#326) - [x] Refactoring of internal counting API - [x] Repair procedure for counters (it's an offline procedure!!!) - [x] New counter for objects in buckets - [x] Add quotas to buckets struct - [x] Add CLI to manage bucket quotas - [x] Add admin API to manage bucket quotas - [x] Apply quotas by adding checks on put operations - [x] Proof-read Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/326 Co-authored-by: Alex Co-committed-by: Alex --- src/model/bucket_table.rs | 19 +++- src/model/garage.rs | 67 +++++++++-- src/model/index_counter.rs | 250 +++++++++++++++++++++++++++++++++-------- src/model/k2v/counter_table.rs | 20 ---- src/model/k2v/item_table.rs | 102 +++++++++-------- src/model/k2v/mod.rs | 1 - src/model/migrate.rs | 1 + src/model/s3/object_table.rs | 61 +++++++++- 8 files changed, 392 insertions(+), 129 deletions(-) delete mode 100644 src/model/k2v/counter_table.rs (limited to 'src/model') diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 7c7b9f30..130eb6a6 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use garage_table::crdt::Crdt; +use garage_table::crdt::*; use garage_table::*; use garage_util::data::*; use garage_util::time::*; @@ -44,6 +44,9 @@ pub struct BucketParams { pub website_config: crdt::Lww>, /// CORS rules pub cors_config: crdt::Lww>>, + /// Bucket quotas + #[serde(default)] + pub quotas: crdt::Lww, } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] @@ -62,6 +65,18 @@ pub struct CorsRule { pub expose_headers: Vec, } +#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct BucketQuotas { + /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket) + pub max_size: Option, + /// Maximum number of non-deleted objects in the bucket + pub max_objects: Option, +} + +impl AutoCrdt for BucketQuotas { + const WARN_IF_DIFFERENT: bool = true; +} + impl BucketParams { /// Create an empty BucketParams with no authorized keys and no website accesss pub fn new() -> Self { @@ -72,6 +87,7 @@ impl BucketParams { local_aliases: crdt::LwwMap::new(), website_config: crdt::Lww::new(None), cors_config: crdt::Lww::new(None), + quotas: crdt::Lww::new(BucketQuotas::default()), } } } @@ -86,6 +102,7 @@ impl Crdt for BucketParams { self.website_config.merge(&o.website_config); self.cors_config.merge(&o.cors_config); + self.quotas.merge(&o.quotas); } } diff --git a/src/model/garage.rs b/src/model/garage.rs index 280f3dc7..15769a17 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -6,6 +6,7 @@ use garage_db as db; use garage_util::background::*; use garage_util::config::*; +use garage_util::error::Error; use garage_rpc::system::System; @@ -22,12 +23,11 @@ use crate::s3::version_table::*; use crate::bucket_alias_table::*; use crate::bucket_table::*; use crate::helper; +use crate::index_counter::*; use crate::key_table::*; #[cfg(feature = "k2v")] -use crate::index_counter::*; -#[cfg(feature = "k2v")] -use crate::k2v::{counter_table::*, item_table::*, poll::*, rpc::*}; +use crate::k2v::{item_table::*, poll::*, rpc::*}; /// An entire Garage full of data pub struct Garage { @@ -52,6 +52,8 @@ pub struct Garage { /// Table containing S3 objects pub object_table: 
Arc>, + /// Counting table containing object counters + pub object_counter_table: Arc>, /// Table containing S3 object versions pub version_table: Arc>, /// Table containing S3 block references (not blocks themselves) @@ -66,14 +68,57 @@ pub struct GarageK2V { /// Table containing K2V items pub item_table: Arc>, /// Indexing table containing K2V item counters - pub counter_table: Arc>, + pub counter_table: Arc>, /// K2V RPC handler pub rpc: Arc, } impl Garage { /// Create and run garage - pub fn new(config: Config, db: db::Db, background: Arc) -> Arc { + pub fn new(config: Config, background: Arc) -> Result, Error> { + info!("Opening database..."); + let mut db_path = config.metadata_dir.clone(); + std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory"); + let db = match config.db_engine.as_str() { + "sled" => { + db_path.push("db"); + info!("Opening Sled database at: {}", db_path.display()); + let db = db::sled_adapter::sled::Config::default() + .path(&db_path) + .cache_capacity(config.sled_cache_capacity) + .flush_every_ms(Some(config.sled_flush_every_ms)) + .open() + .expect("Unable to open sled DB"); + db::sled_adapter::SledDb::init(db) + } + "sqlite" | "sqlite3" | "rusqlite" => { + db_path.push("db.sqlite"); + info!("Opening Sqlite database at: {}", db_path.display()); + let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) + .expect("Unable to open sqlite DB"); + db::sqlite_adapter::SqliteDb::init(db) + } + "lmdb" | "heed" => { + db_path.push("db.lmdb"); + info!("Opening LMDB database at: {}", db_path.display()); + std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); + let map_size = garage_db::lmdb_adapter::recommended_map_size(); + + let db = db::lmdb_adapter::heed::EnvOpenOptions::new() + .max_dbs(100) + .map_size(map_size) + .open(&db_path) + .expect("Unable to open LMDB DB"); + db::lmdb_adapter::LmdbDb::init(db) + } + e => { + return Err(Error::Message(format!( + "Unsupported DB engine: {} (options: sled, sqlite, lmdb)", + e + ))); + } + }; + let network_key = NetworkKey::from_slice( &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], ) @@ -155,12 +200,16 @@ impl Garage { &db, ); + info!("Initialize object counter table..."); + let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); + info!("Initialize object_table..."); #[allow(clippy::redundant_clone)] let object_table = Table::new( ObjectTable { background: background.clone(), version_table: version_table.clone(), + object_counter_table: object_counter_table.clone(), }, meta_rep_param.clone(), system.clone(), @@ -171,9 +220,8 @@ impl Garage { #[cfg(feature = "k2v")] let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); - info!("Initialize Garage..."); - - Arc::new(Self { + // -- done -- + Ok(Arc::new(Self { config, db, background, @@ -183,11 +231,12 @@ impl Garage { bucket_alias_table, key_table, object_table, + object_counter_table, version_table, block_ref_table, #[cfg(feature = "k2v")] k2v, - }) + })) } pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 2602d5d9..36e8172b 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -1,3 +1,4 @@ +use core::ops::Bound; use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; @@ -12,30 +13,36 @@ use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::data::*; use 
garage_util::error::*; +use garage_util::time::*; use garage_table::crdt::*; -use garage_table::replication::TableShardedReplication; +use garage_table::replication::*; use garage_table::*; -pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static { - const NAME: &'static str; - type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; - type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; +pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { + const COUNTER_TABLE_NAME: &'static str; + + type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + + fn counter_partition_key(&self) -> &Self::CP; + fn counter_sort_key(&self) -> &Self::CS; + fn counts(&self) -> Vec<(&'static str, i64)>; } /// A counter entry in the global table -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct CounterEntry { - pub pk: T::P, - pub sk: T::S, +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct CounterEntry { + pub pk: T::CP, + pub sk: T::CS, pub values: BTreeMap, } -impl Entry for CounterEntry { - fn partition_key(&self) -> &T::P { +impl Entry for CounterEntry { + fn partition_key(&self) -> &T::CP { &self.pk } - fn sort_key(&self) -> &T::S { + fn sort_key(&self) -> &T::CS { &self.sk } fn is_tombstone(&self) -> bool { @@ -45,7 +52,7 @@ impl Entry for CounterEntry { } } -impl CounterEntry { +impl CounterEntry { pub fn filtered_values(&self, ring: &Ring) -> HashMap { let nodes = &ring.layout.node_id_vec[..]; self.filtered_values_with_nodes(nodes) @@ -78,7 +85,7 @@ pub struct CounterValue { pub node_values: BTreeMap, } -impl Crdt for CounterEntry { +impl Crdt for CounterEntry { fn merge(&mut self, other: &Self) { for (name, e2) in other.values.iter() { if let Some(e) = self.values.get_mut(name) { @@ -104,15 +111,15 @@ impl Crdt for CounterValue { } } -pub struct CounterTable { +pub struct CounterTable { _phantom_t: PhantomData, } -impl TableSchema for CounterTable { - const TABLE_NAME: &'static str = T::NAME; +impl TableSchema for CounterTable { + const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME; - type P = T::P; - type S = T::S; + type P = T::CP; + type S = T::CS; type E = CounterEntry; type Filter = (DeletedFilter, Vec); @@ -131,14 +138,14 @@ impl TableSchema for CounterTable { // ---- -pub struct IndexCounter { +pub struct IndexCounter { this_node: Uuid, local_counter: db::Tree, - propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, + propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry)>, pub table: Arc, TableShardedReplication>>, } -impl IndexCounter { +impl IndexCounter { pub fn new( system: Arc, replication: TableShardedReplication, @@ -151,7 +158,7 @@ impl IndexCounter { let this = Arc::new(Self { this_node: system.id, local_counter: db - .open_tree(format!("local_counter:{}", T::NAME)) + .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME)) .expect("Unable to open local counter tree"), propagate_tx, table: Table::new( @@ -166,7 +173,7 @@ impl IndexCounter { let this2 = this.clone(); background.spawn_worker( - format!("{} index counter propagator", T::NAME), + format!("{} index counter propagator", T::COUNTER_TABLE_NAME), move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit), ); this @@ -175,24 +182,45 @@ impl IndexCounter { pub fn count( &self, tx: &mut db::Transaction, - 
pk: &T::P, - sk: &T::S, - counts: &[(&str, i64)], + old: Option<&T>, + new: Option<&T>, ) -> db::TxResult<(), Error> { + let pk = old + .map(|e| e.counter_partition_key()) + .unwrap_or_else(|| new.unwrap().counter_partition_key()); + let sk = old + .map(|e| e.counter_sort_key()) + .unwrap_or_else(|| new.unwrap().counter_sort_key()); + + // calculate counter differences + let mut counts = HashMap::new(); + for (k, v) in old.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) -= v; + } + for (k, v) in new.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) += v; + } + + // update local counter table let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { - Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(db::TxError::Abort)?, + Some(old_bytes) => { + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) + .map_err(Error::RmpDecode) + .map_err(db::TxError::Abort)? + } None => LocalCounterEntry { + pk: pk.clone(), + sk: sk.clone(), values: BTreeMap::new(), }, }; + let now = now_msec(); for (s, inc) in counts.iter() { let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); - ent.0 += 1; + ent.0 = std::cmp::max(ent.0 + 1, now); ent.1 += *inc; } @@ -213,7 +241,7 @@ impl IndexCounter { async fn propagate_loop( self: Arc, - mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>, + mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>, must_exit: watch::Receiver, ) { // This loop batches updates to counters to be sent all at once. @@ -236,7 +264,7 @@ impl IndexCounter { if let Some((pk, sk, counters)) = ent { let tree_key = self.table.data.tree_key(&pk, &sk); - let dist_entry = counters.into_counter_entry::(self.this_node, pk, sk); + let dist_entry = counters.into_counter_entry(self.this_node); match buf.entry(tree_key) { hash_map::Entry::Vacant(e) => { e.insert(dist_entry); @@ -255,10 +283,10 @@ impl IndexCounter { if let Err(e) = self.table.insert_many(entries).await { errors += 1; if errors >= 2 && *must_exit.borrow() { - error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e); + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e); break; } - warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors); + warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors); tokio::time::sleep(Duration::from_secs(5)).await; continue; } @@ -272,23 +300,155 @@ impl IndexCounter { } } } + + pub fn offline_recount_all( + &self, + counted_table: &Arc>, + ) -> Result<(), Error> + where + TS: TableSchema, + TR: TableReplication, + { + let save_counter_entry = |entry: CounterEntry| -> Result<(), Error> { + let entry_k = self + .table + .data + .tree_key(entry.partition_key(), entry.sort_key()); + self.table + .data + .update_entry_with(&entry_k, |ent| match ent { + Some(mut ent) => { + ent.merge(&entry); + ent + } + None => entry.clone(), + })?; + Ok(()) + }; + + // 1. 
Set all old local counters to zero + let now = now_msec(); + let mut next_start: Option> = None; + loop { + let low_bound = match next_start.take() { + Some(v) => Bound::Excluded(v), + None => Bound::Unbounded, + }; + let mut batch = vec![]; + for item in self.local_counter.range((low_bound, Bound::Unbounded))? { + batch.push(item?); + if batch.len() > 1000 { + break; + } + } + + if batch.is_empty() { + break; + } + + info!("zeroing old counters... ({})", hex::encode(&batch[0].0)); + for (local_counter_k, local_counter) in batch { + let mut local_counter = + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&local_counter)?; + + for (_, tv) in local_counter.values.iter_mut() { + tv.0 = std::cmp::max(tv.0 + 1, now); + tv.1 = 0; + } + + let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + self.local_counter + .insert(&local_counter_k, &local_counter_bytes)?; + + let counter_entry = local_counter.into_counter_entry(self.this_node); + save_counter_entry(counter_entry)?; + + next_start = Some(local_counter_k); + } + } + + // 2. Recount all table entries + let now = now_msec(); + let mut next_start: Option> = None; + loop { + let low_bound = match next_start.take() { + Some(v) => Bound::Excluded(v), + None => Bound::Unbounded, + }; + let mut batch = vec![]; + for item in counted_table + .data + .store + .range((low_bound, Bound::Unbounded))? + { + batch.push(item?); + if batch.len() > 1000 { + break; + } + } + + if batch.is_empty() { + break; + } + + info!("counting entries... ({})", hex::encode(&batch[0].0)); + for (counted_entry_k, counted_entry) in batch { + let counted_entry = counted_table.data.decode_entry(&counted_entry)?; + + let pk = counted_entry.counter_partition_key(); + let sk = counted_entry.counter_sort_key(); + let counts = counted_entry.counts(); + + let local_counter_key = self.table.data.tree_key(pk, sk); + let mut local_counter = match self.local_counter.get(&local_counter_key)? 
{ + Some(old_bytes) => { + let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>( + &old_bytes, + )?; + assert!(ent.pk == *pk); + assert!(ent.sk == *sk); + ent + } + None => LocalCounterEntry { + pk: pk.clone(), + sk: sk.clone(), + values: BTreeMap::new(), + }, + }; + for (s, v) in counts.iter() { + let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); + tv.0 = std::cmp::max(tv.0 + 1, now); + tv.1 += v; + } + + let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + self.local_counter + .insert(&local_counter_key, local_counter_bytes)?; + + let counter_entry = local_counter.into_counter_entry(self.this_node); + save_counter_entry(counter_entry)?; + + next_start = Some(counted_entry_k); + } + } + + // Done + Ok(()) + } } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -struct LocalCounterEntry { +struct LocalCounterEntry { + pk: T::CP, + sk: T::CS, values: BTreeMap, } -impl LocalCounterEntry { - fn into_counter_entry( - self, - this_node: Uuid, - pk: T::P, - sk: T::S, - ) -> CounterEntry { +impl LocalCounterEntry { + fn into_counter_entry(self, this_node: Uuid) -> CounterEntry { CounterEntry { - pk, - sk, + pk: self.pk, + sk: self.sk, values: self .values .into_iter() diff --git a/src/model/k2v/counter_table.rs b/src/model/k2v/counter_table.rs deleted file mode 100644 index 4856eb2b..00000000 --- a/src/model/k2v/counter_table.rs +++ /dev/null @@ -1,20 +0,0 @@ -use garage_util::data::*; - -use crate::index_counter::*; - -pub const ENTRIES: &str = "entries"; -pub const CONFLICTS: &str = "conflicts"; -pub const VALUES: &str = "values"; -pub const BYTES: &str = "bytes"; - -#[derive(PartialEq, Clone)] -pub struct K2VCounterTable; - -impl CounterSchema for K2VCounterTable { - const NAME: &'static str = "k2v_index_counter"; - - // Partition key = bucket id - type P = Uuid; - // Sort key = K2V item's partition key - type S = String; -} diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 991fe66d..baa1db4b 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -10,9 +10,13 @@ use garage_table::*; use crate::index_counter::*; use crate::k2v::causality::*; -use crate::k2v::counter_table::*; use crate::k2v::poll::*; +pub const ENTRIES: &str = "entries"; +pub const CONFLICTS: &str = "conflicts"; +pub const VALUES: &str = "values"; +pub const BYTES: &str = "bytes"; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct K2VItem { pub partition: K2VItemPartition, @@ -112,27 +116,6 @@ impl K2VItem { ent.discard(); } } - - // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used) - fn stats(&self) -> (i64, i64, i64, i64) { - let values = self.values(); - - let n_entries = if self.is_tombstone() { 0 } else { 1 }; - let n_conflicts = if values.len() > 1 { 1 } else { 0 }; - let n_values = values - .iter() - .filter(|v| matches!(v, DvvsValue::Value(_))) - .count() as i64; - let n_bytes = values - .iter() - .map(|v| match v { - DvvsValue::Deleted => 0, - DvvsValue::Value(v) => v.len() as i64, - }) - .sum(); - - (n_entries, n_conflicts, n_values, n_bytes) - } } impl DvvsEntry { @@ -204,7 +187,7 @@ impl Entry for K2VItem { } pub struct K2VItemTable { - pub(crate) counter_table: Arc>, + pub(crate) counter_table: Arc>, pub(crate) subscriptions: Arc, } @@ -229,40 +212,14 @@ impl TableSchema for K2VItemTable { new: Option<&Self::E>, ) -> db::TxOpResult<()> { // 1. 
Count - let (old_entries, old_conflicts, old_values, old_bytes) = match old { - None => (0, 0, 0, 0), - Some(e) => e.stats(), - }; - let (new_entries, new_conflicts, new_values, new_bytes) = match new { - None => (0, 0, 0, 0), - Some(e) => e.stats(), - }; - - let count_pk = old - .map(|e| e.partition.bucket_id) - .unwrap_or_else(|| new.unwrap().partition.bucket_id); - let count_sk = old - .map(|e| &e.partition.partition_key) - .unwrap_or_else(|| &new.unwrap().partition.partition_key); - - let counter_res = self.counter_table.count( - tx, - &count_pk, - count_sk, - &[ - (ENTRIES, new_entries - old_entries), - (CONFLICTS, new_conflicts - old_conflicts), - (VALUES, new_values - old_values), - (BYTES, new_bytes - old_bytes), - ], - ); + let counter_res = self.counter_table.count(tx, old, new); if let Err(e) = db::unabort(counter_res)? { // This result can be returned by `counter_table.count()` for instance // if messagepack serialization or deserialization fails at some step. // Warn admin but ignore this error for now, that's all we can do. error!( - "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!", - count_pk, count_sk, e + "Unable to update K2V item counter: {}. Index values will be wrong!", + e ); } @@ -282,6 +239,47 @@ impl TableSchema for K2VItemTable { } } +impl CountedItem for K2VItem { + const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter_v2"; + + // Partition key = bucket id + type CP = Uuid; + // Sort key = K2V item's partition key + type CS = String; + + fn counter_partition_key(&self) -> &Uuid { + &self.partition.bucket_id + } + fn counter_sort_key(&self) -> &String { + &self.partition.partition_key + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let values = self.values(); + + let n_entries = if self.is_tombstone() { 0 } else { 1 }; + let n_conflicts = if values.len() > 1 { 1 } else { 0 }; + let n_values = values + .iter() + .filter(|v| matches!(v, DvvsValue::Value(_))) + .count() as i64; + let n_bytes = values + .iter() + .map(|v| match v { + DvvsValue::Deleted => 0, + DvvsValue::Value(v) => v.len() as i64, + }) + .sum(); + + vec![ + (ENTRIES, n_entries), + (CONFLICTS, n_conflicts), + (VALUES, n_values), + (BYTES, n_bytes), + ] + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index 664172a6..f6a96151 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,6 +1,5 @@ pub mod causality; -pub mod counter_table; pub mod item_table; pub mod poll; diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 25acb4b0..5fc67069 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -77,6 +77,7 @@ impl Migrate { local_aliases: LwwMap::new(), website_config: Lww::new(website), cors_config: Lww::new(None), + quotas: Lww::new(Default::default()), }), }) .await?; diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 62f5d8d9..a3914c36 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -11,10 +11,15 @@ use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; +use crate::index_counter::*; use crate::s3::version_table::*; use garage_model_050::object_table as old; +pub const OBJECTS: &str = "objects"; +pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; +pub const BYTES: &str = "bytes"; + /// An object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { @@ -218,6 +223,7 @@ impl Crdt for Object { pub struct ObjectTable { 
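// Editor's note (illustrative, not part of the patch): ObjectTable now
// carries a handle to the per-bucket object counter, so that updated()
// below can adjust the OBJECTS / UNFINISHED_UPLOADS / BYTES totals in the
// same database transaction as the object write itself.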
pub background: Arc, pub version_table: Arc>, + pub object_counter_table: Arc>, } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -236,10 +242,20 @@ impl TableSchema for ObjectTable { fn updated( &self, - _tx: &mut db::Transaction, + tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.object_counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? { + error!( + "Unable to update object counter: {}. Index values will be wrong!", + e + ); + } + + // 2. Spawn threads that propagate deletions to version table let version_table = self.version_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -283,6 +299,49 @@ impl TableSchema for ObjectTable { } } +impl CountedItem for Object { + const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter"; + + // Partition key = bucket id + type CP = Uuid; + // Sort key = nothing + type CS = EmptyKey; + + fn counter_partition_key(&self) -> &Uuid { + &self.bucket_id + } + fn counter_sort_key(&self) -> &EmptyKey { + &EmptyKey + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let versions = self.versions(); + let n_objects = if versions.iter().any(|v| v.is_data()) { + 1 + } else { + 0 + }; + let n_unfinished_uploads = versions + .iter() + .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_))) + .count(); + let n_bytes = versions + .iter() + .map(|v| match &v.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) + | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size, + _ => 0, + }) + .sum::(); + + vec![ + (OBJECTS, n_objects), + (UNFINISHED_UPLOADS, n_unfinished_uploads as i64), + (BYTES, n_bytes as i64), + ] + } +} + // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv // (we just want to change bucket into bucket_id by hashing it) -- cgit v1.2.3 From 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 8 Jul 2022 13:30:26 +0200 Subject: Background task manager (#332) - [x] New background worker trait - [x] Adapt all current workers to use new API - [x] Command to list currently running workers, and whether they are active, idle, or dead - [x] Error reporting - Optimizations - [x] Merkle updater: several items per iteration - [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on - scrub: - [x] have only one worker with a channel to start/pause/cancel - [x] automatic scrub - [x] ability to view and change tranquility from CLI - [x] persistence of a few pieces of info - [ ] Testing Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332 Co-authored-by: Alex Co-committed-by: Alex --- src/model/index_counter.rs | 169 +++++++++++++++++++++++++++------------------ 1 file changed, 101 insertions(+), 68 deletions(-) (limited to 'src/model') diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 36e8172b..26833390 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -2,8 +2,8 @@ use core::ops::Bound; use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; -use std::time::Duration; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; @@ -11,6 +11,7 @@ use garage_db as db; use garage_rpc::ring::Ring; use garage_rpc::system::System; +use garage_util::background::*; use garage_util::data::*; use 
garage_util::error::*; use garage_util::time::*; @@ -171,11 +172,13 @@ impl IndexCounter { ), }); - let this2 = this.clone(); - background.spawn_worker( - format!("{} index counter propagator", T::COUNTER_TABLE_NAME), - move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit), - ); + background.spawn_worker(IndexPropagatorWorker { + index_counter: this.clone(), + propagate_rx, + buf: HashMap::new(), + errors: 0, + }); + this } @@ -239,68 +242,6 @@ impl IndexCounter { Ok(()) } - async fn propagate_loop( - self: Arc, - mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>, - must_exit: watch::Receiver, - ) { - // This loop batches updates to counters to be sent all at once. - // They are sent once the propagate_rx channel has been emptied (or is closed). - let mut buf = HashMap::new(); - let mut errors = 0; - - loop { - let (ent, closed) = match propagate_rx.try_recv() { - Ok(ent) => (Some(ent), false), - Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => { - match propagate_rx.recv().await { - Some(ent) => (Some(ent), false), - None => (None, true), - } - } - Err(mpsc::error::TryRecvError::Empty) => (None, false), - Err(mpsc::error::TryRecvError::Disconnected) => (None, true), - }; - - if let Some((pk, sk, counters)) = ent { - let tree_key = self.table.data.tree_key(&pk, &sk); - let dist_entry = counters.into_counter_entry(self.this_node); - match buf.entry(tree_key) { - hash_map::Entry::Vacant(e) => { - e.insert(dist_entry); - } - hash_map::Entry::Occupied(mut e) => { - e.get_mut().merge(&dist_entry); - } - } - // As long as we can add entries, loop back and add them to batch - // before sending batch to other nodes - continue; - } - - if !buf.is_empty() { - let entries = buf.iter().map(|(_k, v)| v); - if let Err(e) = self.table.insert_many(entries).await { - errors += 1; - if errors >= 2 && *must_exit.borrow() { - error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e); - break; - } - warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors); - tokio::time::sleep(Duration::from_secs(5)).await; - continue; - } - - buf.clear(); - errors = 0; - } - - if closed || *must_exit.borrow() { - break; - } - } - } - pub fn offline_recount_all( &self, counted_table: &Arc>, @@ -437,6 +378,98 @@ impl IndexCounter { } } +struct IndexPropagatorWorker { + index_counter: Arc>, + propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>, + + buf: HashMap, CounterEntry>, + errors: usize, +} + +impl IndexPropagatorWorker { + fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry) { + let tree_key = self.index_counter.table.data.tree_key(&pk, &sk); + let dist_entry = counters.into_counter_entry(self.index_counter.this_node); + match self.buf.entry(tree_key) { + hash_map::Entry::Vacant(e) => { + e.insert(dist_entry); + } + hash_map::Entry::Occupied(mut e) => { + e.get_mut().merge(&dist_entry); + } + } + } +} + +#[async_trait] +impl Worker for IndexPropagatorWorker { + fn name(&self) -> String { + format!("{} index counter propagator", T::COUNTER_TABLE_NAME) + } + + fn info(&self) -> Option { + if !self.buf.is_empty() { + Some(format!("{} items in queue", self.buf.len())) + } else { + None + } + } + + async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result { + // This loop batches updates to counters to be sent all at once. 
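+ // Each iteration first drains everything currently queued on the channel into `buf`, merging updates that target the same counter key (see `add_ent` above).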
+ // They are sent once the propagate_rx channel has been emptied (or is closed). + let closed = loop { + match self.propagate_rx.try_recv() { + Ok((pk, sk, counters)) => { + self.add_ent(pk, sk, counters); + } + Err(mpsc::error::TryRecvError::Empty) => break false, + Err(mpsc::error::TryRecvError::Disconnected) => break true, + } + }; + + if !self.buf.is_empty() { + let entries_k = self.buf.keys().take(100).cloned().collect::>(); + let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap()); + if let Err(e) = self.index_counter.table.insert_many(entries).await { + self.errors += 1; + if self.errors >= 2 && *must_exit.borrow() { + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); + return Ok(WorkerState::Done); + } + // Propagate error up to worker manager, it will log it, increment a counter, + // and sleep for a certain delay (with exponential backoff), waiting for + // things to go back to normal + return Err(e); + } else { + for k in entries_k { + self.buf.remove(&k); + } + self.errors = 0; + } + + return Ok(WorkerState::Busy); + } else if closed { + return Ok(WorkerState::Done); + } else { + return Ok(WorkerState::Idle); + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + match self.propagate_rx.recv().await { + Some((pk, sk, counters)) => { + self.add_ent(pk, sk, counters); + WorkerState::Busy + } + None => match self.buf.is_empty() { + false => WorkerState::Busy, + true => WorkerState::Done, + }, + } + } +} + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] struct LocalCounterEntry { pk: T::CP, -- cgit v1.2.3 From 2f111e6b3d772b10c8ed6279ce0c82d22852afd1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 18 Jul 2022 18:40:57 +0200 Subject: Performance improvements: - reduce contention on mutation_lock by having 256 of them - better lmdb defaults --- src/model/garage.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'src/model') diff --git a/src/model/garage.rs b/src/model/garage.rs index 15769a17..0d239df6 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -104,11 +104,16 @@ impl Garage { std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); let map_size = garage_db::lmdb_adapter::recommended_map_size(); - let db = db::lmdb_adapter::heed::EnvOpenOptions::new() - .max_dbs(100) - .map_size(map_size) - .open(&db_path) - .expect("Unable to open LMDB DB"); + use db::lmdb_adapter::heed; + let mut env_builder = heed::EnvOpenOptions::new(); + env_builder.max_dbs(100); + env_builder.max_readers(500); + env_builder.map_size(map_size); + unsafe { + env_builder.flag(heed::flags::Flags::MdbNoSync); + env_builder.flag(heed::flags::Flags::MdbNoMetaSync); + } + let db = env_builder.open(&db_path).expect("Unable to open LMDB DB"); db::lmdb_adapter::LmdbDb::init(db) } e => { -- cgit v1.2.3 From 8e7e680afe39f48fe15f365c9ef3fee57596e119 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 15:20:00 +0200 Subject: First adaptation to WIP netapp with streaming body --- src/model/Cargo.toml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index d908dc01..a97bce4d 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -40,9 +40,8 @@ futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", 
"macros", "sync", "signal", "fs"] } opentelemetry = "0.17" -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp" } -netapp = "0.4" +#netapp = "0.4" +netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } [features] k2v = [ "garage_util/k2v" ] -- cgit v1.2.3 From a35d4da721db3550a2833d8576d4283bc999e8df Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 16:45:45 +0200 Subject: update netapp to 0.5 --- src/model/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index a97bce4d..73011e0d 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -41,7 +41,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi opentelemetry = "0.17" #netapp = "0.4" -netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } [features] k2v = [ "garage_util/k2v" ] -- cgit v1.2.3 From 943d76c583f5740b1d922275a673233a27fe1693 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 2 Sep 2022 15:34:21 +0200 Subject: Ability to dynamically set resync tranquility --- src/model/garage.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'src/model') diff --git a/src/model/garage.rs b/src/model/garage.rs index 15769a17..4dd24582 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -159,7 +159,6 @@ impl Garage { &db, config.data_dir.clone(), config.compression_level, - config.block_manager_background_tranquility, data_rep_param, system.clone(), ); -- cgit v1.2.3 From 48ffaaadfc790142ed9556f5227913fa8c32d2ed Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 16:47:56 +0200 Subject: Bump versions to 0.8.0 (compatibility is broken already) --- src/model/Cargo.toml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index d908dc01..7b831538 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_model" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -15,10 +15,10 @@ path = "lib.rs" [dependencies] garage_db = { version = "0.8.0", path = "../db" } -garage_rpc = { version = "0.7.0", path = "../rpc" } -garage_table = { version = "0.7.0", path = "../table" } -garage_block = { version = "0.7.0", path = "../block" } -garage_util = { version = "0.7.0", path = "../util" } +garage_rpc = { version = "0.8.0", path = "../rpc" } +garage_table = { version = "0.8.0", path = "../table" } +garage_block = { version = "0.8.0", path = "../block" } +garage_util = { version = "0.8.0", path = "../util" } garage_model_050 = { package = "garage_model", version = "0.5.1" } async-trait = "0.1.7" -- cgit v1.2.3 From b886c75450e3ee6a7c2b0a8265d7ada20a4d9d75 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 17:09:43 +0200 Subject: Make all DB engines optional build features --- src/model/Cargo.toml | 3 +++ src/model/garage.rs | 30 ++++++++++++++++++++++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 7b831538..cb0017b2 100644 --- 
a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -46,3 +46,6 @@ netapp = "0.4" [features] k2v = [ "garage_util/k2v" ] +lmdb = [ "garage_db/lmdb" ] +sled = [ "garage_db/sled" ] +sqlite = [ "garage_db/sqlite" ] diff --git a/src/model/garage.rs b/src/model/garage.rs index 15769a17..19eecb1e 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -80,6 +80,8 @@ impl Garage { let mut db_path = config.metadata_dir.clone(); std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory"); let db = match config.db_engine.as_str() { + // ---- Sled DB ---- + #[cfg(feature = "sled")] "sled" => { db_path.push("db"); info!("Opening Sled database at: {}", db_path.display()); @@ -91,6 +93,10 @@ impl Garage { .expect("Unable to open sled DB"); db::sled_adapter::SledDb::init(db) } + #[cfg(not(feature = "sled"))] + "sled" => return Err(Error::Message("sled db not available in this build".into())), + // ---- Sqlite DB ---- + #[cfg(feature = "sqlite")] "sqlite" | "sqlite3" | "rusqlite" => { db_path.push("db.sqlite"); info!("Opening Sqlite database at: {}", db_path.display()); @@ -98,6 +104,14 @@ impl Garage { .expect("Unable to open sqlite DB"); db::sqlite_adapter::SqliteDb::init(db) } + #[cfg(not(feature = "sqlite"))] + "sqlite" | "sqlite3" | "rusqlite" => { + return Err(Error::Message( + "sqlite db not available in this build".into(), + )) + } + // ---- LMDB DB ---- + #[cfg(feature = "lmdb")] "lmdb" | "heed" => { db_path.push("db.lmdb"); info!("Opening LMDB database at: {}", db_path.display()); @@ -111,10 +125,22 @@ impl Garage { .expect("Unable to open LMDB DB"); db::lmdb_adapter::LmdbDb::init(db) } + #[cfg(not(feature = "lmdb"))] + "lmdb" | "heed" => return Err(Error::Message("lmdb db not available in this build".into())), + // ---- Unavailable DB engine ---- e => { return Err(Error::Message(format!( - "Unsupported DB engine: {} (options: sled, sqlite, lmdb)", - e + "Unsupported DB engine: {} (options: {})", + e, + vec![ + #[cfg(feature = "sled")] + "sled", + #[cfg(feature = "sqlite")] + "sqlite", + #[cfg(feature = "lmdb")] + "lmdb", + ] + .join(", ") ))); } }; -- cgit v1.2.3 From 0f5689c16920479066277db2880e2ca87f7ca602 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 17:52:50 +0200 Subject: Include code from v0.5.1 directly to remove dependencies --- src/model/Cargo.toml | 1 - src/model/key_table.rs | 2 +- src/model/lib.rs | 3 + src/model/migrate.rs | 2 +- src/model/prev/mod.rs | 1 + src/model/prev/v051/bucket_table.rs | 63 +++++++++++++++ src/model/prev/v051/key_table.rs | 51 ++++++++++++ src/model/prev/v051/mod.rs | 4 + src/model/prev/v051/object_table.rs | 150 +++++++++++++++++++++++++++++++++++ src/model/prev/v051/version_table.rs | 79 ++++++++++++++++++ src/model/s3/object_table.rs | 2 +- src/model/s3/version_table.rs | 2 +- 12 files changed, 355 insertions(+), 5 deletions(-) create mode 100644 src/model/prev/mod.rs create mode 100644 src/model/prev/v051/bucket_table.rs create mode 100644 src/model/prev/v051/key_table.rs create mode 100644 src/model/prev/v051/mod.rs create mode 100644 src/model/prev/v051/object_table.rs create mode 100644 src/model/prev/v051/version_table.rs (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index cb0017b2..bbcfe89c 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -19,7 +19,6 @@ garage_rpc = { version = "0.8.0", path = "../rpc" } garage_table = { version = "0.8.0", path = "../table" } garage_block = { version = "0.8.0", path = "../block" } garage_util = { version = 
"0.8.0", path = "../util" } -garage_model_050 = { package = "garage_model", version = "0.5.1" } async-trait = "0.1.7" arc-swap = "1.0" diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 330e83f0..7288f6e4 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -6,7 +6,7 @@ use garage_util::data::*; use crate::permission::BucketKeyPerm; -use garage_model_050::key_table as old; +use crate::prev::v051::key_table as old; /// An api key #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] diff --git a/src/model/lib.rs b/src/model/lib.rs index 7c9d9270..4f20ea46 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -1,6 +1,9 @@ #[macro_use] extern crate tracing; +// For migration from previous versions +pub(crate) mod prev; + pub mod permission; pub mod index_counter; diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 5fc67069..cd6ad26a 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -5,7 +5,7 @@ use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; -use garage_model_050::bucket_table as old_bucket; +use crate::prev::v051::bucket_table as old_bucket; use crate::bucket_alias_table::*; use crate::bucket_table::*; diff --git a/src/model/prev/mod.rs b/src/model/prev/mod.rs new file mode 100644 index 00000000..68bb1502 --- /dev/null +++ b/src/model/prev/mod.rs @@ -0,0 +1 @@ +pub(crate) mod v051; diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs new file mode 100644 index 00000000..0c52b6ea --- /dev/null +++ b/src/model/prev/v051/bucket_table.rs @@ -0,0 +1,63 @@ +use serde::{Deserialize, Serialize}; + +use garage_table::crdt::Crdt; +use garage_table::*; + +use super::key_table::PermissionSet; + +/// A bucket is a collection of objects +/// +/// Its parameters are not directly accessible as: +/// - It must be possible to merge paramaters, hence the use of a LWW CRDT. +/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. 
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Bucket { + /// Name of the bucket + pub name: String, + /// State, and configuration if not deleted, of the bucket + pub state: crdt::Lww, +} + +/// State of a bucket +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum BucketState { + /// The bucket is deleted + Deleted, + /// The bucket exists + Present(BucketParams), +} + +impl Crdt for BucketState { + fn merge(&mut self, o: &Self) { + match o { + BucketState::Deleted => *self = BucketState::Deleted, + BucketState::Present(other_params) => { + if let BucketState::Present(params) = self { + params.merge(other_params); + } + } + } + } +} + +/// Configuration for a bucket +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct BucketParams { + /// Map of keys with access to the bucket, and what kind of access they give + pub authorized_keys: crdt::LwwMap, + /// Is the bucket served over HTTP + pub website: crdt::Lww, +} + +impl Crdt for BucketParams { + fn merge(&mut self, o: &Self) { + self.authorized_keys.merge(&o.authorized_keys); + self.website.merge(&o.website); + } +} + +impl Crdt for Bucket { + fn merge(&mut self, other: &Self) { + self.state.merge(&other.state); + } +} diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs new file mode 100644 index 00000000..dab6caa7 --- /dev/null +++ b/src/model/prev/v051/key_table.rs @@ -0,0 +1,51 @@ +use serde::{Deserialize, Serialize}; + +use garage_table::crdt::*; +use garage_table::*; + +/// An api key +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Key { + /// The id of the key (immutable), used as partition key + pub key_id: String, + + /// The associated secret_key + pub secret_key: String, + + /// Name for the key + pub name: crdt::Lww, + + /// Is the key deleted + pub deleted: crdt::Bool, + + /// Buckets in which the key is authorized. 
Empty if `Key` is deleted + // CRDT interaction: deleted implies authorized_buckets is empty + pub authorized_buckets: crdt::LwwMap, +} + +/// Permission given to a key in a bucket +#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct PermissionSet { + /// The key can be used to read the bucket + pub allow_read: bool, + /// The key can be used to write in the bucket + pub allow_write: bool, +} + +impl AutoCrdt for PermissionSet { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for Key { + fn merge(&mut self, other: &Self) { + self.name.merge(&other.name); + self.deleted.merge(&other.deleted); + + if self.deleted.get() { + self.authorized_buckets.clear(); + } else { + self.authorized_buckets.merge(&other.authorized_buckets); + } + } +} + diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs new file mode 100644 index 00000000..7a954752 --- /dev/null +++ b/src/model/prev/v051/mod.rs @@ -0,0 +1,4 @@ +pub(crate) mod bucket_table; +pub(crate) mod key_table; +pub(crate) mod object_table; +pub(crate) mod version_table; diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs new file mode 100644 index 00000000..fe35d683 --- /dev/null +++ b/src/model/prev/v051/object_table.rs @@ -0,0 +1,150 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +use garage_util::data::*; + +use garage_table::crdt::*; + +/// An object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket: String, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currently stored versions of the object + versions: Vec, +} + +impl Object { + /// Get a list of currently stored versions of `Object` + pub fn versions(&self) -> &[ObjectVersion] { + &self.versions[..] 
+ } +} + +/// Information about a version of an object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, +} + +/// State of an object version +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionState { + /// The version is being received + Uploading(ObjectVersionHeaders), + /// The version is fully received + Complete(ObjectVersionData), + /// The uploaded version contained errors or the upload was explicitly aborted + Aborted, +} + +impl Crdt for ObjectVersionState { + fn merge(&mut self, other: &Self) { + use ObjectVersionState::*; + match other { + Aborted => { + *self = Aborted; + } + Complete(b) => match self { + Aborted => {} + Complete(a) => { + a.merge(b); + } + Uploading(_) => { + *self = Complete(b.clone()); + } + }, + Uploading(_) => {} + } + } +} + +/// Data stored in object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such + DeleteMarker, + /// The object is short, it's stored inlined + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), + /// The object is not short; the hash of its first block is stored here, and the hashes of + /// the next segments are stored in the version table + FirstBlock(ObjectVersionMeta, Hash), +} + +impl AutoCrdt for ObjectVersionData { + const WARN_IF_DIFFERENT: bool = true; +} + +/// Metadata about the object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionMeta { + /// Headers to send to the client + pub headers: ObjectVersionHeaders, + /// Size of the object + pub size: u64, + /// etag of the object + pub etag: String, +} + +/// Additional headers for an object +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionHeaders { + /// Content type of the object + pub content_type: String, + /// Any other http headers to send + pub other: BTreeMap, +} + +impl ObjectVersion { + fn cmp_key(&self) -> (u64, Uuid) { + (self.timestamp, self.uuid) + } + + /// Is the object version completely received + pub fn is_complete(&self) -> bool { + matches!(self.state, ObjectVersionState::Complete(_)) + } +} + +impl Crdt for Object { + fn merge(&mut self, other: &Self) { + // Merge versions from other into here + for other_v in other.versions.iter() { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) + { + Ok(i) => { + self.versions[i].state.merge(&other_v.state); + } + Err(i) => { + self.versions.insert(i, other_v.clone()); + } + } + } + + // Remove versions which are obsolete, i.e. those that come + // before the last version which .is_complete(). 
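+ // A complete version fully supersedes every earlier version, so only the + // suffix starting at the last complete version needs to be kept.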
+ let last_complete = self + .versions + .iter() + .enumerate() + .rev() + .find(|(_, v)| v.is_complete()) + .map(|(vi, _)| vi); + + if let Some(last_vi) = last_complete { + self.versions = self.versions.drain(last_vi..).collect::>(); + } + } +} + diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs new file mode 100644 index 00000000..1e658f91 --- /dev/null +++ b/src/model/prev/v051/version_table.rs @@ -0,0 +1,79 @@ +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::*; + +/// A version of an object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map, + /// Etag of each part in case of a multipart upload, empty otherwise + pub parts_etags: crdt::Map, + + // Back link to bucket+key so that we can figure out if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket: String, + /// Key in which the related object is stored + pub key: String, +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlockKey { + /// Number of the part + pub part_number: u64, + /// Offset of this sub-segment in its part + pub offset: u64, +} + +impl Ord for VersionBlockKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + .then(self.offset.cmp(&other.offset)) + } +} + +impl PartialOrd for VersionBlockKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// Information about a single block +#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlock { + /// Blake2 sum of the block + pub hash: Hash, + /// Size of the block + pub size: u64, +} + +impl AutoCrdt for VersionBlock { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for Version { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + + if self.deleted.get() { + self.blocks.clear(); + self.parts_etags.clear(); + } else { + self.blocks.merge(&other.blocks); + self.parts_etags.merge(&other.parts_etags); + } + } +} diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index a3914c36..a151f1b1 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -14,7 +14,7 @@ use garage_table::*; use crate::index_counter::*; use crate::s3::version_table::*; -use garage_model_050::object_table as old; +use crate::prev::v051::object_table as old; pub const OBJECTS: &str = "objects"; pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; pub const BYTES: &str = "bytes"; diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 881c245a..b545e66a 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -12,7 +12,7 @@ use garage_table::*; use crate::s3::block_ref_table::*; -use garage_model_050::version_table as old; +use crate::prev::v051::version_table as old; /// A version of an object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -- cgit v1.2.3 From 6f02c36a89d93b04944a3f0882b6f6b703d9c012 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 
17:59:41 +0200 Subject: cargo fmt --- src/model/prev/v051/key_table.rs | 1 - src/model/prev/v051/object_table.rs | 1 - 2 files changed, 2 deletions(-) (limited to 'src/model') diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs index dab6caa7..fee24741 100644 --- a/src/model/prev/v051/key_table.rs +++ b/src/model/prev/v051/key_table.rs @@ -48,4 +48,3 @@ impl Crdt for Key { } } } - diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs index fe35d683..cb59b309 100644 --- a/src/model/prev/v051/object_table.rs +++ b/src/model/prev/v051/object_table.rs @@ -147,4 +147,3 @@ impl Crdt for Object { } } } - -- cgit v1.2.3 From db61f41030678c5756c844c8aa41a210c658769e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Sep 2022 11:59:56 +0200 Subject: Move GIT_VERSION injection later in build chain to reduce build times --- src/model/Cargo.toml | 1 + src/model/lib.rs | 1 + src/model/version.rs | 7 +++++++ 3 files changed, 9 insertions(+) create mode 100644 src/model/version.rs (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index bbcfe89c..c41d3f16 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -24,6 +24,7 @@ async-trait = "0.1.7" arc-swap = "1.0" blake2 = "0.9" err-derive = "0.3" +git-version = "0.3.4" hex = "0.4" base64 = "0.13" tracing = "0.1.30" diff --git a/src/model/lib.rs b/src/model/lib.rs index 4f20ea46..43db01c5 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -19,3 +19,4 @@ pub mod s3; pub mod garage; pub mod helper; pub mod migrate; +pub mod version; diff --git a/src/model/version.rs b/src/model/version.rs new file mode 100644 index 00000000..cdb3ea62 --- /dev/null +++ b/src/model/version.rs @@ -0,0 +1,7 @@ +pub fn garage_version() -> &'static str { + option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( + prefix = "git:", + cargo_prefix = "cargo:", + fallback = "unknown" + )) +} -- cgit v1.2.3 From 28d86e76021bed674ca78684b9522cfb664a8ae2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Sep 2022 17:05:21 +0200 Subject: Report build features in garage --help --- src/model/Cargo.toml | 1 + src/model/version.rs | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index c41d3f16..101c97d3 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -26,6 +26,7 @@ blake2 = "0.9" err-derive = "0.3" git-version = "0.3.4" hex = "0.4" +lazy_static = "1.4" base64 = "0.13" tracing = "0.1.30" rand = "0.8" diff --git a/src/model/version.rs b/src/model/version.rs index cdb3ea62..af6aa809 100644 --- a/src/model/version.rs +++ b/src/model/version.rs @@ -1,3 +1,11 @@ +use std::sync::Arc; + +use arc_swap::ArcSwapOption; + +lazy_static::lazy_static! 
{ + static ref FEATURES: ArcSwapOption<&'static [&'static str]> = ArcSwapOption::new(None); +} + pub fn garage_version() -> &'static str { option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( prefix = "git:", @@ -5,3 +13,11 @@ pub fn garage_version() -> &'static str { fallback = "unknown" )) } + +pub fn garage_features() -> Option<&'static [&'static str]> { + FEATURES.load().as_ref().map(|f| &f[..]) +} + +pub fn init_features(features: &'static [&'static str]) { + FEATURES.store(Some(Arc::new(features))); +} -- cgit v1.2.3 From f310fce34b0273f9f75e7a6ea665f51003a1f795 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Sep 2022 18:30:15 +0200 Subject: Inject GIT_VERSION even later --- src/model/version.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) (limited to 'src/model') diff --git a/src/model/version.rs b/src/model/version.rs index af6aa809..b515dccc 100644 --- a/src/model/version.rs +++ b/src/model/version.rs @@ -1,23 +1,28 @@ use std::sync::Arc; -use arc_swap::ArcSwapOption; +use arc_swap::{ArcSwap, ArcSwapOption}; lazy_static::lazy_static! { + static ref VERSION: ArcSwap<&'static str> = ArcSwap::new(Arc::new(git_version::git_version!( + prefix = "git:", + cargo_prefix = "cargo:", + fallback = "unknown" + ))); static ref FEATURES: ArcSwapOption<&'static [&'static str]> = ArcSwapOption::new(None); } pub fn garage_version() -> &'static str { - option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( - prefix = "git:", - cargo_prefix = "cargo:", - fallback = "unknown" - )) + &VERSION.load() } pub fn garage_features() -> Option<&'static [&'static str]> { FEATURES.load().as_ref().map(|f| &f[..]) } +pub fn init_version(version: &'static str) { + VERSION.store(Arc::new(version)); +} + pub fn init_features(features: &'static [&'static str]) { FEATURES.store(Some(Arc::new(features))); } -- cgit v1.2.3 From ceb1f0229a9c8b9f8255b4a4c70272627f0c34d7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Sep 2022 18:36:46 +0200 Subject: Move version back into util --- src/model/Cargo.toml | 2 -- src/model/lib.rs | 1 - src/model/version.rs | 28 ---------------------------- 3 files changed, 31 deletions(-) delete mode 100644 src/model/version.rs (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 101c97d3..bbcfe89c 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -24,9 +24,7 @@ async-trait = "0.1.7" arc-swap = "1.0" blake2 = "0.9" err-derive = "0.3" -git-version = "0.3.4" hex = "0.4" -lazy_static = "1.4" base64 = "0.13" tracing = "0.1.30" rand = "0.8" diff --git a/src/model/lib.rs b/src/model/lib.rs index 43db01c5..4f20ea46 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -19,4 +19,3 @@ pub mod s3; pub mod garage; pub mod helper; pub mod migrate; -pub mod version; diff --git a/src/model/version.rs b/src/model/version.rs deleted file mode 100644 index b515dccc..00000000 --- a/src/model/version.rs +++ /dev/null @@ -1,28 +0,0 @@ -use std::sync::Arc; - -use arc_swap::{ArcSwap, ArcSwapOption}; - -lazy_static::lazy_static! 
{ - static ref VERSION: ArcSwap<&'static str> = ArcSwap::new(Arc::new(git_version::git_version!( - prefix = "git:", - cargo_prefix = "cargo:", - fallback = "unknown" - ))); - static ref FEATURES: ArcSwapOption<&'static [&'static str]> = ArcSwapOption::new(None); -} - -pub fn garage_version() -> &'static str { - &VERSION.load() -} - -pub fn garage_features() -> Option<&'static [&'static str]> { - FEATURES.load().as_ref().map(|f| &f[..]) -} - -pub fn init_version(version: &'static str) { - VERSION.store(Arc::new(version)); -} - -pub fn init_features(features: &'static [&'static str]) { - FEATURES.store(Some(Arc::new(features))); -} -- cgit v1.2.3 From 28a4af73ca8c0f82314157939fc98c46f338e84a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 13:11:44 +0200 Subject: Use netapp 0.5 published from crates.io --- src/model/Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index d6e2adfe..2c2e2bfe 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -39,8 +39,7 @@ futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } opentelemetry = "0.17" -#netapp = "0.4" -netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = "0.5" [features] k2v = [ "garage_util/k2v" ] -- cgit v1.2.3 From 07febd3ecd0d491ed01b7ca43846aa43e423b2a1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 15:57:27 +0200 Subject: Ensure data dir is created immediately when Garage starts (fix #349) --- src/model/garage.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'src/model') diff --git a/src/model/garage.rs b/src/model/garage.rs index 66c359e7..ec1ec956 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -6,7 +6,7 @@ use garage_db as db; use garage_util::background::*; use garage_util::config::*; -use garage_util::error::Error; +use garage_util::error::*; use garage_rpc::system::System; @@ -76,9 +76,14 @@ pub struct GarageK2V { impl Garage { /// Create and run garage pub fn new(config: Config, background: Arc) -> Result, Error> { + // Create meta dir and data dir if they don't exist already + std::fs::create_dir_all(&config.metadata_dir) + .ok_or_message("Unable to create Garage metadata directory")?; + std::fs::create_dir_all(&config.data_dir) + .ok_or_message("Unable to create Garage data directory")?; + info!("Opening database..."); let mut db_path = config.metadata_dir.clone(); - std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory"); let db = match config.db_engine.as_str() { // ---- Sled DB ---- #[cfg(feature = "sled")] -- cgit v1.2.3 From 38be811b1cd20d9223b481c0ea91cc7e3ee795dc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 16:08:00 +0200 Subject: Fix clippy lint that says we should implement Eq --- src/model/bucket_alias_table.rs | 2 +- src/model/bucket_table.rs | 4 ++-- src/model/index_counter.rs | 2 +- src/model/key_table.rs | 4 ++-- src/model/prev/v051/bucket_table.rs | 6 +++--- src/model/prev/v051/key_table.rs | 2 +- src/model/prev/v051/object_table.rs | 6 +++--- src/model/prev/v051/version_table.rs | 2 +- src/model/s3/block_ref_table.rs | 2 +- src/model/s3/object_table.rs | 6 +++--- src/model/s3/version_table.rs | 2 +- 11 files changed, 19 insertions(+), 19 deletions(-) (limited to 'src/model') diff 
--git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs index fce03d04..fcd1536e 100644 --- a/src/model/bucket_alias_table.rs +++ b/src/model/bucket_alias_table.rs @@ -7,7 +7,7 @@ use garage_table::*; /// The bucket alias table holds the names given to buckets /// in the global namespace. -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct BucketAlias { name: String, pub state: crdt::Lww>, diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 130eb6a6..7be42702 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -12,7 +12,7 @@ use crate::permission::BucketKeyPerm; /// Its parameters are not directly accessible as: /// - It must be possible to merge parameters, hence the use of a LWW CRDT. /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { /// ID of the bucket pub id: Uuid, @@ -21,7 +21,7 @@ pub struct Bucket { } /// Configuration for a bucket -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct BucketParams { /// Bucket's creation date pub creation_date: u64, diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 26833390..e6394f0c 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -81,7 +81,7 @@ impl CounterEntry { } /// A counter entry in the global table -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct CounterValue { pub node_values: BTreeMap, } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 7288f6e4..9d2fc783 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -9,7 +9,7 @@ use crate::permission::BucketKeyPerm; use crate::prev::v051::key_table as old; /// An api key -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Key { /// The id of the key (immutable), used as partition key pub key_id: String, @@ -19,7 +19,7 @@ pub struct Key { } /// Configuration for a key -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct KeyParams { /// The associated secret_key (immutable) pub secret_key: String, diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs index 0c52b6ea..628a49dd 100644 --- a/src/model/prev/v051/bucket_table.rs +++ b/src/model/prev/v051/bucket_table.rs @@ -10,7 +10,7 @@ use super::key_table::PermissionSet; /// Its parameters are not directly accessible as: /// - It must be possible to merge parameters, hence the use of a LWW CRDT. /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. 
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { /// Name of the bucket pub name: String, @@ -19,7 +19,7 @@ pub struct Bucket { } /// State of a bucket -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum BucketState { /// The bucket is deleted Deleted, @@ -41,7 +41,7 @@ impl Crdt for BucketState { } /// Configuration for a bucket -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct BucketParams { /// Map of keys with access to the bucket, and what kind of access they give pub authorized_keys: crdt::LwwMap, diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs index fee24741..37516b1c 100644 --- a/src/model/prev/v051/key_table.rs +++ b/src/model/prev/v051/key_table.rs @@ -4,7 +4,7 @@ use garage_table::crdt::*; use garage_table::*; /// An api key -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Key { /// The id of the key (immutable), used as partition key pub key_id: String, diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs index cb59b309..e79e5787 100644 --- a/src/model/prev/v051/object_table.rs +++ b/src/model/prev/v051/object_table.rs @@ -6,7 +6,7 @@ use garage_util::data::*; use garage_table::crdt::*; /// An object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Object { /// The bucket in which the object is stored, used as partition key pub bucket: String, @@ -26,7 +26,7 @@ impl Object { } /// Information about a version of an object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersion { /// Id of the version pub uuid: Uuid, @@ -37,7 +37,7 @@ pub struct ObjectVersion { } /// State of an object version -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionState { /// The version is being received Uploading(ObjectVersionHeaders), diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs index 1e658f91..c11c62d5 100644 --- a/src/model/prev/v051/version_table.rs +++ b/src/model/prev/v051/version_table.rs @@ -6,7 +6,7 @@ use garage_table::crdt::*; use garage_table::*; /// A version of an object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Version { /// UUID of the version, used as partition key pub uuid: Uuid, diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index 9589b4aa..c7017409 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -10,7 +10,7 @@ use garage_table::*; use garage_block::manager::*; -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct BlockRef { /// Hash (blake2 sum) of the block, used as partition key pub block: Hash, diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index a151f1b1..26ff57f6 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -21,7 +21,7 @@ pub const UNFINISHED_UPLOADS: &str 
= "unfinished_uploads"; pub const BYTES: &str = "bytes"; /// An object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Object { /// The bucket in which the object is stored, used as partition key pub bucket_id: Uuid, @@ -70,7 +70,7 @@ impl Object { } /// Informations about a version of an object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersion { /// Id of the version pub uuid: Uuid, @@ -81,7 +81,7 @@ pub struct ObjectVersion { } /// State of an object version -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionState { /// The version is being received Uploading(ObjectVersionHeaders), diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index b545e66a..6bc2ecd1 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -15,7 +15,7 @@ use crate::s3::block_ref_table::*; use crate::prev::v051::version_table as old; /// A version of an object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Version { /// UUID of the version, used as partition key pub uuid: Uuid, -- cgit v1.2.3 From ab722cb40f5aacf661a280b7eb025acd3aefc1bb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 16:22:23 +0200 Subject: Add checks on replication_factor of layouts we use (fix #363, fix #364) --- src/model/garage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/garage.rs b/src/model/garage.rs index ec1ec956..75012952 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -169,7 +169,7 @@ impl Garage { background.clone(), replication_mode.replication_factor(), &config, - ); + )?; let data_rep_param = TableShardedReplication { system: system.clone(), -- cgit v1.2.3 From 56592e18538b379ccaaa7b7c1990a599ac83b191 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 19 Sep 2022 20:12:19 +0200 Subject: RPC performance changes - configurable ping timeout - single, much higher, configurable RPC timeout - no more concurrency semaphore --- src/model/k2v/rpc.rs | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) (limited to 'src/model') diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 90101d0f..a74df277 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -23,7 +23,6 @@ use garage_rpc::system::System; use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; -use garage_table::table::TABLE_RPC_TIMEOUT; use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; @@ -117,7 +116,6 @@ impl K2VRpcHandler { }), RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(1) - .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; @@ -169,7 +167,6 @@ impl K2VRpcHandler { K2VRpc::InsertManyItems(items), RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(1) - .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; @@ -205,22 +202,23 @@ impl K2VRpcHandler { .replication .write_nodes(&poll_key.partition.hash()); - let resps = self - .system - .rpc - .try_call_many( - &self.endpoint, - &nodes[..], - K2VRpc::PollItem { - key: poll_key, - causal_context, - timeout_msec, - }, - 
RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.item_table.data.replication.read_quorum()) - .with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT), - ) - .await?; + let rpc = self.system.rpc.try_call_many( + &self.endpoint, + &nodes[..], + K2VRpc::PollItem { + key: poll_key, + causal_context, + timeout_msec, + }, + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.item_table.data.replication.read_quorum()) + .without_timeout(), + ); + let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let resps = select! { + r = rpc => r?, + _ = tokio::time::sleep(timeout_duration) => return Ok(None), + }; let mut resp: Option = None; for v in resps { -- cgit v1.2.3 From ded444f6c96f8ab991e762f65760b42e4d64246c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 20 Sep 2022 16:01:41 +0200 Subject: Ability to have custom timeouts in request strategy (not used) --- src/model/k2v/causality.rs | 2 +- src/model/k2v/item_table.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'src/model') diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 8c76a32b..9a692870 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -15,7 +15,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId { u64::from_be_bytes(tmp) } -#[derive(PartialEq, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] pub struct CausalContext { pub vector_clock: BTreeMap, } diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index baa1db4b..7860cb17 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -17,7 +17,7 @@ pub const CONFLICTS: &str = "conflicts"; pub const VALUES: &str = "values"; pub const BYTES: &str = "bytes"; -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct K2VItem { pub partition: K2VItemPartition, pub sort_key: String, @@ -25,19 +25,19 @@ pub struct K2VItem { items: BTreeMap, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)] pub struct K2VItemPartition { pub bucket_id: Uuid, pub partition_key: String, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] struct DvvsEntry { t_discard: u64, values: Vec<(u64, DvvsValue)>, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum DvvsValue { Value(#[serde(with = "serde_bytes")] Vec), Deleted, -- cgit v1.2.3
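Taken together, these patches settle on the background worker API introduced in commit 4f38cadf ("Background task manager"). As a point of reference, the sketch below shows what a minimal implementation of that trait might look like, mirroring the method signatures used by IndexPropagatorWorker above. It is an illustration only: `QueueWorker` is a hypothetical type, and the generic parameters (`watch::Receiver<bool>`, `Result<WorkerState, Error>`, `Option<String>`), which the patch formatting above stripped, are restored here by assumption from the surrounding code.

use async_trait::async_trait;
use tokio::sync::watch;

use garage_util::background::*; // Worker trait and WorkerState, as imported in index_counter.rs
use garage_util::error::*; // Error type returned by work()

// Hypothetical worker that drains a simple in-memory queue.
struct QueueWorker {
    queue: Vec<String>,
}

#[async_trait]
impl Worker for QueueWorker {
    fn name(&self) -> String {
        "example queue worker".to_string()
    }

    fn info(&self) -> Option<String> {
        // Surfaced by the CLI command that lists running workers.
        if !self.queue.is_empty() {
            Some(format!("{} items in queue", self.queue.len()))
        } else {
            None
        }
    }

    async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
        // Process one item per call. Returning Err lets the worker manager
        // log the error and retry with exponential backoff; returning
        // WorkerState::Done terminates the worker.
        match self.queue.pop() {
            Some(_item) => Ok(WorkerState::Busy),
            None => Ok(WorkerState::Idle),
        }
    }

    async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
        // Called while Idle: block until there may be new work.
        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
        WorkerState::Busy
    }
}

Such a worker would then be registered with `background.spawn_worker(QueueWorker { queue })`, as `IndexPropagatorWorker` is spawned in the patch above.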