diff options
Diffstat (limited to 'src/core/object_table.rs')
-rw-r--r-- | src/core/object_table.rs | 198 |
1 files changed, 0 insertions, 198 deletions
diff --git a/src/core/object_table.rs b/src/core/object_table.rs deleted file mode 100644 index 01df70e6..00000000 --- a/src/core/object_table.rs +++ /dev/null @@ -1,198 +0,0 @@ -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -use garage_util::background::BackgroundRunner; -use garage_util::data::*; -use garage_util::error::Error; - -use garage_table::table_sharded::*; -use garage_table::*; - -use crate::version_table::*; - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct Object { - // Primary key - pub bucket: String, - - // Sort key - pub key: String, - - // Data - versions: Vec<ObjectVersion>, -} - -impl Object { - pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self { - let mut ret = Self { - bucket, - key, - versions: vec![], - }; - for v in versions { - ret.add_version(v) - .expect("Twice the same ObjectVersion in Object constructor"); - } - ret - } - /// Adds a version if it wasn't already present - pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { - match self - .versions - .binary_search_by(|v| v.cmp_key().cmp(&new.cmp_key())) - { - Err(i) => { - self.versions.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn versions(&self) -> &[ObjectVersion] { - &self.versions[..] - } -} - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersion { - pub uuid: UUID, - pub timestamp: u64, - - pub mime_type: String, - pub size: u64, - pub state: ObjectVersionState, - - pub data: ObjectVersionData, -} - -#[derive(PartialEq, Clone, Copy, Debug, Serialize, Deserialize)] -pub enum ObjectVersionState { - Uploading, - Complete, - Aborted, -} - -impl ObjectVersionState { - fn max(self, other: Self) -> Self { - use ObjectVersionState::*; - if self == Aborted || other == Aborted { - Aborted - } else if self == Complete || other == Complete { - Complete - } else { - Uploading - } - } -} - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionData { - Uploading, - DeleteMarker, - Inline(#[serde(with = "serde_bytes")] Vec<u8>), - FirstBlock(Hash), -} - -impl ObjectVersion { - fn cmp_key(&self) -> (u64, UUID) { - (self.timestamp, self.uuid) - } - pub fn is_complete(&self) -> bool { - self.state == ObjectVersionState::Complete - } - pub fn is_data(&self) -> bool { - self.state == ObjectVersionState::Complete && self.data != ObjectVersionData::DeleteMarker - } -} - -impl Entry<String, String> for Object { - fn partition_key(&self) -> &String { - &self.bucket - } - fn sort_key(&self) -> &String { - &self.key - } - - fn merge(&mut self, other: &Self) { - for other_v in other.versions.iter() { - match self - .versions - .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) - { - Ok(i) => { - let mut v = &mut self.versions[i]; - if other_v.size > v.size { - v.size = other_v.size; - } - v.state = v.state.max(other_v.state); - if v.data == ObjectVersionData::Uploading { - v.data = other_v.data.clone(); - } - } - Err(i) => { - self.versions.insert(i, other_v.clone()); - } - } - } - let last_complete = self - .versions - .iter() - .enumerate() - .rev() - .filter(|(_, v)| v.is_complete()) - .next() - .map(|(vi, _)| vi); - - if let Some(last_vi) = last_complete { - self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>(); - } - } -} - -pub struct ObjectTable { - pub background: Arc<BackgroundRunner>, - pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, -} - -#[async_trait] -impl TableSchema for ObjectTable { - type P = String; - type S = String; - type E = Object; - type Filter = (); - - async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { - let version_table = self.version_table.clone(); - if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of old versions - for v in old_v.versions.iter() { - let newly_deleted = match new_v - .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - { - Err(_) => true, - Ok(i) => { - new_v.versions[i].state == ObjectVersionState::Aborted - && v.state != ObjectVersionState::Aborted - } - }; - if newly_deleted { - let deleted_version = Version::new( - v.uuid, - old_v.bucket.clone(), - old_v.key.clone(), - true, - vec![], - ); - version_table.insert(&deleted_version).await?; - } - } - } - Ok(()) - } - - fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { - entry.versions.iter().any(|v| v.is_data()) - } -} |