From 5768bf362262f78376af14517c4921941986192e Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 10 May 2022 13:16:57 +0200 Subject: First implementation of K2V (#293) **Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md) - [x] Specify the structure of K2V triples - [x] Specify the DVVS format used for causality detection - [x] Specify the K2V index (just a counter of number of values per partition key) - [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem - [x] Specify index endpoint: ReadIndex - [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch - [x] Move to JSON objects instead of tuples - [x] Specify endpoints for polling for updates on single values (PollItem) **Implementation:** - [x] Table for K2V items, causal contexts - [x] Indexing mechanism and table for K2V index - [x] Make API handlers a bit more generic - [x] K2V API endpoint - [x] K2V API router - [x] ReadItem - [x] InsertItem - [x] DeleteItem - [x] PollItem - [x] ReadIndex - [x] InsertBatch - [x] ReadBatch - [x] DeleteBatch **Testing:** - [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values) - [x] Actual tests: - [x] Adapt testing framework - [x] Simple test with InsertItem + ReadItem - [x] Test with several Insert/Read/DeleteItem + ReadIndex - [x] Test all combinations of return formats for ReadItem - [x] Test with ReadBatch, InsertBatch, DeleteBatch - [x] Test with PollItem - [x] Test error codes - [ ] Fix most broken stuff - [x] test PollItem broken randomly - [x] when invalid causality tokens are given, errors should be 4xx not 5xx **Improvements:** - [x] Descending range queries - [x] Specify - [x] Implement - [x] Add test - [x] Batch updates to index counter - [x] Put K2V behind `k2v` feature flag Co-authored-by: Alex Auvolat Reviewed-on: 
https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293 Co-authored-by: Alex Co-committed-by: Alex --- src/model/s3/block_ref_table.rs | 74 +++++++++ src/model/s3/mod.rs | 3 + src/model/s3/object_table.rs | 337 ++++++++++++++++++++++++++++++++++++++++ src/model/s3/version_table.rs | 207 ++++++++++++++++++++++++ 4 files changed, 621 insertions(+) create mode 100644 src/model/s3/block_ref_table.rs create mode 100644 src/model/s3/mod.rs create mode 100644 src/model/s3/object_table.rs create mode 100644 src/model/s3/version_table.rs (limited to 'src/model/s3') diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs new file mode 100644 index 00000000..9b3991bf --- /dev/null +++ b/src/model/s3/block_ref_table.rs @@ -0,0 +1,74 @@ +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use garage_util::data::*; + +use garage_table::crdt::Crdt; +use garage_table::*; + +use garage_block::manager::*; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct BlockRef { + /// Hash (blake2 sum) of the block, used as partition key + pub block: Hash, + + /// Id of the Version for the object containing this block, used as sorting key + pub version: Uuid, + + // Keep track of deleted status + /// Is the Version that contains this block deleted + pub deleted: crdt::Bool, +} + +impl Entry for BlockRef { + fn partition_key(&self) -> &Hash { + &self.block + } + fn sort_key(&self) -> &Uuid { + &self.version + } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} + +impl Crdt for BlockRef { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + } +} + +pub struct BlockRefTable { + pub block_manager: Arc, +} + +impl TableSchema for BlockRefTable { + const TABLE_NAME: &'static str = "block_ref"; + + type P = Hash; + type S = Uuid; + type E = BlockRef; + type Filter = DeletedFilter; + + fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + #[allow(clippy::or_fun_call)] + let block = 
&old.or(new).unwrap().block; + let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false); + let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false); + if is_after && !was_before { + if let Err(e) = self.block_manager.block_incref(block) { + warn!("block_incref failed for block {:?}: {}", block, e); + } + } + if was_before && !is_after { + if let Err(e) = self.block_manager.block_decref(block) { + warn!("block_decref failed for block {:?}: {}", block, e); + } + } + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + filter.apply(entry.deleted.get()) + } +} diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs new file mode 100644 index 00000000..4e94337d --- /dev/null +++ b/src/model/s3/mod.rs @@ -0,0 +1,3 @@ +pub mod block_ref_table; +pub mod object_table; +pub mod version_table; diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs new file mode 100644 index 00000000..3d9a89f7 --- /dev/null +++ b/src/model/s3/object_table.rs @@ -0,0 +1,337 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::sync::Arc; + +use garage_util::background::BackgroundRunner; +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +use crate::s3::version_table::*; + +use garage_model_050::object_table as old; + +/// An object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket_id: Uuid, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + versions: Vec, +} + +impl Object { + /// Initialize an Object struct from parts + pub fn new(bucket_id: Uuid, key: String, versions: Vec) -> Self { + let mut ret = Self { + bucket_id, + key, + versions: vec![], + }; + for v in versions { + ret.add_version(v) + 
.expect("Twice the same ObjectVersion in Object constructor"); + } + ret + } + + /// Adds a version if it wasn't already present + #[allow(clippy::result_unit_err)] + pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&new.cmp_key())) + { + Err(i) => { + self.versions.insert(i, new); + Ok(()) + } + Ok(_) => Err(()), + } + } + + /// Get a list of currently stored versions of `Object` + pub fn versions(&self) -> &[ObjectVersion] { + &self.versions[..] + } +} + +/// Informations about a version of an object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, +} + +/// State of an object version +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionState { + /// The version is being received + Uploading(ObjectVersionHeaders), + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, +} + +impl Crdt for ObjectVersionState { + fn merge(&mut self, other: &Self) { + use ObjectVersionState::*; + match other { + Aborted => { + *self = Aborted; + } + Complete(b) => match self { + Aborted => {} + Complete(a) => { + a.merge(b); + } + Uploading(_) => { + *self = Complete(b.clone()); + } + }, + Uploading(_) => {} + } + } +} + +/// Data stored in object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such + DeleteMarker, + /// The object is short, it's stored inlined + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// 
stored in the version table + FirstBlock(ObjectVersionMeta, Hash), +} + +impl AutoCrdt for ObjectVersionData { + const WARN_IF_DIFFERENT: bool = true; +} + +/// Metadata about the object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionMeta { + /// Headers to send to the client + pub headers: ObjectVersionHeaders, + /// Size of the object + pub size: u64, + /// etag of the object + pub etag: String, +} + +/// Additional headers for an object +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionHeaders { + /// Content type of the object + pub content_type: String, + /// Any other http headers to send + pub other: BTreeMap, +} + +impl ObjectVersion { + fn cmp_key(&self) -> (u64, Uuid) { + (self.timestamp, self.uuid) + } + + /// Is the object version currently being uploaded + pub fn is_uploading(&self) -> bool { + matches!(self.state, ObjectVersionState::Uploading(_)) + } + + /// Is the object version completely received + pub fn is_complete(&self) -> bool { + matches!(self.state, ObjectVersionState::Complete(_)) + } + + /// Is the object version available (received and not a tombstone) + pub fn is_data(&self) -> bool { + match self.state { + ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, + ObjectVersionState::Complete(_) => true, + _ => false, + } + } +} + +impl Entry for Object { + fn partition_key(&self) -> &Uuid { + &self.bucket_id + } + fn sort_key(&self) -> &String { + &self.key + } + fn is_tombstone(&self) -> bool { + self.versions.len() == 1 + && self.versions[0].state + == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) + } +} + +impl Crdt for Object { + fn merge(&mut self, other: &Self) { + // Merge versions from other into here + for other_v in other.versions.iter() { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) + { + Ok(i) => { + 
self.versions[i].state.merge(&other_v.state); + } + Err(i) => { + self.versions.insert(i, other_v.clone()); + } + } + } + + // Remove versions which are obsolete, i.e. those that come + // before the last version which .is_complete(). + let last_complete = self + .versions + .iter() + .enumerate() + .rev() + .find(|(_, v)| v.is_complete()) + .map(|(vi, _)| vi); + + if let Some(last_vi) = last_complete { + self.versions = self.versions.drain(last_vi..).collect::>(); + } + } +} + +pub struct ObjectTable { + pub background: Arc, + pub version_table: Arc>, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub enum ObjectFilter { + IsData, + IsUploading, +} + +impl TableSchema for ObjectTable { + const TABLE_NAME: &'static str = "object"; + + type P = Uuid; + type S = String; + type E = Object; + type Filter = ObjectFilter; + + fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + let version_table = self.version_table.clone(); + let old = old.cloned(); + let new = new.cloned(); + + self.background.spawn(async move { + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of old versions + for v in old_v.versions.iter() { + let newly_deleted = match new_v + .versions + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) + { + Err(_) => true, + Ok(i) => { + new_v.versions[i].state == ObjectVersionState::Aborted + && v.state != ObjectVersionState::Aborted + } + }; + if newly_deleted { + let deleted_version = + Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + version_table.insert(&deleted_version).await?; + } + } + } + Ok(()) + }) + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + match filter { + ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()), + ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), + } + } + + fn try_migrate(bytes: &[u8]) -> Option { + let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?; + 
Some(migrate_object(old_obj)) + } +} + +// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv +// (we just want to change bucket into bucket_id by hashing it) + +fn migrate_object(o: old::Object) -> Object { + let versions = o + .versions() + .iter() + .cloned() + .map(migrate_object_version) + .collect(); + Object { + bucket_id: blake2sum(o.bucket.as_bytes()), + key: o.key, + versions, + } +} + +fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion { + ObjectVersion { + uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(), + timestamp: v.timestamp, + state: match v.state { + old::ObjectVersionState::Uploading(h) => { + ObjectVersionState::Uploading(migrate_object_version_headers(h)) + } + old::ObjectVersionState::Complete(d) => { + ObjectVersionState::Complete(migrate_object_version_data(d)) + } + old::ObjectVersionState::Aborted => ObjectVersionState::Aborted, + }, + } +} + +fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders { + ObjectVersionHeaders { + content_type: h.content_type, + other: h.other, + } +} + +fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData { + match d { + old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker, + old::ObjectVersionData::Inline(m, b) => { + ObjectVersionData::Inline(migrate_object_version_meta(m), b) + } + old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock( + migrate_object_version_meta(m), + Hash::try_from(h.as_slice()).unwrap(), + ), + } +} + +fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta { + ObjectVersionMeta { + headers: migrate_object_version_headers(m.headers), + size: m.size, + etag: m.etag, + } +} diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs new file mode 100644 index 00000000..ad096772 --- /dev/null +++ b/src/model/s3/version_table.rs @@ -0,0 +1,207 @@ +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use 
garage_util::background::BackgroundRunner; +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +use crate::s3::block_ref_table::*; + +use garage_model_050::version_table as old; + +/// A version of an object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map, + /// Etag of each part in case of a multipart upload, empty otherwise + pub parts_etags: crdt::Map, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket_id: Uuid, + /// Key in which the related object is stored + pub key: String, +} + +impl Version { + pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { + Self { + uuid, + deleted: deleted.into(), + blocks: crdt::Map::new(), + parts_etags: crdt::Map::new(), + bucket_id, + key, + } + } + + pub fn has_part_number(&self, part_number: u64) -> bool { + let case1 = self + .parts_etags + .items() + .binary_search_by(|(k, _)| k.cmp(&part_number)) + .is_ok(); + let case2 = self + .blocks + .items() + .binary_search_by(|(k, _)| k.part_number.cmp(&part_number)) + .is_ok(); + case1 || case2 + } +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlockKey { + /// Number of the part + pub part_number: u64, + /// Offset of this sub-segment in its part + pub offset: u64, +} + +impl Ord for VersionBlockKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + 
.then(self.offset.cmp(&other.offset)) + } +} + +impl PartialOrd for VersionBlockKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// Informations about a single block +#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlock { + /// Blake2 sum of the block + pub hash: Hash, + /// Size of the block + pub size: u64, +} + +impl AutoCrdt for VersionBlock { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Entry for Version { + fn partition_key(&self) -> &Uuid { + &self.uuid + } + fn sort_key(&self) -> &EmptyKey { + &EmptyKey + } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} + +impl Crdt for Version { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + + if self.deleted.get() { + self.blocks.clear(); + self.parts_etags.clear(); + } else { + self.blocks.merge(&other.blocks); + self.parts_etags.merge(&other.parts_etags); + } + } +} + +pub struct VersionTable { + pub background: Arc, + pub block_ref_table: Arc>, +} + +impl TableSchema for VersionTable { + const TABLE_NAME: &'static str = "version"; + + type P = Uuid; + type S = EmptyKey; + type E = Version; + type Filter = DeletedFilter; + + fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + let block_ref_table = self.block_ref_table.clone(); + let old = old.cloned(); + let new = new.cloned(); + + self.background.spawn(async move { + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of version blocks + if new_v.deleted.get() && !old_v.deleted.get() { + let deleted_block_refs = old_v + .blocks + .items() + .iter() + .map(|(_k, vb)| BlockRef { + block: vb.hash, + version: old_v.uuid, + deleted: true.into(), + }) + .collect::>(); + block_ref_table.insert_many(&deleted_block_refs[..]).await?; + } + } + Ok(()) + }) + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + filter.apply(entry.deleted.get()) + } + + fn 
try_migrate(bytes: &[u8]) -> Option { + let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?; + + let blocks = old + .blocks + .items() + .iter() + .map(|(k, v)| { + ( + VersionBlockKey { + part_number: k.part_number, + offset: k.offset, + }, + VersionBlock { + hash: Hash::try_from(v.hash.as_slice()).unwrap(), + size: v.size, + }, + ) + }) + .collect::>(); + + let parts_etags = old + .parts_etags + .items() + .iter() + .map(|(k, v)| (*k, v.clone())) + .collect::>(); + + Some(Version { + uuid: Hash::try_from(old.uuid.as_slice()).unwrap(), + deleted: crdt::Bool::new(old.deleted.get()), + blocks, + parts_etags, + bucket_id: blake2sum(old.bucket.as_bytes()), + key: old.key, + }) + } +} -- cgit v1.2.3 From b44d3fc796484a50cd6854f20c9b46e5fddedc9d Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 8 Jun 2022 10:01:44 +0200 Subject: Abstract database behind generic interface and implement alternative drivers (#322) - [x] Design interface - [x] Implement Sled backend - [x] Re-implement the SledCountedTree hack ~~on Sled backend~~ on all backends (i.e. over the abstraction) - [x] Convert Garage code to use generic interface - [x] Proof-read converted Garage code - [ ] Test everything well - [x] Implement sqlite backend - [x] Implement LMDB backend - [ ] (Implement Persy backend?) - [ ] (Implement other backends? (like RocksDB, ...)) - [x] Implement backend choice in config file and garage server module - [x] Add CLI for converting between DB formats - Exploit the new interface to put more things in transactions - [x] `.updated()` trigger on Garage tables Fix #284 **Bugs** - [x] When exporting sqlite, trees iterate empty?? - [x] LMDB doesn't work **Known issues for various back-ends** - Sled: - Eats all my RAM and also all my disk space - `.len()` has to traverse the whole table - Is actually quite slow on some operations - And is actually pretty bad code... - Sqlite: - Requires a lock to be taken on all operations. 
The lock is also taken when iterating on a table with `.iter()`, and the lock isn't released until the iterator is dropped. This means that we must be VERY careful to not do anything else inside a `.iter()` loop or else we will have a deadlock! Most such cases have been eliminated from the Garage codebase, but there might still be some that remain. If your Garage-over-Sqlite seems to hang/freeze, this is the reason. - (adapter uses a bunch of unsafe code) - Heed (LMDB): - Not suited for 32-bit machines as it has to map the whole DB in memory. - (adapter uses a tiny bit of unsafe code) **My recommendation:** avoid 32-bit machines and use LMDB as much as possible. **Converting databases** is actually quite easy. For example from Sled to LMDB: ```bash cd src/db cargo run --features cli --bin convert -- -i path/to/garage/meta/db -a sled -o path/to/garage/meta/db.lmdb -b lmdb ``` Then, just add this to your `config.toml`: ```toml db_engine = "lmdb" ``` Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/322 Co-authored-by: Alex Co-committed-by: Alex --- src/model/s3/block_ref_table.rs | 21 ++++++++++++--------- src/model/s3/object_table.rs | 12 ++++++++++-- src/model/s3/version_table.rs | 13 +++++++++++-- 3 files changed, 33 insertions(+), 13 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index 9b3991bf..9589b4aa 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use garage_db as db; + use garage_util::data::*; use garage_table::crdt::Crdt; @@ -51,21 +53,22 @@ impl TableSchema for BlockRefTable { type E = BlockRef; type Filter = DeletedFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { - #[allow(clippy::or_fun_call)] - let block = &old.or(new).unwrap().block; + fn updated( + &self, + tx: &mut db::Transaction, + old: 
Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + let block = old.or(new).unwrap().block; let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false); let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false); if is_after && !was_before { - if let Err(e) = self.block_manager.block_incref(block) { - warn!("block_incref failed for block {:?}: {}", block, e); - } + self.block_manager.block_incref(tx, block)?; } if was_before && !is_after { - if let Err(e) = self.block_manager.block_decref(block) { - warn!("block_decref failed for block {:?}: {}", block, e); - } + self.block_manager.block_decref(tx, block)?; } + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 3d9a89f7..62f5d8d9 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -232,7 +234,12 @@ impl TableSchema for ObjectTable { type E = Object; type Filter = ObjectFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + _tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { let version_table = self.version_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -259,7 +266,8 @@ impl TableSchema for ObjectTable { } } Ok(()) - }) + }); + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index ad096772..881c245a 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use garage_db as db; + use garage_util::background::BackgroundRunner; use 
garage_util::data::*; @@ -137,7 +139,12 @@ impl TableSchema for VersionTable { type E = Version; type Filter = DeletedFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + _tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { let block_ref_table = self.block_ref_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -160,7 +167,9 @@ impl TableSchema for VersionTable { } } Ok(()) - }) + }); + + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { -- cgit v1.2.3 From 77e3fd6db2c9cd3a10889bd071e95ef839cfbefc Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 15 Jun 2022 20:20:28 +0200 Subject: improve internal item counter mechanisms and implement bucket quotas (#326) - [x] Refactoring of internal counting API - [x] Repair procedure for counters (it's an offline procedure!!!) - [x] New counter for objects in buckets - [x] Add quotas to buckets struct - [x] Add CLI to manage bucket quotas - [x] Add admin API to manage bucket quotas - [x] Apply quotas by adding checks on put operations - [x] Proof-read Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/326 Co-authored-by: Alex Co-committed-by: Alex --- src/model/s3/object_table.rs | 61 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) (limited to 'src/model/s3') diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 62f5d8d9..a3914c36 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -11,10 +11,15 @@ use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; +use crate::index_counter::*; use crate::s3::version_table::*; use garage_model_050::object_table as old; +pub const OBJECTS: &str = "objects"; +pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; +pub const BYTES: &str = "bytes"; + /// An object 
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { @@ -218,6 +223,7 @@ impl Crdt for Object { pub struct ObjectTable { pub background: Arc, pub version_table: Arc>, + pub object_counter_table: Arc>, } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -236,10 +242,20 @@ impl TableSchema for ObjectTable { fn updated( &self, - _tx: &mut db::Transaction, + tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.object_counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? { + error!( + "Unable to update object counter: {}. Index values will be wrong!", + e + ); + } + + // 2. Spawn threads that propagates deletions to version table let version_table = self.version_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -283,6 +299,49 @@ impl TableSchema for ObjectTable { } } +impl CountedItem for Object { + const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter"; + + // Partition key = bucket id + type CP = Uuid; + // Sort key = nothing + type CS = EmptyKey; + + fn counter_partition_key(&self) -> &Uuid { + &self.bucket_id + } + fn counter_sort_key(&self) -> &EmptyKey { + &EmptyKey + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let versions = self.versions(); + let n_objects = if versions.iter().any(|v| v.is_data()) { + 1 + } else { + 0 + }; + let n_unfinished_uploads = versions + .iter() + .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_))) + .count(); + let n_bytes = versions + .iter() + .map(|v| match &v.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) + | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size, + _ => 0, + }) + .sum::(); + + vec![ + (OBJECTS, n_objects), + (UNFINISHED_UPLOADS, n_unfinished_uploads as i64), + (BYTES, n_bytes as i64), + ] + } +} + // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv // (we just want to 
change bucket into bucket_id by hashing it) -- cgit v1.2.3 From 0f5689c16920479066277db2880e2ca87f7ca602 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 17:52:50 +0200 Subject: Include code from v0.5.1 directly to remove dependencies --- src/model/s3/object_table.rs | 2 +- src/model/s3/version_table.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index a3914c36..a151f1b1 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -14,7 +14,7 @@ use garage_table::*; use crate::index_counter::*; use crate::s3::version_table::*; -use garage_model_050::object_table as old; +use crate::prev::v051::object_table as old; pub const OBJECTS: &str = "objects"; pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 881c245a..b545e66a 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -12,7 +12,7 @@ use garage_table::*; use crate::s3::block_ref_table::*; -use garage_model_050::version_table as old; +use crate::prev::v051::version_table as old; /// A version of an object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -- cgit v1.2.3 From 38be811b1cd20d9223b481c0ea91cc7e3ee795dc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 16:08:00 +0200 Subject: Fix clippy lint that says we should implement Eq --- src/model/s3/block_ref_table.rs | 2 +- src/model/s3/object_table.rs | 6 +++--- src/model/s3/version_table.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) (limited to 'src/model/s3') diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index 9589b4aa..c7017409 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -10,7 +10,7 @@ use garage_table::*; use garage_block::manager::*; -#[derive(PartialEq, Clone, Debug, Serialize, 
Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct BlockRef { /// Hash (blake2 sum) of the block, used as partition key pub block: Hash, diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index a151f1b1..26ff57f6 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -21,7 +21,7 @@ pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; pub const BYTES: &str = "bytes"; /// An object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Object { /// The bucket in which the object is stored, used as partition key pub bucket_id: Uuid, @@ -70,7 +70,7 @@ impl Object { } /// Informations about a version of an object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersion { /// Id of the version pub uuid: Uuid, @@ -81,7 +81,7 @@ pub struct ObjectVersion { } /// State of an object version -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionState { /// The version is being received Uploading(ObjectVersionHeaders), diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index b545e66a..6bc2ecd1 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -15,7 +15,7 @@ use crate::s3::block_ref_table::*; use crate::prev::v051::version_table as old; /// A version of an object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Version { /// UUID of the version, used as partition key pub uuid: Uuid, -- cgit v1.2.3