aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3_copy.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3_copy.rs')
-rw-r--r--src/api/s3_copy.rs185
1 files changed, 122 insertions, 63 deletions
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index f92dfcf1..f8a269c0 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
-use futures::TryFutureExt;
+use futures::{stream, StreamExt, TryFutureExt};
use md5::{Digest as Md5Digest, Md5};
use hyper::{Body, Request, Response};
@@ -298,77 +298,136 @@ pub async fn handle_upload_part_copy(
// Now, actually copy the blocks
let mut md5hasher = Md5::new();
- let mut block = Some(
- garage
- .block_manager
- .rpc_get_block(&blocks_to_copy[0].0)
- .await?,
- );
+ // First, create a stream that is able to read the source blocks
+ // and extract the subrange if necessary.
+ // The second returned value is an Option<Hash>, that is Some
+ // if and only if the block returned is a block that already existed
+ // in the Garage data store (thus we don't need to save it again).
+ let garage2 = garage.clone();
+ let source_blocks = stream::iter(blocks_to_copy)
+ .flat_map(|(block_hash, range_to_copy)| {
+ let garage3 = garage2.clone();
+ stream::once(async move {
+ let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
+ match range_to_copy {
+ Some(r) => Ok((data[r].to_vec(), None)),
+ None => Ok((data, Some(block_hash))),
+ }
+ })
+ })
+ .peekable();
+ let mut source_blocks = Box::pin(source_blocks);
+ // Keep information about the next block we want to insert
let mut current_offset = 0;
- for (i, (block_hash, range_to_copy)) in blocks_to_copy.iter().enumerate() {
- let (current_block, subrange_hash) = match range_to_copy.clone() {
- Some(r) => {
- let subrange = block.take().unwrap()[r].to_vec();
- let hash = blake2sum(&subrange);
- (subrange, hash)
+ let mut next_block: Option<(Vec<u8>, Option<Hash>)> = None;
+
+ // State of the defragmenter
+ let config_block_size = garage.config.block_size;
+ let mut defrag_buffer = vec![];
+ let mut defrag_hash = None;
+
+ // We loop a step that does two things concurrently:
+ // 1. if there is a block to be inserted in next_block, insert it
+ // 2. grab the next block (it might be composed of several sub-blocks
+ // to be concatenated to ensure defragmentation)
+ loop {
+ let insert_current_block = async {
+ if let Some((data, existing_block_hash)) = next_block {
+ md5hasher.update(&data[..]);
+
+ let must_upload = existing_block_hash.is_none();
+ let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
+
+ let mut version =
+ Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
+ version.blocks.put(
+ VersionBlockKey {
+ part_number,
+ offset: current_offset,
+ },
+ VersionBlock {
+ hash: final_hash,
+ size: data.len() as u64,
+ },
+ );
+ current_offset += data.len() as u64;
+
+ let block_ref = BlockRef {
+ block: final_hash,
+ version: dest_version_uuid,
+ deleted: false.into(),
+ };
+
+ let garage2 = garage.clone();
+ futures::try_join!(
+ // Thing 1: if the block is not exactly a block that existed before,
+ // we need to insert that data as a new block.
+ async move {
+ if must_upload {
+ garage2.block_manager.rpc_put_block(final_hash, data).await
+ } else {
+ Ok(())
+ }
+ },
+ // Thing 2: we need to insert the block in the version
+ garage.version_table.insert(&version),
+ // Thing 3: we need to add a block reference
+ garage.block_ref_table.insert(&block_ref),
+ )?;
+ Ok(())
+ } else {
+ Ok(())
}
- None => (block.take().unwrap(), *block_hash),
- };
- md5hasher.update(&current_block[..]);
-
- let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
- version.blocks.put(
- VersionBlockKey {
- part_number,
- offset: current_offset,
- },
- VersionBlock {
- hash: subrange_hash,
- size: current_block.len() as u64,
- },
- );
- current_offset += current_block.len() as u64;
-
- let block_ref = BlockRef {
- block: subrange_hash,
- version: dest_version_uuid,
- deleted: false.into(),
};
- let next_block_hash = blocks_to_copy.get(i + 1).map(|(h, _)| *h);
-
- let garage2 = garage.clone();
- let garage3 = garage.clone();
- let is_subrange = range_to_copy.is_some();
-
- let (_, _, _, next_block) = futures::try_join!(
- // Thing 1: if we are taking a subrange of the source block,
- // we need to insert that subrange as a new block.
- async move {
- if is_subrange {
- garage2
- .block_manager
- .rpc_put_block(subrange_hash, current_block)
- .await
+ let get_next_block = async {
+ loop {
+ let tmpres: Option<&Result<(Vec<u8>, Option<Hash>), garage_util::error::Error>> =
+ source_blocks.as_mut().peek().await;
+ if let Some(res) = tmpres {
+ let (next_block, _) = match res {
+ Ok(t) => t,
+ Err(_) => {
+ source_blocks.next().await.unwrap()?;
+ unreachable!()
+ }
+ };
+
+ if defrag_buffer.is_empty() {
+ let (next_block, next_block_hash) = source_blocks.next().await.unwrap()?;
+ defrag_buffer = next_block;
+ defrag_hash = next_block_hash;
+ } else if defrag_buffer.len() + next_block.len() > config_block_size {
+ return Ok((
+ std::mem::replace(&mut defrag_buffer, vec![]),
+ std::mem::replace(&mut defrag_hash, None),
+ ));
+ } else {
+ let (next_block, _) = source_blocks.next().await.unwrap()?;
+ defrag_buffer.extend(next_block);
+ defrag_hash = None;
+ }
} else {
- Ok(())
+ return Ok::<_, garage_util::error::Error>((
+ std::mem::replace(&mut defrag_buffer, vec![]),
+ std::mem::replace(&mut defrag_hash, None),
+ ));
}
- },
- // Thing 2: we need to insert the block in the version
- garage.version_table.insert(&version),
- // Thing 3: we need to add a block reference
- garage.block_ref_table.insert(&block_ref),
+ }
+ };
+
+ let tmp: Result<(_, (Vec<u8>, Option<Hash>)), garage_util::error::Error> = futures::try_join!(
+ insert_current_block,
// Thing 4: we need to prefetch the next block
- async move {
- match next_block_hash {
- Some(h) => Ok(Some(garage3.block_manager.rpc_get_block(&h).await?)),
- None => Ok(None),
- }
- },
- )?;
+ get_next_block,
+ );
+ let tmp_block = tmp?.1;
- block = next_block;
+ if tmp_block.0.is_empty() {
+ break;
+ }
+ next_block = Some(tmp_block);
}
let data_md5sum = md5hasher.finalize();