diff options
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r-- | src/api/s3_put.rs | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs new file mode 100644 index 00000000..15c2ccb7 --- /dev/null +++ b/src/api/s3_put.rs @@ -0,0 +1,190 @@ +use std::collections::VecDeque; +use std::sync::Arc; + +use futures::stream::*; +use hyper::Body; + +use garage_util::data::*; +use garage_util::error::Error; + +use garage_core::block::INLINE_THRESHOLD; +use garage_core::block_ref_table::*; +use garage_core::garage::Garage; +use garage_core::object_table::*; +use garage_core::version_table::*; + +pub async fn handle_put( + garage: Arc<Garage>, + mime_type: &str, + bucket: &str, + key: &str, + body: Body, +) -> Result<UUID, Error> { + let version_uuid = gen_uuid(); + + let mut chunker = BodyChunker::new(body, garage.config.block_size); + let first_block = match chunker.next().await? { + Some(x) => x, + None => return Err(Error::BadRequest(format!("Empty body"))), + }; + + let mut object_version = ObjectVersion { + uuid: version_uuid, + timestamp: now_msec(), + mime_type: mime_type.to_string(), + size: first_block.len() as u64, + is_complete: false, + data: ObjectVersionData::DeleteMarker, + }; + + if first_block.len() < INLINE_THRESHOLD { + object_version.data = ObjectVersionData::Inline(first_block); + object_version.is_complete = true; + + let object = Object::new(bucket.into(), key.into(), vec![object_version]); + garage.object_table.insert(&object).await?; + return Ok(version_uuid); + } + + let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); + + let first_block_hash = hash(&first_block[..]); + object_version.data = ObjectVersionData::FirstBlock(first_block_hash); + let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); + garage.object_table.insert(&object).await?; + + let mut next_offset = first_block.len(); + let mut put_curr_version_block = put_block_meta(garage.clone(), &version, 0, first_block_hash); + let mut put_curr_block = garage + .block_manager + .rpc_put_block(first_block_hash, first_block); + + loop { + let (_, _, next_block) = + futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + if let Some(block) = next_block { + let block_hash = hash(&block[..]); + let block_len = block.len(); + put_curr_version_block = + put_block_meta(garage.clone(), &version, next_offset as u64, block_hash); + put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); + next_offset += block_len; + } else { + break; + } + } + + // TODO: if at any step we have an error, we should undo everything we did + + object_version.is_complete = true; + object_version.size = next_offset as u64; + + let object = Object::new(bucket.into(), key.into(), vec![object_version]); + garage.object_table.insert(&object).await?; + + Ok(version_uuid) +} + +async fn put_block_meta( + garage: Arc<Garage>, + version: &Version, + offset: u64, + hash: Hash, +) -> Result<(), Error> { + // TODO: don't clone, restart from empty block list ?? + let mut version = version.clone(); + version.add_block(VersionBlock { offset, hash }).unwrap(); + + let block_ref = BlockRef { + block: hash, + version: version.uuid, + deleted: false, + }; + + futures::try_join!( + garage.version_table.insert(&version), + garage.block_ref_table.insert(&block_ref), + )?; + Ok(()) +} + +struct BodyChunker { + body: Body, + read_all: bool, + block_size: usize, + buf: VecDeque<u8>, +} + +impl BodyChunker { + fn new(body: Body, block_size: usize) -> Self { + Self { + body, + read_all: false, + block_size, + buf: VecDeque::new(), + } + } + async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { + while !self.read_all && self.buf.len() < self.block_size { + if let Some(block) = self.body.next().await { + let bytes = block?; + trace!("Body next: {} bytes", bytes.len()); + self.buf.extend(&bytes[..]); + } else { + self.read_all = true; + } + } + if self.buf.len() == 0 { + Ok(None) + } else if self.buf.len() <= self.block_size { + let block = self.buf.drain(..).collect::<Vec<u8>>(); + Ok(Some(block)) + } else { + let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>(); + Ok(Some(block)) + } + } +} + +pub async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<UUID, Error> { + let exists = match garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await? + { + None => false, + Some(o) => { + let mut has_active_version = false; + for v in o.versions().iter() { + if v.data != ObjectVersionData::DeleteMarker { + has_active_version = true; + break; + } + } + has_active_version + } + }; + + if !exists { + // No need to delete + return Ok([0u8; 32].into()); + } + + let version_uuid = gen_uuid(); + + let object = Object::new( + bucket.into(), + key.into(), + vec![ObjectVersion { + uuid: version_uuid, + timestamp: now_msec(), + mime_type: "application/x-delete-marker".into(), + size: 0, + is_complete: true, + data: ObjectVersionData::DeleteMarker, + }], + ); + + garage.object_table.insert(&object).await?; + return Ok(version_uuid); +} |