From 9cb870f950504b142e2954d2e1eb76929aaf689f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sun, 26 Apr 2020 18:55:13 +0000 Subject: Prepare for multipart uploads --- src/api/api_server.rs | 5 ++--- src/api/s3_get.rs | 4 ++-- src/api/s3_list.rs | 15 ++++++--------- src/api/s3_put.rs | 22 +++++++++++++++------- src/core/object_table.rs | 43 ++++++++++++++++++++++++++++++++----------- src/core/version_table.rs | 1 + 6 files changed, 58 insertions(+), 32 deletions(-) (limited to 'src') diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 4413c7d2..37c95f92 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -134,10 +134,9 @@ async fn handler_inner( .body(empty_body) .unwrap(); Ok(response) - }, + } &Method::DELETE => Err(Error::Forbidden( - "Cannot delete buckets using S3 api, please talk to Garage directly" - .into(), + "Cannot delete buckets using S3 api, please talk to Garage directly".into(), )), &Method::GET => { let mut params = HashMap::new(); diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index b933f549..dbdac03c 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -43,7 +43,7 @@ pub async fn handle_head( .versions() .iter() .rev() - .filter(|v| v.is_complete && v.data != ObjectVersionData::DeleteMarker) + .filter(|v| v.is_data()) .next() { Some(v) => v, @@ -76,7 +76,7 @@ pub async fn handle_get( .versions() .iter() .rev() - .filter(|v| v.is_complete) + .filter(|v| v.is_complete()) .next() { Some(v) => v, diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index b4ee0919..184168f3 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -8,7 +8,6 @@ use hyper::Response; use garage_util::error::Error; use garage_core::garage::Garage; -use garage_core::object_table::*; use crate::api_server::BodyType; use crate::http_util::*; @@ -44,11 +43,7 @@ pub async fn handle_list( ) .await?; for object in objects.iter() { - if let Some(version) = object - .versions() - .iter() - .find(|x| x.is_complete && x.data != ObjectVersionData::DeleteMarker) - { + if let Some(version) = object.versions().iter().find(|x| x.is_data()) { if !object.key.starts_with(prefix) { truncated = false; break; @@ -56,7 +51,7 @@ pub async fn handle_list( let common_prefix = if delimiter.len() > 0 { let relative_key = &object.key[prefix.len()..]; match relative_key.find(delimiter) { - Some(i) => Some(&object.key[..prefix.len()+i+delimiter.len()]), + Some(i) => Some(&object.key[..prefix.len() + i + delimiter.len()]), None => None, } } else { @@ -70,7 +65,9 @@ pub async fn handle_list( last_modified: version.timestamp, size: version.size, }, - Some(_lri) => return Err(Error::Message(format!("Duplicate key?? {}", object.key))), + Some(_lri) => { + return Err(Error::Message(format!("Duplicate key?? {}", object.key))) + } }; result_keys.insert(object.key.clone(), info); }; @@ -117,7 +114,7 @@ pub async fn handle_list( writeln!(&mut xml, "").unwrap(); Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes())))) -} +} fn xml_escape(s: &str) -> String { s.replace("<", "<") .replace(">", ">") diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 15c2ccb7..e6130af6 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -33,13 +33,13 @@ pub async fn handle_put( timestamp: now_msec(), mime_type: mime_type.to_string(), size: first_block.len() as u64, - is_complete: false, + state: ObjectVersionState::Uploading, data: ObjectVersionData::DeleteMarker, }; if first_block.len() < INLINE_THRESHOLD { object_version.data = ObjectVersionData::Inline(first_block); - object_version.is_complete = true; + object_version.state = ObjectVersionState::Complete; let object = Object::new(bucket.into(), key.into(), vec![object_version]); garage.object_table.insert(&object).await?; @@ -54,7 +54,8 @@ pub async fn handle_put( garage.object_table.insert(&object).await?; let mut next_offset = first_block.len(); - let mut put_curr_version_block = put_block_meta(garage.clone(), &version, 0, first_block_hash); + let mut put_curr_version_block = + put_block_meta(garage.clone(), &version, 0, 0, first_block_hash); let mut put_curr_block = garage .block_manager .rpc_put_block(first_block_hash, first_block); @@ -66,7 +67,7 @@ pub async fn handle_put( let block_hash = hash(&block[..]); let block_len = block.len(); put_curr_version_block = - put_block_meta(garage.clone(), &version, next_offset as u64, block_hash); + put_block_meta(garage.clone(), &version, 0, next_offset as u64, block_hash); put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); next_offset += block_len; } else { @@ -76,7 +77,7 @@ pub async fn handle_put( // TODO: if at any step we have an error, we should undo everything we did - object_version.is_complete = true; + object_version.state = ObjectVersionState::Complete; object_version.size = next_offset as u64; let object = Object::new(bucket.into(), key.into(), vec![object_version]); @@ -88,12 +89,19 @@ pub async fn handle_put( async fn put_block_meta( garage: Arc, version: &Version, + part_number: u64, offset: u64, hash: Hash, ) -> Result<(), Error> { // TODO: don't clone, restart from empty block list ?? let mut version = version.clone(); - version.add_block(VersionBlock { offset, hash }).unwrap(); + version + .add_block(VersionBlock { + part_number, + offset, + hash, + }) + .unwrap(); let block_ref = BlockRef { block: hash, @@ -180,7 +188,7 @@ pub async fn handle_delete(garage: Arc, bucket: &str, key: &str) -> Resu timestamp: now_msec(), mime_type: "application/x-delete-marker".into(), size: 0, - is_complete: true, + state: ObjectVersionState::Complete, data: ObjectVersionData::DeleteMarker, }], ); diff --git a/src/core/object_table.rs b/src/core/object_table.rs index e4c78524..98e8d636 100644 --- a/src/core/object_table.rs +++ b/src/core/object_table.rs @@ -61,11 +61,31 @@ pub struct ObjectVersion { pub mime_type: String, pub size: u64, - pub is_complete: bool, + pub state: ObjectVersionState, pub data: ObjectVersionData, } +#[derive(PartialEq, Clone, Copy, Debug, Serialize, Deserialize)] +pub enum ObjectVersionState { + Uploading, + Complete, + Aborted, +} + +impl ObjectVersionState { + fn max(self, other: Self) -> Self { + use ObjectVersionState::*; + if self == Aborted || other == Aborted { + Aborted + } else if self == Complete || other == Complete { + Complete + } else { + Uploading + } + } +} + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionData { DeleteMarker, @@ -74,8 +94,14 @@ pub enum ObjectVersionData { } impl ObjectVersion { - fn cmp_key(&self) -> (u64, &UUID) { - (self.timestamp, &self.uuid) + fn cmp_key(&self) -> (u64, UUID) { + (self.timestamp, self.uuid) + } + pub fn is_complete(&self) -> bool { + self.state == ObjectVersionState::Complete + } + pub fn is_data(&self) -> bool { + self.state == ObjectVersionState::Complete && self.data != ObjectVersionData::DeleteMarker } } @@ -98,9 +124,7 @@ impl Entry for Object { if other_v.size > v.size { v.size = other_v.size; } - if other_v.is_complete && !v.is_complete { - v.is_complete = true; - } + v.state = v.state.max(other_v.state); } Err(i) => { self.versions.insert(i, other_v.clone()); @@ -112,7 +136,7 @@ impl Entry for Object { .iter() .enumerate() .rev() - .filter(|(_, v)| v.is_complete) + .filter(|(_, v)| v.is_complete()) .next() .map(|(vi, _)| vi); @@ -159,9 +183,6 @@ impl TableSchema for ObjectTable { } fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { - entry - .versions - .iter() - .any(|x| x.is_complete && x.data != ObjectVersionData::DeleteMarker) + entry.versions.iter().any(|v| v.is_data()) } } diff --git a/src/core/version_table.rs b/src/core/version_table.rs index ae32e5cb..cd4448ad 100644 --- a/src/core/version_table.rs +++ b/src/core/version_table.rs @@ -64,6 +64,7 @@ impl Version { #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct VersionBlock { + pub part_number: u64, pub offset: u64, pub hash: Hash, } -- cgit v1.2.3