aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/s3_copy.rs206
1 files changed, 102 insertions, 104 deletions
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index f8a269c0..1b2573a1 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -1,7 +1,8 @@
+use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
-use futures::{stream, StreamExt, TryFutureExt};
+use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
use md5::{Digest as Md5Digest, Md5};
use hyper::{Body, Request, Response};
@@ -316,118 +317,67 @@ pub async fn handle_upload_part_copy(
})
})
.peekable();
- let mut source_blocks = Box::pin(source_blocks);
- // Keep information about the next block we want to insert
- let mut current_offset = 0;
- let mut next_block: Option<(Vec<u8>, Option<Hash>)> = None;
+ // The defragmenter is a custom stream (defined below) that concatenates
+ // consecutive block parts when they are too small.
+ // It returns a series of (Vec<u8>, Option<Hash>).
+ // When it is done, it returns an empty vec.
+ // Same as the previous iterator, the Option is Some(_) if and only if
+ // it's an existing block of the Garage data store.
+ let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks));
- // State of the defragmenter
- let config_block_size = garage.config.block_size;
- let mut defrag_buffer = vec![];
- let mut defrag_hash = None;
+ let mut current_offset = 0;
+ let mut next_block = defragmenter.next().await?;
- // We loop a step that does two things concurrently:
- // 1. if there is a block to be inserted in next_block, insert it
- // 2. grab the next block (it might be composed of several sub-blocks
- // to be concatenated to ensure defragmentation)
loop {
- let insert_current_block = async {
- if let Some((data, existing_block_hash)) = next_block {
- md5hasher.update(&data[..]);
-
- let must_upload = existing_block_hash.is_none();
- let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
-
- let mut version =
- Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
- version.blocks.put(
- VersionBlockKey {
- part_number,
- offset: current_offset,
- },
- VersionBlock {
- hash: final_hash,
- size: data.len() as u64,
- },
- );
- current_offset += data.len() as u64;
-
- let block_ref = BlockRef {
- block: final_hash,
- version: dest_version_uuid,
- deleted: false.into(),
- };
-
- let garage2 = garage.clone();
- futures::try_join!(
- // Thing 1: if the block is not exactly a block that existed before,
- // we need to insert that data as a new block.
- async move {
- if must_upload {
- garage2.block_manager.rpc_put_block(final_hash, data).await
- } else {
- Ok(())
- }
- },
- // Thing 2: we need to insert the block in the version
- garage.version_table.insert(&version),
- // Thing 3: we need to add a block reference
- garage.block_ref_table.insert(&block_ref),
- )?;
- Ok(())
- } else {
- Ok(())
- }
+ let (data, existing_block_hash) = next_block;
+ if data.is_empty() {
+ break;
+ }
+
+ md5hasher.update(&data[..]);
+
+ let must_upload = existing_block_hash.is_none();
+ let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
+
+ let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
+ version.blocks.put(
+ VersionBlockKey {
+ part_number,
+ offset: current_offset,
+ },
+ VersionBlock {
+ hash: final_hash,
+ size: data.len() as u64,
+ },
+ );
+ current_offset += data.len() as u64;
+
+ let block_ref = BlockRef {
+ block: final_hash,
+ version: dest_version_uuid,
+ deleted: false.into(),
};
- let get_next_block = async {
- loop {
- let tmpres: Option<&Result<(Vec<u8>, Option<Hash>), garage_util::error::Error>> =
- source_blocks.as_mut().peek().await;
- if let Some(res) = tmpres {
- let (next_block, _) = match res {
- Ok(t) => t,
- Err(_) => {
- source_blocks.next().await.unwrap()?;
- unreachable!()
- }
- };
-
- if defrag_buffer.is_empty() {
- let (next_block, next_block_hash) = source_blocks.next().await.unwrap()?;
- defrag_buffer = next_block;
- defrag_hash = next_block_hash;
- } else if defrag_buffer.len() + next_block.len() > config_block_size {
- return Ok((
- std::mem::replace(&mut defrag_buffer, vec![]),
- std::mem::replace(&mut defrag_hash, None),
- ));
- } else {
- let (next_block, _) = source_blocks.next().await.unwrap()?;
- defrag_buffer.extend(next_block);
- defrag_hash = None;
- }
+ let garage2 = garage.clone();
+ let res = futures::try_join!(
+ // Thing 1: if the block is not exactly a block that existed before,
+ // we need to insert that data as a new block.
+ async move {
+ if must_upload {
+ garage2.block_manager.rpc_put_block(final_hash, data).await
} else {
- return Ok::<_, garage_util::error::Error>((
- std::mem::replace(&mut defrag_buffer, vec![]),
- std::mem::replace(&mut defrag_hash, None),
- ));
+ Ok(())
}
- }
- };
-
- let tmp: Result<(_, (Vec<u8>, Option<Hash>)), garage_util::error::Error> = futures::try_join!(
- insert_current_block,
+ },
+ // Thing 2: we need to insert the block in the version
+ garage.version_table.insert(&version),
+ // Thing 3: we need to add a block reference
+ garage.block_ref_table.insert(&block_ref),
// Thing 4: we need to prefetch the next block
- get_next_block,
- );
- let tmp_block = tmp?.1;
-
- if tmp_block.0.is_empty() {
- break;
- }
- next_block = Some(tmp_block);
+ defragmenter.next(),
+ )?;
+ next_block = res.3;
}
let data_md5sum = md5hasher.finalize();
@@ -600,6 +550,54 @@ impl CopyPreconditionHeaders {
}
}
+type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
+type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
+
+struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
+ block_size: usize,
+ block_stream: Pin<Box<stream::Peekable<S>>>,
+ buffer: Vec<u8>,
+ hash: Option<Hash>,
+}
+
+impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
+ fn new(block_size: usize, block_stream: Pin<Box<stream::Peekable<S>>>) -> Self {
+ Self {
+ block_size,
+ block_stream,
+ buffer: vec![],
+ hash: None,
+ }
+ }
+
+ async fn next(&mut self) -> BlockStreamItem {
+ // Fill buffer while we can
+ while let Some(res) = self.block_stream.as_mut().peek().await {
+ let (peeked_next_block, _) = match res {
+ Ok(t) => t,
+ Err(_) => {
+ self.block_stream.next().await.unwrap()?;
+ unreachable!()
+ }
+ };
+
+ if self.buffer.is_empty() {
+ let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
+ self.buffer = next_block;
+ self.hash = next_block_hash;
+ } else if self.buffer.len() + peeked_next_block.len() > self.block_size {
+ break;
+ } else {
+ let (next_block, _) = self.block_stream.next().await.unwrap()?;
+ self.buffer.extend(next_block);
+ self.hash = None;
+ }
+ }
+
+ return Ok((std::mem::take(&mut self.buffer), self.hash.take()));
+ }
+}
+
#[derive(Debug, Serialize, PartialEq)]
pub struct CopyObjectResult {
#[serde(rename = "LastModified")]