diff options
Diffstat (limited to 'src/store/object_table.rs')
-rw-r--r-- | src/store/object_table.rs | 134 |
1 files changed, 134 insertions, 0 deletions
diff --git a/src/store/object_table.rs b/src/store/object_table.rs new file mode 100644 index 00000000..97de0cdb --- /dev/null +++ b/src/store/object_table.rs @@ -0,0 +1,134 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use crate::background::BackgroundRunner; +use crate::data::*; +use crate::error::Error; + +use crate::table::table_sharded::*; +use crate::table::*; + +use crate::store::version_table::*; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Object { + // Primary key + pub bucket: String, + + // Sort key + pub key: String, + + // Data + pub versions: Vec<Box<ObjectVersion>>, +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersion { + pub uuid: UUID, + pub timestamp: u64, + + pub mime_type: String, + pub size: u64, + pub is_complete: bool, + + pub data: ObjectVersionData, +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionData { + DeleteMarker, + Inline(#[serde(with = "serde_bytes")] Vec<u8>), + FirstBlock(Hash), +} + +impl ObjectVersion { + fn cmp_key(&self) -> (u64, &UUID) { + (self.timestamp, &self.uuid) + } +} + +impl Entry<String, String> for Object { + fn partition_key(&self) -> &String { + &self.bucket + } + fn sort_key(&self) -> &String { + &self.key + } + + fn merge(&mut self, other: &Self) { + for other_v in other.versions.iter() { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) + { + Ok(i) => { + let mut v = &mut self.versions[i]; + if other_v.size > v.size { + v.size = other_v.size; + } + if other_v.is_complete && !v.is_complete { + v.is_complete = true; + } + } + Err(i) => { + self.versions.insert(i, other_v.clone()); + } + } + } + let last_complete = self + .versions + .iter() + .enumerate() + .rev() + .filter(|(_, v)| v.is_complete) + .next() + .map(|(vi, _)| vi); + + if let Some(last_vi) = last_complete { + self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>(); + } + } +} + +pub struct ObjectTable { + pub background: Arc<BackgroundRunner>, + pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, +} + +#[async_trait] +impl TableSchema for ObjectTable { + type P = String; + type S = String; + type E = Object; + type Filter = (); + + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { + let version_table = self.version_table.clone(); + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of old versions + for v in old_v.versions.iter() { + if new_v + .versions + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) + .is_err() + { + let deleted_version = Version { + uuid: v.uuid, + deleted: true, + blocks: vec![], + bucket: old_v.bucket.clone(), + key: old_v.key.clone(), + }; + version_table.insert(&deleted_version).await?; + } + } + } + Ok(()) + } + + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { + // TODO + true + } +} |