From 0f5689c16920479066277db2880e2ca87f7ca602 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 17:52:50 +0200 Subject: Include code from v0.5.1 directly to remove dependencies --- src/model/prev/v051/object_table.rs | 150 ++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 src/model/prev/v051/object_table.rs (limited to 'src/model/prev/v051/object_table.rs') diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs new file mode 100644 index 00000000..fe35d683 --- /dev/null +++ b/src/model/prev/v051/object_table.rs @@ -0,0 +1,150 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +use garage_util::data::*; + +use garage_table::crdt::*; + +/// An object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket: String, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + versions: Vec, +} + +impl Object { + /// Get a list of currently stored versions of `Object` + pub fn versions(&self) -> &[ObjectVersion] { + &self.versions[..] + } +} + +/// Informations about a version of an object +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, +} + +/// State of an object version +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionState { + /// The version is being received + Uploading(ObjectVersionHeaders), + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, +} + +impl Crdt for ObjectVersionState { + fn merge(&mut self, other: &Self) { + use ObjectVersionState::*; + match other { + Aborted => { + *self = Aborted; + } + Complete(b) => match self { + Aborted => {} + Complete(a) => { + a.merge(b); + } + Uploading(_) => { + *self = Complete(b.clone()); + } + }, + Uploading(_) => {} + } + } +} + +/// Data stored in object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such + DeleteMarker, + /// The object is short, it's stored inlined + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table + FirstBlock(ObjectVersionMeta, Hash), +} + +impl AutoCrdt for ObjectVersionData { + const WARN_IF_DIFFERENT: bool = true; +} + +/// Metadata about the object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionMeta { + /// Headers to send to the client + pub headers: ObjectVersionHeaders, + /// Size of the object + pub size: u64, + /// etag of the object + pub etag: String, +} + +/// Additional headers for an object +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionHeaders { + /// Content type of the object + pub content_type: String, + /// Any other http headers to send + pub other: BTreeMap, +} + +impl ObjectVersion { + fn cmp_key(&self) -> (u64, Uuid) { + (self.timestamp, self.uuid) + } + + /// Is the object version completely received + pub fn is_complete(&self) -> bool { + matches!(self.state, ObjectVersionState::Complete(_)) + } +} + +impl Crdt for Object { + fn merge(&mut self, other: &Self) { + // Merge versions from other into here + for other_v in other.versions.iter() { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) + { + Ok(i) => { + self.versions[i].state.merge(&other_v.state); + } + Err(i) => { + self.versions.insert(i, other_v.clone()); + } + } + } + + // Remove versions which are obsolete, i.e. those that come + // before the last version which .is_complete(). + let last_complete = self + .versions + .iter() + .enumerate() + .rev() + .find(|(_, v)| v.is_complete()) + .map(|(vi, _)| vi); + + if let Some(last_vi) = last_complete { + self.versions = self.versions.drain(last_vi..).collect::>(); + } + } +} + -- cgit v1.2.3 From 6f02c36a89d93b04944a3f0882b6f6b703d9c012 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 17:59:41 +0200 Subject: cargo fmt --- src/model/prev/v051/object_table.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'src/model/prev/v051/object_table.rs') diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs index fe35d683..cb59b309 100644 --- a/src/model/prev/v051/object_table.rs +++ b/src/model/prev/v051/object_table.rs @@ -147,4 +147,3 @@ impl Crdt for Object { } } } - -- cgit v1.2.3 From 38be811b1cd20d9223b481c0ea91cc7e3ee795dc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 16:08:00 +0200 Subject: Fix clippy lint that says we should implement Eq --- src/model/prev/v051/object_table.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/model/prev/v051/object_table.rs') diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs index cb59b309..e79e5787 100644 --- a/src/model/prev/v051/object_table.rs +++ b/src/model/prev/v051/object_table.rs @@ -6,7 +6,7 @@ use garage_util::data::*; use garage_table::crdt::*; /// An object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Object { /// The bucket in which the object is stored, used as partition key pub bucket: String, @@ -26,7 +26,7 @@ impl Object { } /// Informations about a version of an object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersion { /// Id of the version pub uuid: Uuid, @@ -37,7 +37,7 @@ pub struct ObjectVersion { } /// State of an object version -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionState { /// The version is being received Uploading(ObjectVersionHeaders), -- cgit v1.2.3