diff options
Diffstat (limited to 'src/model/object_table.rs')
-rw-r--r-- | src/model/object_table.rs | 198 |
1 files changed, 198 insertions, 0 deletions
diff --git a/src/model/object_table.rs b/src/model/object_table.rs new file mode 100644 index 00000000..01df70e6 --- /dev/null +++ b/src/model/object_table.rs @@ -0,0 +1,198 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use garage_util::background::BackgroundRunner; +use garage_util::data::*; +use garage_util::error::Error; + +use garage_table::table_sharded::*; +use garage_table::*; + +use crate::version_table::*; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Object { + // Primary key + pub bucket: String, + + // Sort key + pub key: String, + + // Data + versions: Vec<ObjectVersion>, +} + +impl Object { + pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self { + let mut ret = Self { + bucket, + key, + versions: vec![], + }; + for v in versions { + ret.add_version(v) + .expect("Twice the same ObjectVersion in Object constructor"); + } + ret + } + /// Adds a version if it wasn't already present + pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&new.cmp_key())) + { + Err(i) => { + self.versions.insert(i, new); + Ok(()) + } + Ok(_) => Err(()), + } + } + pub fn versions(&self) -> &[ObjectVersion] { + &self.versions[..] + } +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersion { + pub uuid: UUID, + pub timestamp: u64, + + pub mime_type: String, + pub size: u64, + pub state: ObjectVersionState, + + pub data: ObjectVersionData, +} + +#[derive(PartialEq, Clone, Copy, Debug, Serialize, Deserialize)] +pub enum ObjectVersionState { + Uploading, + Complete, + Aborted, +} + +impl ObjectVersionState { + fn max(self, other: Self) -> Self { + use ObjectVersionState::*; + if self == Aborted || other == Aborted { + Aborted + } else if self == Complete || other == Complete { + Complete + } else { + Uploading + } + } +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionData { + Uploading, + DeleteMarker, + Inline(#[serde(with = "serde_bytes")] Vec<u8>), + FirstBlock(Hash), +} + +impl ObjectVersion { + fn cmp_key(&self) -> (u64, UUID) { + (self.timestamp, self.uuid) + } + pub fn is_complete(&self) -> bool { + self.state == ObjectVersionState::Complete + } + pub fn is_data(&self) -> bool { + self.state == ObjectVersionState::Complete && self.data != ObjectVersionData::DeleteMarker + } +} + +impl Entry<String, String> for Object { + fn partition_key(&self) -> &String { + &self.bucket + } + fn sort_key(&self) -> &String { + &self.key + } + + fn merge(&mut self, other: &Self) { + for other_v in other.versions.iter() { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) + { + Ok(i) => { + let mut v = &mut self.versions[i]; + if other_v.size > v.size { + v.size = other_v.size; + } + v.state = v.state.max(other_v.state); + if v.data == ObjectVersionData::Uploading { + v.data = other_v.data.clone(); + } + } + Err(i) => { + self.versions.insert(i, other_v.clone()); + } + } + } + let last_complete = self + .versions + .iter() + .enumerate() + .rev() + .filter(|(_, v)| v.is_complete()) + .next() + .map(|(vi, _)| vi); + + if let Some(last_vi) = last_complete { + self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>(); + } + } +} + +pub struct ObjectTable { + pub background: Arc<BackgroundRunner>, + pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, +} + +#[async_trait] +impl TableSchema for ObjectTable { + type P = String; + type S = String; + type E = Object; + type Filter = (); + + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { + let version_table = self.version_table.clone(); + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of old versions + for v in old_v.versions.iter() { + let newly_deleted = match new_v + .versions + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) + { + Err(_) => true, + Ok(i) => { + new_v.versions[i].state == ObjectVersionState::Aborted + && v.state != ObjectVersionState::Aborted + } + }; + if newly_deleted { + let deleted_version = Version::new( + v.uuid, + old_v.bucket.clone(), + old_v.key.clone(), + true, + vec![], + ); + version_table.insert(&deleted_version).await?; + } + } + } + Ok(()) + } + + fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { + entry.versions.iter().any(|v| v.is_data()) + } +} |