aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/Cargo.toml1
-rw-r--r--src/model/key_table.rs2
-rw-r--r--src/model/lib.rs3
-rw-r--r--src/model/migrate.rs2
-rw-r--r--src/model/prev/mod.rs1
-rw-r--r--src/model/prev/v051/bucket_table.rs63
-rw-r--r--src/model/prev/v051/key_table.rs51
-rw-r--r--src/model/prev/v051/mod.rs4
-rw-r--r--src/model/prev/v051/object_table.rs150
-rw-r--r--src/model/prev/v051/version_table.rs79
-rw-r--r--src/model/s3/object_table.rs2
-rw-r--r--src/model/s3/version_table.rs2
12 files changed, 355 insertions, 5 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index cb0017b2..bbcfe89c 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -19,7 +19,6 @@ garage_rpc = { version = "0.8.0", path = "../rpc" }
garage_table = { version = "0.8.0", path = "../table" }
garage_block = { version = "0.8.0", path = "../block" }
garage_util = { version = "0.8.0", path = "../util" }
-garage_model_050 = { package = "garage_model", version = "0.5.1" }
async-trait = "0.1.7"
arc-swap = "1.0"
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 330e83f0..7288f6e4 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -6,7 +6,7 @@ use garage_util::data::*;
use crate::permission::BucketKeyPerm;
-use garage_model_050::key_table as old;
+use crate::prev::v051::key_table as old;
/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
diff --git a/src/model/lib.rs b/src/model/lib.rs
index 7c9d9270..4f20ea46 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -1,6 +1,9 @@
#[macro_use]
extern crate tracing;
+// For migration from previous versions
+pub(crate) mod prev;
+
pub mod permission;
pub mod index_counter;
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index 5fc67069..cd6ad26a 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -5,7 +5,7 @@ use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
-use garage_model_050::bucket_table as old_bucket;
+use crate::prev::v051::bucket_table as old_bucket;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
diff --git a/src/model/prev/mod.rs b/src/model/prev/mod.rs
new file mode 100644
index 00000000..68bb1502
--- /dev/null
+++ b/src/model/prev/mod.rs
@@ -0,0 +1 @@
+pub(crate) mod v051;
diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs
new file mode 100644
index 00000000..0c52b6ea
--- /dev/null
+++ b/src/model/prev/v051/bucket_table.rs
@@ -0,0 +1,63 @@
+use serde::{Deserialize, Serialize};
+
+use garage_table::crdt::Crdt;
+use garage_table::*;
+
+use super::key_table::PermissionSet;
+
+/// A bucket is a collection of objects
+///
+/// Its parameters are not directly accessible as:
+/// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
+/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Bucket {
+ /// Name of the bucket
+ pub name: String,
+ /// State, and configuration if not deleted, of the bucket
+ pub state: crdt::Lww<BucketState>,
+}
+
+/// State of a bucket
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub enum BucketState {
+ /// The bucket is deleted
+ Deleted,
+ /// The bucket exists
+ Present(BucketParams),
+}
+
+impl Crdt for BucketState {
+ fn merge(&mut self, o: &Self) {
+ match o {
+ BucketState::Deleted => *self = BucketState::Deleted,
+ BucketState::Present(other_params) => {
+ if let BucketState::Present(params) = self {
+ params.merge(other_params);
+ }
+ }
+ }
+ }
+}
+
+/// Configuration for a bucket
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct BucketParams {
+ /// Map of key with access to the bucket, and what kind of access they give
+ pub authorized_keys: crdt::LwwMap<String, PermissionSet>,
+ /// Is the bucket served as http
+ pub website: crdt::Lww<bool>,
+}
+
+impl Crdt for BucketParams {
+ fn merge(&mut self, o: &Self) {
+ self.authorized_keys.merge(&o.authorized_keys);
+ self.website.merge(&o.website);
+ }
+}
+
+impl Crdt for Bucket {
+ fn merge(&mut self, other: &Self) {
+ self.state.merge(&other.state);
+ }
+}
diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs
new file mode 100644
index 00000000..dab6caa7
--- /dev/null
+++ b/src/model/prev/v051/key_table.rs
@@ -0,0 +1,51 @@
+use serde::{Deserialize, Serialize};
+
+use garage_table::crdt::*;
+use garage_table::*;
+
+/// An api key
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Key {
+ /// The id of the key (immutable), used as partition key
+ pub key_id: String,
+
+ /// The secret_key associated
+ pub secret_key: String,
+
+ /// Name for the key
+ pub name: crdt::Lww<String>,
+
+ /// Is the key deleted
+ pub deleted: crdt::Bool,
+
+ /// Buckets in which the key is authorized. Empty if `Key` is deleted
+ // CRDT interaction: deleted implies authorized_buckets is empty
+ pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
+}
+
+/// Permission given to a key in a bucket
+#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub struct PermissionSet {
+ /// The key can be used to read the bucket
+ pub allow_read: bool,
+ /// The key can be used to write in the bucket
+ pub allow_write: bool,
+}
+
+impl AutoCrdt for PermissionSet {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl Crdt for Key {
+ fn merge(&mut self, other: &Self) {
+ self.name.merge(&other.name);
+ self.deleted.merge(&other.deleted);
+
+ if self.deleted.get() {
+ self.authorized_buckets.clear();
+ } else {
+ self.authorized_buckets.merge(&other.authorized_buckets);
+ }
+ }
+}
+
diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs
new file mode 100644
index 00000000..7a954752
--- /dev/null
+++ b/src/model/prev/v051/mod.rs
@@ -0,0 +1,4 @@
+pub(crate) mod bucket_table;
+pub(crate) mod key_table;
+pub(crate) mod object_table;
+pub(crate) mod version_table;
diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs
new file mode 100644
index 00000000..fe35d683
--- /dev/null
+++ b/src/model/prev/v051/object_table.rs
@@ -0,0 +1,150 @@
+use serde::{Deserialize, Serialize};
+use std::collections::BTreeMap;
+
+use garage_util::data::*;
+
+use garage_table::crdt::*;
+
+/// An object
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Object {
+ /// The bucket in which the object is stored, used as partition key
+ pub bucket: String,
+
+ /// The key at which the object is stored in its bucket, used as sorting key
+ pub key: String,
+
+ /// The list of currenty stored versions of the object
+ versions: Vec<ObjectVersion>,
+}
+
+impl Object {
+ /// Get a list of currently stored versions of `Object`
+ pub fn versions(&self) -> &[ObjectVersion] {
+ &self.versions[..]
+ }
+}
+
+/// Informations about a version of an object
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct ObjectVersion {
+ /// Id of the version
+ pub uuid: Uuid,
+ /// Timestamp of when the object was created
+ pub timestamp: u64,
+ /// State of the version
+ pub state: ObjectVersionState,
+}
+
+/// State of an object version
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub enum ObjectVersionState {
+ /// The version is being received
+ Uploading(ObjectVersionHeaders),
+ /// The version is fully received
+ Complete(ObjectVersionData),
+ /// The version uploaded containded errors or the upload was explicitly aborted
+ Aborted,
+}
+
+impl Crdt for ObjectVersionState {
+ fn merge(&mut self, other: &Self) {
+ use ObjectVersionState::*;
+ match other {
+ Aborted => {
+ *self = Aborted;
+ }
+ Complete(b) => match self {
+ Aborted => {}
+ Complete(a) => {
+ a.merge(b);
+ }
+ Uploading(_) => {
+ *self = Complete(b.clone());
+ }
+ },
+ Uploading(_) => {}
+ }
+ }
+}
+
+/// Data stored in object version
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub enum ObjectVersionData {
+ /// The object was deleted, this Version is a tombstone to mark it as such
+ DeleteMarker,
+ /// The object is short, it's stored inlined
+ Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
+ /// The object is not short, Hash of first block is stored here, next segments hashes are
+ /// stored in the version table
+ FirstBlock(ObjectVersionMeta, Hash),
+}
+
+impl AutoCrdt for ObjectVersionData {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+/// Metadata about the object version
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct ObjectVersionMeta {
+ /// Headers to send to the client
+ pub headers: ObjectVersionHeaders,
+ /// Size of the object
+ pub size: u64,
+ /// etag of the object
+ pub etag: String,
+}
+
+/// Additional headers for an object
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct ObjectVersionHeaders {
+ /// Content type of the object
+ pub content_type: String,
+ /// Any other http headers to send
+ pub other: BTreeMap<String, String>,
+}
+
+impl ObjectVersion {
+ fn cmp_key(&self) -> (u64, Uuid) {
+ (self.timestamp, self.uuid)
+ }
+
+ /// Is the object version completely received
+ pub fn is_complete(&self) -> bool {
+ matches!(self.state, ObjectVersionState::Complete(_))
+ }
+}
+
+impl Crdt for Object {
+ fn merge(&mut self, other: &Self) {
+ // Merge versions from other into here
+ for other_v in other.versions.iter() {
+ match self
+ .versions
+ .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key()))
+ {
+ Ok(i) => {
+ self.versions[i].state.merge(&other_v.state);
+ }
+ Err(i) => {
+ self.versions.insert(i, other_v.clone());
+ }
+ }
+ }
+
+ // Remove versions which are obsolete, i.e. those that come
+ // before the last version which .is_complete().
+ let last_complete = self
+ .versions
+ .iter()
+ .enumerate()
+ .rev()
+ .find(|(_, v)| v.is_complete())
+ .map(|(vi, _)| vi);
+
+ if let Some(last_vi) = last_complete {
+ self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
+ }
+ }
+}
+
diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs
new file mode 100644
index 00000000..1e658f91
--- /dev/null
+++ b/src/model/prev/v051/version_table.rs
@@ -0,0 +1,79 @@
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+
+use garage_table::crdt::*;
+use garage_table::*;
+
+/// A version of an object
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
+ pub parts_etags: crdt::Map<u64, String>,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket: String,
+ /// Key in which the related object is stored
+ pub key: String,
+}
+
+#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct VersionBlockKey {
+ /// Number of the part
+ pub part_number: u64,
+ /// Offset of this sub-segment in its part
+ pub offset: u64,
+}
+
+impl Ord for VersionBlockKey {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.part_number
+ .cmp(&other.part_number)
+ .then(self.offset.cmp(&other.offset))
+ }
+}
+
+impl PartialOrd for VersionBlockKey {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+/// Informations about a single block
+#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct VersionBlock {
+ /// Blake2 sum of the block
+ pub hash: Hash,
+ /// Size of the block
+ pub size: u64,
+}
+
+impl AutoCrdt for VersionBlock {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl Crdt for Version {
+ fn merge(&mut self, other: &Self) {
+ self.deleted.merge(&other.deleted);
+
+ if self.deleted.get() {
+ self.blocks.clear();
+ self.parts_etags.clear();
+ } else {
+ self.blocks.merge(&other.blocks);
+ self.parts_etags.merge(&other.parts_etags);
+ }
+ }
+}
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index a3914c36..a151f1b1 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -14,7 +14,7 @@ use garage_table::*;
use crate::index_counter::*;
use crate::s3::version_table::*;
-use garage_model_050::object_table as old;
+use crate::prev::v051::object_table as old;
pub const OBJECTS: &str = "objects";
pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index 881c245a..b545e66a 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -12,7 +12,7 @@ use garage_table::*;
use crate::s3::block_ref_table::*;
-use garage_model_050::version_table as old;
+use crate::prev::v051::version_table as old;
/// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]