Diffstat (limited to 'src/api')
-rw-r--r--   src/api/s3/copy.rs    29
-rw-r--r--   src/api/s3/get.rs    110
-rw-r--r--   src/api/s3/put.rs     43
3 files changed, 105 insertions, 77 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 4415a037..a1a8c9a4 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -5,9 +5,12 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
use md5::{Digest as Md5Digest, Md5};
+use bytes::Bytes;
use hyper::{Body, Request, Response};
use serde::Serialize;
+use garage_rpc::netapp::bytes_buf::BytesBuf;
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
@@ -305,13 +308,18 @@ pub async fn handle_upload_part_copy(
// if and only if the block returned is a block that already existed
// in the Garage data store (thus we don't need to save it again).
let garage2 = garage.clone();
+ let order_stream = OrderTag::stream();
let source_blocks = stream::iter(blocks_to_copy)
- .flat_map(|(block_hash, range_to_copy)| {
+ .enumerate()
+ .flat_map(|(i, (block_hash, range_to_copy))| {
let garage3 = garage2.clone();
stream::once(async move {
- let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
+ let data = garage3
+ .block_manager
+ .rpc_get_block(&block_hash, Some(order_stream.order(i as u64)))
+ .await?;
match range_to_copy {
- Some(r) => Ok((data[r].to_vec(), None)),
+ Some(r) => Ok((data.slice(r), None)),
None => Ok((data, Some(block_hash))),
}
})
@@ -365,10 +373,7 @@ pub async fn handle_upload_part_copy(
// we need to insert that data as a new block.
async move {
if must_upload {
- garage2
- .block_manager
- .rpc_put_block(final_hash, data.into())
- .await
+ garage2.block_manager.rpc_put_block(final_hash, data).await
} else {
Ok(())
}
@@ -556,13 +561,13 @@ impl CopyPreconditionHeaders {
}
}
-type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
+type BlockStreamItemOk = (Bytes, Option<Hash>);
type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
block_size: usize,
block_stream: Pin<Box<stream::Peekable<S>>>,
- buffer: Vec<u8>,
+ buffer: BytesBuf,
hash: Option<Hash>,
}
@@ -571,7 +576,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
Self {
block_size,
block_stream,
- buffer: vec![],
+ buffer: BytesBuf::new(),
hash: None,
}
}
@@ -589,7 +594,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
if self.buffer.is_empty() {
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
- self.buffer = next_block;
+ self.buffer.extend(next_block);
self.hash = next_block_hash;
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
break;
@@ -600,7 +605,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
}
}
- Ok((std::mem::take(&mut self.buffer), self.hash.take()))
+ Ok((self.buffer.take_all(), self.hash.take()))
}
}
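
Note: this file's changes lean on two pieces from garage_rpc: OrderTag (each rpc_get_block call is tagged with order_stream.order(i), so concurrently fetched blocks are handed back in index order) and netapp's BytesBuf, a buffer that accumulates Bytes chunks without copying them on insert. A minimal sketch of the BytesBuf calls exercised in this diff (new, extend, len, is_empty, take_max, take_all); the literal values are assumptions for illustration:

    use bytes::Bytes;
    use garage_rpc::netapp::bytes_buf::BytesBuf;

    let mut buf = BytesBuf::new();
    buf.extend(Bytes::from_static(b"hello ")); // stores the chunk, no copy yet
    buf.extend(Bytes::from_static(b"world"));
    assert_eq!(buf.len(), 11);
    let head: Bytes = buf.take_max(5); // drains at most 5 bytes -> "hello"
    let tail: Bytes = buf.take_all();  // drains the remainder   -> " world"
    assert!(buf.is_empty());
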
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 7fa1a177..ae4c287d 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -7,9 +7,9 @@ use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED, RANGE,
};
-use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode};
+use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
use garage_table::EmptyKey;
use garage_util::data::*;
@@ -242,10 +242,15 @@ pub async fn handle_get(
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
- let read_first_block = garage.block_manager.rpc_get_block(first_block_hash);
+ let order_stream = OrderTag::stream();
+
+ let read_first_block = garage
+ .block_manager
+ .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0)));
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
- let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
+ let (first_block_stream, version) =
+ futures::try_join!(read_first_block, get_next_blocks)?;
let version = version.ok_or(Error::NoSuchKey)?;
let mut blocks = version
@@ -254,24 +259,26 @@ pub async fn handle_get(
.iter()
.map(|(_, vb)| (vb.hash, None))
.collect::<Vec<_>>();
- blocks[0].1 = Some(first_block);
+ blocks[0].1 = Some(first_block_stream);
let body_stream = futures::stream::iter(blocks)
- .map(move |(hash, data_opt)| {
+ .enumerate()
+ .map(move |(i, (hash, stream_opt))| {
let garage = garage.clone();
async move {
- if let Some(data) = data_opt {
- Ok(Bytes::from(data))
+ if let Some(stream) = stream_opt {
+ stream
} else {
garage
.block_manager
- .rpc_get_block(&hash)
+ .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
.await
- .map(Bytes::from)
+ .unwrap_or_else(|e| error_stream(i, e))
}
}
})
- .buffered(2);
+ .buffered(2)
+ .flatten();
let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?)
@@ -422,40 +429,79 @@ fn body_from_blocks_range(
all_blocks.len(),
4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize,
));
- let mut true_offset = 0;
+ let mut block_offset: u64 = 0;
for (_, b) in all_blocks.iter() {
- if true_offset >= end {
+ if block_offset >= end {
break;
}
// Keep only blocks that have an intersection with the requested range
- if true_offset < end && true_offset + b.size > begin {
- blocks.push((*b, true_offset));
+ if block_offset < end && block_offset + b.size > begin {
+ blocks.push((*b, block_offset));
}
- true_offset += b.size;
+ block_offset += b.size as u64;
}
+ let order_stream = OrderTag::stream();
let body_stream = futures::stream::iter(blocks)
- .map(move |(block, true_offset)| {
+ .enumerate()
+ .map(move |(i, (block, block_offset))| {
let garage = garage.clone();
async move {
- let data = garage.block_manager.rpc_get_block(&block.hash).await?;
- let data = Bytes::from(data);
- let start_in_block = if true_offset > begin {
- 0
- } else {
- begin - true_offset
- };
- let end_in_block = if true_offset + block.size < end {
- block.size
- } else {
- end - true_offset
- };
- Result::<Bytes, Error>::Ok(
- data.slice(start_in_block as usize..end_in_block as usize),
- )
+ garage
+ .block_manager
+ .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
+ .await
+ .unwrap_or_else(|e| error_stream(i, e))
+ .scan(block_offset, move |chunk_offset, chunk| {
+ let r = match chunk {
+ Ok(chunk_bytes) => {
+ let chunk_len = chunk_bytes.len() as u64;
+ let r = if *chunk_offset >= end {
+ // The current chunk is after the part we want to read.
+ // Returning None here will stop the scan, the rest of the
+ // stream will be ignored
+ None
+ } else if *chunk_offset + chunk_len <= begin {
+ // The current chunk is before the part we want to read.
+ // We return a None that will be removed by the filter_map
+ // below.
+ Some(None)
+ } else {
+ // The chunk has an intersection with the requested range
+ let start_in_chunk = if *chunk_offset > begin {
+ 0
+ } else {
+ begin - *chunk_offset
+ };
+ let end_in_chunk = if *chunk_offset + chunk_len < end {
+ chunk_len
+ } else {
+ end - *chunk_offset
+ };
+ Some(Some(Ok(chunk_bytes
+ .slice(start_in_chunk as usize..end_in_chunk as usize))))
+ };
+ *chunk_offset += chunk_bytes.len() as u64;
+ r
+ }
+ Err(e) => Some(Some(Err(e))),
+ };
+ futures::future::ready(r)
+ })
+ .filter_map(futures::future::ready)
}
})
- .buffered(2);
+ .buffered(2)
+ .flatten();
hyper::body::Body::wrap_stream(body_stream)
}
+
+fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream {
+ Box::pin(futures::stream::once(async move {
+ Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Could not get block {}: {}", i, e),
+ ))
+ }))
+}
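
Note: the scan/filter_map chain above clips each streamed chunk to the requested byte range [begin, end) while tracking a running absolute offset. The same arithmetic written as a plain function over in-memory chunks, a hypothetical helper for illustration only (not part of the Garage API):

    use bytes::Bytes;

    /// Keep only the part of each chunk that intersects [begin, end).
    fn slice_chunks(chunks: Vec<Bytes>, begin: u64, end: u64) -> Vec<Bytes> {
        let mut offset = 0u64; // absolute position of the current chunk's start
        let mut out = Vec::new();
        for chunk in chunks {
            let len = chunk.len() as u64;
            if offset >= end {
                break; // everything from here on is past the range
            }
            if offset + len > begin {
                let start = begin.saturating_sub(offset);
                let stop = std::cmp::min(len, end - offset);
                out.push(chunk.slice(start as usize..stop as usize));
            }
            offset += len;
        }
        out
    }
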
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index dc0530df..97b8e4e3 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use futures::prelude::*;
@@ -13,6 +13,7 @@ use opentelemetry::{
Context,
};
+use garage_rpc::netapp::bytes_buf::BytesBuf;
use garage_table::*;
use garage_util::async_hash::*;
use garage_util::data::*;
@@ -108,7 +109,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
size,
etag: data_md5sum_hex.clone(),
},
- first_block,
+ first_block.to_vec(),
)),
};
@@ -136,7 +137,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
- let first_block = Bytes::from(first_block);
let first_block_hash = async_blake2sum(first_block.clone()).await;
let tx_result = (|| async {
@@ -318,7 +318,6 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
chunker.next(),
)?;
if let Some(block) = next_block {
- let block = Bytes::from(block);
let (_, _, block_hash) = futures::future::join3(
md5hasher.update(block.clone()),
sha256hasher.update(block.clone()),
@@ -387,8 +386,7 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
stream: S,
read_all: bool,
block_size: usize,
- buf: VecDeque<Bytes>,
- buf_len: usize,
+ buf: BytesBuf,
}
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
@@ -397,45 +395,25 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
stream,
read_all: false,
block_size,
- buf: VecDeque::with_capacity(8),
- buf_len: 0,
+ buf: BytesBuf::new(),
}
}
- async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
- while !self.read_all && self.buf_len < self.block_size {
+ async fn next(&mut self) -> Result<Option<Bytes>, Error> {
+ while !self.read_all && self.buf.len() < self.block_size {
if let Some(block) = self.stream.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
- self.buf_len += bytes.len();
- self.buf.push_back(bytes);
+ self.buf.extend(bytes);
} else {
self.read_all = true;
}
}
- if self.buf_len == 0 {
+ if self.buf.is_empty() {
Ok(None)
} else {
- let mut slices = Vec::with_capacity(self.buf.len());
- let mut taken = 0;
- while self.buf_len > 0 && taken < self.block_size {
- let front = self.buf.pop_front().unwrap();
- if taken + front.len() <= self.block_size {
- taken += front.len();
- self.buf_len -= front.len();
- slices.push(front);
- } else {
- let front_take = self.block_size - taken;
- slices.push(front.slice(..front_take));
- self.buf.push_front(front.slice(front_take..));
- self.buf_len -= front_take;
- break;
- }
- }
- Ok(Some(
- slices.iter().map(|x| &x[..]).collect::<Vec<_>>().concat(),
- ))
+ Ok(Some(self.buf.take_max(self.block_size)))
}
}
}
@@ -545,7 +523,6 @@ pub async fn handle_put_part(
// Copy block to store
let version = Version::new(version_uuid, bucket_id, key, false);
- let first_block = Bytes::from(first_block);
let first_block_hash = async_blake2sum(first_block.clone()).await;
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
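
Note: with the simplified StreamChunker, a caller just pulls blocks until it returns Ok(None). A hedged usage sketch; body_stream, BLOCK_SIZE and process_block are illustrative assumptions, not names from this diff:

    let mut chunker = StreamChunker::new(body_stream, BLOCK_SIZE);
    while let Some(block) = chunker.next().await? {
        // each `block` is a Bytes of at most BLOCK_SIZE bytes taken off the
        // internal BytesBuf via take_max; any remainder stays buffered for
        // the next call, and Ok(None) marks the end of the body
        process_block(block).await?;
    }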