diff options
Diffstat (limited to 'src/api/s3')
-rw-r--r-- | src/api/s3/copy.rs | 29 | ||||
-rw-r--r-- | src/api/s3/get.rs | 110 | ||||
-rw-r--r-- | src/api/s3/put.rs | 43 |
3 files changed, 105 insertions, 77 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 4415a037..a1a8c9a4 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -5,9 +5,12 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{stream, stream::Stream, StreamExt, TryFutureExt}; use md5::{Digest as Md5Digest, Md5}; +use bytes::Bytes; use hyper::{Body, Request, Response}; use serde::Serialize; +use garage_rpc::netapp::bytes_buf::BytesBuf; +use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::data::*; use garage_util::time::*; @@ -305,13 +308,18 @@ pub async fn handle_upload_part_copy( // if and only if the block returned is a block that already existed // in the Garage data store (thus we don't need to save it again). let garage2 = garage.clone(); + let order_stream = OrderTag::stream(); let source_blocks = stream::iter(blocks_to_copy) - .flat_map(|(block_hash, range_to_copy)| { + .enumerate() + .flat_map(|(i, (block_hash, range_to_copy))| { let garage3 = garage2.clone(); stream::once(async move { - let data = garage3.block_manager.rpc_get_block(&block_hash).await?; + let data = garage3 + .block_manager + .rpc_get_block(&block_hash, Some(order_stream.order(i as u64))) + .await?; match range_to_copy { - Some(r) => Ok((data[r].to_vec(), None)), + Some(r) => Ok((data.slice(r), None)), None => Ok((data, Some(block_hash))), } }) @@ -365,10 +373,7 @@ pub async fn handle_upload_part_copy( // we need to insert that data as a new block. async move { if must_upload { - garage2 - .block_manager - .rpc_put_block(final_hash, data.into()) - .await + garage2.block_manager.rpc_put_block(final_hash, data).await } else { Ok(()) } @@ -556,13 +561,13 @@ impl CopyPreconditionHeaders { } } -type BlockStreamItemOk = (Vec<u8>, Option<Hash>); +type BlockStreamItemOk = (Bytes, Option<Hash>); type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>; struct Defragmenter<S: Stream<Item = BlockStreamItem>> { block_size: usize, block_stream: Pin<Box<stream::Peekable<S>>>, - buffer: Vec<u8>, + buffer: BytesBuf, hash: Option<Hash>, } @@ -571,7 +576,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> { Self { block_size, block_stream, - buffer: vec![], + buffer: BytesBuf::new(), hash: None, } } @@ -589,7 +594,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> { if self.buffer.is_empty() { let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?; - self.buffer = next_block; + self.buffer.extend(next_block); self.hash = next_block_hash; } else if self.buffer.len() + peeked_next_block.len() > self.block_size { break; @@ -600,7 +605,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> { } } - Ok((std::mem::take(&mut self.buffer), self.hash.take())) + Ok((self.buffer.take_all(), self.hash.take())) } } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 7fa1a177..ae4c287d 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -7,9 +7,9 @@ use http::header::{ ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; -use hyper::body::Bytes; use hyper::{Body, Request, Response, StatusCode}; +use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; use garage_table::EmptyKey; use garage_util::data::*; @@ -242,10 +242,15 @@ pub async fn handle_get( Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let read_first_block = garage.block_manager.rpc_get_block(first_block_hash); + let order_stream = OrderTag::stream(); + + let read_first_block = garage + .block_manager + .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0))); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); - let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; + let (first_block_stream, version) = + futures::try_join!(read_first_block, get_next_blocks)?; let version = version.ok_or(Error::NoSuchKey)?; let mut blocks = version @@ -254,24 +259,26 @@ pub async fn handle_get( .iter() .map(|(_, vb)| (vb.hash, None)) .collect::<Vec<_>>(); - blocks[0].1 = Some(first_block); + blocks[0].1 = Some(first_block_stream); let body_stream = futures::stream::iter(blocks) - .map(move |(hash, data_opt)| { + .enumerate() + .map(move |(i, (hash, stream_opt))| { let garage = garage.clone(); async move { - if let Some(data) = data_opt { - Ok(Bytes::from(data)) + if let Some(stream) = stream_opt { + stream } else { garage .block_manager - .rpc_get_block(&hash) + .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64))) .await - .map(Bytes::from) + .unwrap_or_else(|e| error_stream(i, e)) } } }) - .buffered(2); + .buffered(2) + .flatten(); let body = hyper::body::Body::wrap_stream(body_stream); Ok(resp_builder.body(body)?) @@ -422,40 +429,79 @@ fn body_from_blocks_range( all_blocks.len(), 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize, )); - let mut true_offset = 0; + let mut block_offset: u64 = 0; for (_, b) in all_blocks.iter() { - if true_offset >= end { + if block_offset >= end { break; } // Keep only blocks that have an intersection with the requested range - if true_offset < end && true_offset + b.size > begin { - blocks.push((*b, true_offset)); + if block_offset < end && block_offset + b.size > begin { + blocks.push((*b, block_offset)); } - true_offset += b.size; + block_offset += b.size as u64; } + let order_stream = OrderTag::stream(); let body_stream = futures::stream::iter(blocks) - .map(move |(block, true_offset)| { + .enumerate() + .map(move |(i, (block, block_offset))| { let garage = garage.clone(); async move { - let data = garage.block_manager.rpc_get_block(&block.hash).await?; - let data = Bytes::from(data); - let start_in_block = if true_offset > begin { - 0 - } else { - begin - true_offset - }; - let end_in_block = if true_offset + block.size < end { - block.size - } else { - end - true_offset - }; - Result::<Bytes, Error>::Ok( - data.slice(start_in_block as usize..end_in_block as usize), - ) + garage + .block_manager + .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) + .await + .unwrap_or_else(|e| error_stream(i, e)) + .scan(block_offset, move |chunk_offset, chunk| { + let r = match chunk { + Ok(chunk_bytes) => { + let chunk_len = chunk_bytes.len() as u64; + let r = if *chunk_offset >= end { + // The current chunk is after the part we want to read. + // Returning None here will stop the scan, the rest of the + // stream will be ignored + None + } else if *chunk_offset + chunk_len <= begin { + // The current chunk is before the part we want to read. + // We return a None that will be removed by the filter_map + // below. + Some(None) + } else { + // The chunk has an intersection with the requested range + let start_in_chunk = if *chunk_offset > begin { + 0 + } else { + begin - *chunk_offset + }; + let end_in_chunk = if *chunk_offset + chunk_len < end { + chunk_len + } else { + end - *chunk_offset + }; + Some(Some(Ok(chunk_bytes + .slice(start_in_chunk as usize..end_in_chunk as usize)))) + }; + *chunk_offset += chunk_bytes.len() as u64; + r + } + Err(e) => Some(Some(Err(e))), + }; + futures::future::ready(r) + }) + .filter_map(futures::future::ready) } }) - .buffered(2); + .buffered(2) + .flatten(); hyper::body::Body::wrap_stream(body_stream) } + +fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { + Box::pin(futures::stream::once(async move { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Could not get block {}: {}", i, e), + )) + })) +} diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index dc0530df..97b8e4e3 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; use futures::prelude::*; @@ -13,6 +13,7 @@ use opentelemetry::{ Context, }; +use garage_rpc::netapp::bytes_buf::BytesBuf; use garage_table::*; use garage_util::async_hash::*; use garage_util::data::*; @@ -108,7 +109,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( size, etag: data_md5sum_hex.clone(), }, - first_block, + first_block.to_vec(), )), }; @@ -136,7 +137,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data and verify checksum - let first_block = Bytes::from(first_block); let first_block_hash = async_blake2sum(first_block.clone()).await; let tx_result = (|| async { @@ -318,7 +318,6 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( chunker.next(), )?; if let Some(block) = next_block { - let block = Bytes::from(block); let (_, _, block_hash) = futures::future::join3( md5hasher.update(block.clone()), sha256hasher.update(block.clone()), @@ -387,8 +386,7 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> { stream: S, read_all: bool, block_size: usize, - buf: VecDeque<Bytes>, - buf_len: usize, + buf: BytesBuf, } impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { @@ -397,45 +395,25 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { stream, read_all: false, block_size, - buf: VecDeque::with_capacity(8), - buf_len: 0, + buf: BytesBuf::new(), } } - async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { - while !self.read_all && self.buf_len < self.block_size { + async fn next(&mut self) -> Result<Option<Bytes>, Error> { + while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); - self.buf_len += bytes.len(); - self.buf.push_back(bytes); + self.buf.extend(bytes); } else { self.read_all = true; } } - if self.buf_len == 0 { + if self.buf.is_empty() { Ok(None) } else { - let mut slices = Vec::with_capacity(self.buf.len()); - let mut taken = 0; - while self.buf_len > 0 && taken < self.block_size { - let front = self.buf.pop_front().unwrap(); - if taken + front.len() <= self.block_size { - taken += front.len(); - self.buf_len -= front.len(); - slices.push(front); - } else { - let front_take = self.block_size - taken; - slices.push(front.slice(..front_take)); - self.buf.push_front(front.slice(front_take..)); - self.buf_len -= front_take; - break; - } - } - Ok(Some( - slices.iter().map(|x| &x[..]).collect::<Vec<_>>().concat(), - )) + Ok(Some(self.buf.take_max(self.block_size))) } } } @@ -545,7 +523,6 @@ pub async fn handle_put_part( // Copy block to store let version = Version::new(version_uuid, bucket_id, key, false); - let first_block = Bytes::from(first_block); let first_block_hash = async_blake2sum(first_block.clone()).await; let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( |