From 605a630333c8ee60c55fe011a375c01277bba173 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Fri, 22 Jul 2022 18:20:27 +0200
Subject: Use streaming in block manager

---
 src/api/s3/copy.rs | 12 ++++++++----
 src/api/s3/get.rs  | 29 ++++++++++++++++++++---------
 2 files changed, 28 insertions(+), 13 deletions(-)

(limited to 'src/api/s3')

diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 4415a037..54a565e0 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -5,6 +5,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
 use md5::{Digest as Md5Digest, Md5};
 
+use bytes::Bytes;
 use hyper::{Body, Request, Response};
 use serde::Serialize;
 
@@ -311,7 +312,7 @@ pub async fn handle_upload_part_copy(
             stream::once(async move {
                 let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
                 match range_to_copy {
-                    Some(r) => Ok((data[r].to_vec(), None)),
+                    Some(r) => Ok((data.slice(r), None)),
                     None => Ok((data, Some(block_hash))),
                 }
             })
@@ -556,7 +557,7 @@ impl CopyPreconditionHeaders {
     }
 }
 
-type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
+type BlockStreamItemOk = (Bytes, Option<Hash>);
 type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
 
 struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
@@ -589,7 +590,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
             if self.buffer.is_empty() {
                 let (next_block, next_block_hash) =
                     self.block_stream.next().await.unwrap()?;
-                self.buffer = next_block;
+                self.buffer = next_block.to_vec(); // TODO TOO MUCH COPY
                 self.hash = next_block_hash;
             } else if self.buffer.len() + peeked_next_block.len() > self.block_size {
                 break;
@@ -600,7 +601,10 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
             }
         }
 
-        Ok((std::mem::take(&mut self.buffer), self.hash.take()))
+        Ok((
+            Bytes::from(std::mem::take(&mut self.buffer)),
+            self.hash.take(),
+        ))
     }
 }

diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 7fa1a177..7d118f89 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -242,10 +242,13 @@ pub async fn handle_get(
             Ok(resp_builder.body(body)?)
         }
         ObjectVersionData::FirstBlock(_, first_block_hash) => {
-            let read_first_block = garage.block_manager.rpc_get_block(first_block_hash);
+            let read_first_block = garage
+                .block_manager
+                .rpc_get_block_streaming(first_block_hash);
             let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
 
-            let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
+            let (first_block_stream, version) =
+                futures::try_join!(read_first_block, get_next_blocks)?;
             let version = version.ok_or(Error::NoSuchKey)?;
 
             let mut blocks = version
@@ -254,24 +257,32 @@ pub async fn handle_get(
                 .iter()
                 .map(|(_, vb)| (vb.hash, None))
                 .collect::<Vec<_>>();
-            blocks[0].1 = Some(first_block);
+            blocks[0].1 = Some(first_block_stream);
 
             let body_stream = futures::stream::iter(blocks)
-                .map(move |(hash, data_opt)| {
+                .map(move |(hash, stream_opt)| {
                     let garage = garage.clone();
                     async move {
-                        if let Some(data) = data_opt {
-                            Ok(Bytes::from(data))
+                        if let Some(stream) = stream_opt {
+                            stream
                         } else {
                             garage
                                 .block_manager
-                                .rpc_get_block(&hash)
+                                .rpc_get_block_streaming(&hash)
                                 .await
-                                .map(Bytes::from)
+                                .unwrap_or_else(|_| {
+                                    Box::pin(futures::stream::once(async move {
+                                        Err(std::io::Error::new(
+                                            std::io::ErrorKind::Other,
+                                            "Could not get next block",
+                                        ))
+                                    }))
+                                })
                         }
                     }
                 })
-                .buffered(2);
+                .buffered(3)
+                .flatten();
 
             let body = hyper::body::Body::wrap_stream(body_stream);
             Ok(resp_builder.body(body)?)

-- cgit v1.2.3
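Note: the core pattern introduced by the commit above is a stream of streams. Each block is fetched as a stream of chunks (rpc_get_block_streaming), and the per-block streams are concatenated with .flatten() after .buffered(n), which opens up to n block streams ahead of the consumer while preserving byte order. A minimal self-contained sketch of that pattern, not Garage code (fetch_block is a stand-in for rpc_get_block_streaming):

    use futures::stream::{self, Stream, StreamExt};

    fn fetch_block(i: u32) -> impl Stream<Item = Result<Vec<u8>, std::io::Error>> {
        // Stand-in for an RPC block fetch: one block arrives as several chunks.
        stream::iter(vec![Ok(vec![i as u8; 4]), Ok(vec![i as u8; 4])])
    }

    fn main() {
        futures::executor::block_on(async {
            let body = stream::iter(0u32..3)
                .map(|i| async move { fetch_block(i) })
                .buffered(2) // prefetch up to 2 block streams, order preserved
                .flatten(); // concatenate their chunks in block order
            let chunks: Vec<Result<Vec<u8>, std::io::Error>> = body.collect().await;
            assert_eq!(chunks.len(), 6); // 3 blocks x 2 chunks each
        });
    }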
From 68087ee13dc22dbaeb0c1fa8dcb4bdbaa82098a6 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Fri, 22 Jul 2022 19:06:56 +0200
Subject: Fix clippy

---
 src/api/s3/copy.rs | 5 +----
 src/api/s3/get.rs  | 1 -
 2 files changed, 1 insertion(+), 5 deletions(-)

(limited to 'src/api/s3')

diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 54a565e0..b54cbd23 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -366,10 +366,7 @@ pub async fn handle_upload_part_copy(
                 // we need to insert that data as a new block.
                 async move {
                     if must_upload {
-                        garage2
-                            .block_manager
-                            .rpc_put_block(final_hash, data.into())
-                            .await
+                        garage2.block_manager.rpc_put_block(final_hash, data).await
                     } else {
                         Ok(())
                     }

diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 7d118f89..c7621ade 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -450,7 +450,6 @@ fn body_from_blocks_range(
             let garage = garage.clone();
             async move {
                 let data = garage.block_manager.rpc_get_block(&block.hash).await?;
-                let data = Bytes::from(data);
                 let start_in_block = if true_offset > begin {
                     0
                 } else {

-- cgit v1.2.3
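Note: this cleanup removes conversions that became no-ops once the previous commit switched block data from Vec<u8> to bytes::Bytes; clippy flags such identity conversions as useless_conversion. Since Bytes is reference-counted, it can also be cloned and passed along cheaply. A tiny hypothetical illustration, not taken from the patch:

    use bytes::Bytes;

    fn main() {
        let data = Bytes::from(vec![0u8; 1024]);
        // `clone` only bumps a reference count; no 1 KiB copy happens.
        let data2 = data.clone();
        // With `data2` already of type `Bytes`, this `.into()` is the identity
        // conversion that clippy::useless_conversion warns about and that the
        // commit above removes from rpc_put_block().
        let same: Bytes = data2.into();
        assert_eq!(same.len(), 1024);
    }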
From bc977f9a7a7a5bd87ccf5fe96d64b397591f8ba0 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 1 Sep 2022 12:58:20 +0200
Subject: Update to Netapp with OrderTag support and exploit OrderTags

---
 src/api/s3/copy.rs | 10 ++++++++--
 src/api/s3/get.rs  | 21 +++++++++++++++------
 2 files changed, 23 insertions(+), 8 deletions(-)

(limited to 'src/api/s3')

diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index b54cbd23..10cf5935 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -9,6 +9,7 @@ use bytes::Bytes;
 use hyper::{Body, Request, Response};
 use serde::Serialize;
 
+use garage_rpc::rpc_helper::OrderTag;
 use garage_table::*;
 use garage_util::data::*;
 use garage_util::time::*;
@@ -306,11 +307,16 @@ pub async fn handle_upload_part_copy(
     // if and only if the block returned is a block that already existed
     // in the Garage data store (thus we don't need to save it again).
     let garage2 = garage.clone();
+    let order_stream = OrderTag::stream();
     let source_blocks = stream::iter(blocks_to_copy)
-        .flat_map(|(block_hash, range_to_copy)| {
+        .enumerate()
+        .flat_map(|(i, (block_hash, range_to_copy))| {
             let garage3 = garage2.clone();
             stream::once(async move {
-                let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
+                let data = garage3
+                    .block_manager
+                    .rpc_get_block(&block_hash, Some(order_stream.order(i as u64)))
+                    .await?;
                 match range_to_copy {
                     Some(r) => Ok((data.slice(r), None)),
                     None => Ok((data, Some(block_hash))),

diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index c7621ade..dfc284fe 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -10,6 +10,7 @@ use http::header::{
 use hyper::body::Bytes;
 use hyper::{Body, Request, Response, StatusCode};
 
+use garage_rpc::rpc_helper::OrderTag;
 use garage_table::EmptyKey;
 use garage_util::data::*;
 
@@ -242,9 +243,11 @@ pub async fn handle_get(
             Ok(resp_builder.body(body)?)
         }
         ObjectVersionData::FirstBlock(_, first_block_hash) => {
+            let order_stream = OrderTag::stream();
+
             let read_first_block = garage
                 .block_manager
-                .rpc_get_block_streaming(first_block_hash);
+                .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0)));
             let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
 
             let (first_block_stream, version) =
@@ -260,7 +263,8 @@ pub async fn handle_get(
             blocks[0].1 = Some(first_block_stream);
 
             let body_stream = futures::stream::iter(blocks)
-                .map(move |(hash, stream_opt)| {
+                .enumerate()
+                .map(move |(i, (hash, stream_opt))| {
                     let garage = garage.clone();
                     async move {
                         if let Some(stream) = stream_opt {
@@ -268,7 +272,7 @@ pub async fn handle_get(
                         } else {
                             garage
                                 .block_manager
-                                .rpc_get_block_streaming(&hash)
+                                .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
                                 .await
                                 .unwrap_or_else(|_| {
                                     Box::pin(futures::stream::once(async move {
@@ -281,7 +285,7 @@ pub async fn handle_get(
                         }
                     }
                 })
-                .buffered(3)
+                .buffered(2)
                 .flatten();
 
             let body = hyper::body::Body::wrap_stream(body_stream);
@@ -445,11 +449,16 @@ fn body_from_blocks_range(
         true_offset += b.size;
     }
 
+    let order_stream = OrderTag::stream();
     let body_stream = futures::stream::iter(blocks)
-        .map(move |(block, true_offset)| {
+        .enumerate()
+        .map(move |(i, (block, true_offset))| {
             let garage = garage.clone();
             async move {
-                let data = garage.block_manager.rpc_get_block(&block.hash).await?;
+                let data = garage
+                    .block_manager
+                    .rpc_get_block(&block.hash, Some(order_stream.order(i as u64)))
+                    .await?;
                 let start_in_block = if true_offset > begin {
                     0
                 } else {

-- cgit v1.2.3
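Note: OrderTag comes from Netapp, Garage's RPC library. When several block transfers are multiplexed onto one connection, tagging each request with a (stream id, sequence number) pair lets the sending peer prioritize data in the order the consumer will actually read it, so block i+1 does not starve block i. The diff only shows the call sites (OrderTag::stream() and order(i)); the following toy stand-in, which is an illustration and not Netapp's implementation, makes the shape of the API concrete:

    // Toy stand-in for Netapp's OrderTag: a (stream id, sequence number)
    // pair. The receiving peer can use it to send data for order 0 before
    // data for order 1, and so on.
    #[derive(Clone, Copy, Debug, PartialEq)]
    struct OrderTag(u64, u64);

    #[derive(Clone, Copy)]
    struct OrderTagStream(u64);

    impl OrderTag {
        fn stream() -> OrderTagStream {
            // Fixed id here for the example; the real one is drawn at random.
            OrderTagStream(0x42)
        }
    }

    impl OrderTagStream {
        fn order(&self, i: u64) -> OrderTag {
            OrderTag(self.0, i)
        }
    }

    fn main() {
        let order_stream = OrderTag::stream();
        // One tag per block, numbered in the order the client must receive
        // them, mirroring `order_stream.order(i as u64)` in the diff above.
        let tags: Vec<OrderTag> = (0u64..3).map(|i| order_stream.order(i)).collect();
        assert_eq!(tags[2], OrderTag(0x42, 2));
    }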
From 13b5f28c7e8dec12b1db61735931b3830a3c893f Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Fri, 2 Sep 2022 13:46:42 +0200
Subject: Make use of BytesBuf from new Netapp

---
 src/api/s3/put.rs | 43 ++++++++++---------------------------------
 1 file changed, 10 insertions(+), 33 deletions(-)

(limited to 'src/api/s3')

diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index dc0530df..97b8e4e3 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::sync::Arc;
 
 use futures::prelude::*;
@@ -13,6 +13,7 @@ use opentelemetry::{
     Context,
 };
 
+use garage_rpc::netapp::bytes_buf::BytesBuf;
 use garage_table::*;
 use garage_util::async_hash::*;
 use garage_util::data::*;
@@ -108,7 +109,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
                 size,
                 etag: data_md5sum_hex.clone(),
             },
-            first_block,
+            first_block.to_vec(),
         )),
     };
 
@@ -136,7 +137,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
     garage.version_table.insert(&version).await?;
 
     // Transfer data and verify checksum
-    let first_block = Bytes::from(first_block);
     let first_block_hash = async_blake2sum(first_block.clone()).await;
 
     let tx_result = (|| async {
@@ -318,7 +318,6 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
             chunker.next(),
         )?;
         if let Some(block) = next_block {
-            let block = Bytes::from(block);
             let (_, _, block_hash) = futures::future::join3(
                 md5hasher.update(block.clone()),
                 sha256hasher.update(block.clone()),
@@ -387,8 +386,7 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
     stream: S,
     read_all: bool,
     block_size: usize,
-    buf: VecDeque<Bytes>,
-    buf_len: usize,
+    buf: BytesBuf,
 }
 
 impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
@@ -397,45 +395,25 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
             stream,
             read_all: false,
             block_size,
-            buf: VecDeque::with_capacity(8),
-            buf_len: 0,
+            buf: BytesBuf::new(),
         }
     }
 
-    async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
-        while !self.read_all && self.buf_len < self.block_size {
+    async fn next(&mut self) -> Result<Option<Bytes>, Error> {
+        while !self.read_all && self.buf.len() < self.block_size {
             if let Some(block) = self.stream.next().await {
                 let bytes = block?;
                 trace!("Body next: {} bytes", bytes.len());
-                self.buf_len += bytes.len();
-                self.buf.push_back(bytes);
+                self.buf.extend(bytes);
             } else {
                 self.read_all = true;
             }
         }
 
-        if self.buf_len == 0 {
+        if self.buf.is_empty() {
             Ok(None)
         } else {
-            let mut slices = Vec::with_capacity(self.buf.len());
-            let mut taken = 0;
-            while self.buf_len > 0 && taken < self.block_size {
-                let front = self.buf.pop_front().unwrap();
-                if taken + front.len() <= self.block_size {
-                    taken += front.len();
-                    self.buf_len -= front.len();
-                    slices.push(front);
-                } else {
-                    let front_take = self.block_size - taken;
-                    slices.push(front.slice(..front_take));
-                    self.buf.push_front(front.slice(front_take..));
-                    self.buf_len -= front_take;
-                    break;
-                }
-            }
-            Ok(Some(
-                slices.iter().map(|x| &x[..]).collect::<Vec<_>>().concat(),
-            ))
+            Ok(Some(self.buf.take_max(self.block_size)))
         }
     }
 }
@@ -545,7 +523,6 @@ pub async fn handle_put_part(
 
     // Copy block to store
     let version = Version::new(version_uuid, bucket_id, key, false);
-    let first_block = Bytes::from(first_block);
     let first_block_hash = async_blake2sum(first_block.clone()).await;
 
     let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(

-- cgit v1.2.3
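Note: BytesBuf (from Netapp) replaces the hand-rolled VecDeque<Bytes> plus buf_len bookkeeping in StreamChunker: extend() appends a chunk, len()/is_empty() track the total, and take_max(n) splits off up to n bytes, slicing only the chunk that straddles the boundary. A simplified sketch of such a buffer, assuming those semantics (the real BytesBuf can also avoid the final copy, e.g. when the requested bytes are a single chunk):

    use std::collections::VecDeque;

    use bytes::Bytes;

    struct MiniBytesBuf {
        chunks: VecDeque<Bytes>,
        len: usize,
    }

    impl MiniBytesBuf {
        fn new() -> Self {
            Self { chunks: VecDeque::new(), len: 0 }
        }

        fn is_empty(&self) -> bool {
            self.len == 0
        }

        // Append a chunk without copying it.
        fn extend(&mut self, b: Bytes) {
            self.len += b.len();
            self.chunks.push_back(b);
        }

        // Return up to `max` bytes; only the chunk straddling the boundary
        // is split (Bytes::slice is a cheap refcounted view).
        fn take_max(&mut self, max: usize) -> Bytes {
            let take = max.min(self.len);
            let mut out = Vec::with_capacity(take);
            while out.len() < take {
                let front = self.chunks.pop_front().unwrap();
                let room = take - out.len();
                if front.len() <= room {
                    out.extend_from_slice(&front);
                } else {
                    out.extend_from_slice(&front.slice(..room));
                    self.chunks.push_front(front.slice(room..));
                }
            }
            self.len -= out.len();
            Bytes::from(out)
        }
    }

    fn main() {
        let mut buf = MiniBytesBuf::new();
        buf.extend(Bytes::from_static(b"hello "));
        buf.extend(Bytes::from_static(b"world"));
        assert_eq!(&buf.take_max(8)[..], b"hello wo");
        assert_eq!(&buf.take_max(8)[..], b"rld"); // capped at what is left
        assert!(buf.is_empty());
    }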
From 907054775dc71a10a92ab96112889db9113130ab Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 6 Sep 2022 22:25:23 +0200
Subject: Faster copy, better get error message

---
 src/api/s3/copy.rs | 12 +++++-------
 src/api/s3/get.rs  |  4 ++--
 2 files changed, 7 insertions(+), 9 deletions(-)

(limited to 'src/api/s3')

diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 10cf5935..a1a8c9a4 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -9,6 +9,7 @@ use bytes::Bytes;
 use hyper::{Body, Request, Response};
 use serde::Serialize;
 
+use garage_rpc::netapp::bytes_buf::BytesBuf;
 use garage_rpc::rpc_helper::OrderTag;
 use garage_table::*;
 use garage_util::data::*;
@@ -566,7 +567,7 @@ type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
 struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
     block_size: usize,
     block_stream: Pin<Box<stream::Peekable<S>>>,
-    buffer: Vec<u8>,
+    buffer: BytesBuf,
     hash: Option<Hash>,
 }
 
@@ -575,7 +576,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
         Self {
             block_size,
             block_stream,
-            buffer: vec![],
+            buffer: BytesBuf::new(),
             hash: None,
         }
     }
@@ -593,7 +594,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
             if self.buffer.is_empty() {
                 let (next_block, next_block_hash) =
                     self.block_stream.next().await.unwrap()?;
-                self.buffer = next_block.to_vec(); // TODO TOO MUCH COPY
+                self.buffer.extend(next_block);
                 self.hash = next_block_hash;
             } else if self.buffer.len() + peeked_next_block.len() > self.block_size {
                 break;
@@ -604,10 +605,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
             }
         }
 
-        Ok((
-            Bytes::from(std::mem::take(&mut self.buffer)),
-            self.hash.take(),
-        ))
+        Ok((self.buffer.take_all(), self.hash.take()))
     }
 }
 
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index dfc284fe..dd95f6e7 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -274,11 +274,11 @@ pub async fn handle_get(
                             .block_manager
                             .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
                             .await
-                            .unwrap_or_else(|_| {
+                            .unwrap_or_else(|e| {
                                 Box::pin(futures::stream::once(async move {
                                     Err(std::io::Error::new(
                                         std::io::ErrorKind::Other,
-                                        "Could not get next block",
+                                        format!("Could not get block {}: {}", i, e),
                                     ))
                                 }))
                             })

-- cgit v1.2.3
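Note: the "faster copy" comes from using BytesBuf in the Defragmenter too: buffer.extend(next_block) keeps the received Bytes chunk by reference instead of copying it into a Vec<u8> (the "TODO TOO MUCH COPY" left by the first commit), and take_all() hands the accumulated data back out. The defragmentation policy itself is unchanged: coalesce consecutive small blocks until adding the next one would overflow block_size. A simplified, self-contained sketch of that policy (synchronous, plain Vec, no hashes; the real Defragmenter also keeps the hash of a block passed through whole, so it can be referenced without re-upload):

    fn defragment(blocks: Vec<Vec<u8>>, block_size: usize) -> Vec<Vec<u8>> {
        let mut out: Vec<Vec<u8>> = Vec::new();
        let mut buf: Vec<u8> = Vec::new();
        for b in blocks {
            // Emit the buffer before it would overflow block_size.
            if !buf.is_empty() && buf.len() + b.len() > block_size {
                out.push(std::mem::take(&mut buf));
            }
            buf.extend_from_slice(&b);
        }
        if !buf.is_empty() {
            out.push(buf);
        }
        out
    }

    fn main() {
        let blocks = vec![vec![0u8; 3], vec![0u8; 3], vec![0u8; 5]];
        let merged = defragment(blocks, 8);
        // Two 3-byte blocks merge into one 6-byte block; the 5-byte block
        // does not fit on top of it and is emitted on its own.
        assert_eq!(merged.iter().map(Vec::len).collect::<Vec<_>>(), vec![6, 5]);
    }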
From ff30891999b5be5421b80b89da1037e943179d2d Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 13 Sep 2022 15:13:07 +0200
Subject: Use streaming block API for get with Range requests

---
 src/api/s3/get.rs | 93 +++++++++++++++++++++++++++++++++++--------------------
 1 file changed, 60 insertions(+), 33 deletions(-)

(limited to 'src/api/s3')

diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index dd95f6e7..ae4c287d 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -7,10 +7,9 @@ use http::header::{
     ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
     IF_NONE_MATCH, LAST_MODIFIED, RANGE,
 };
-use hyper::body::Bytes;
 use hyper::{Body, Request, Response, StatusCode};
 
-use garage_rpc::rpc_helper::OrderTag;
+use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
 use garage_table::EmptyKey;
 use garage_util::data::*;
 
@@ -274,14 +273,7 @@ pub async fn handle_get(
                             .block_manager
                             .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
                             .await
-                            .unwrap_or_else(|e| {
-                                Box::pin(futures::stream::once(async move {
-                                    Err(std::io::Error::new(
-                                        std::io::ErrorKind::Other,
-                                        format!("Could not get block {}: {}", i, e),
-                                    ))
-                                }))
-                            })
+                            .unwrap_or_else(|e| error_stream(i, e))
                     }
                 }
             })
@@ -437,44 +429,79 @@ fn body_from_blocks_range(
         all_blocks.len(),
         4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize,
     ));
-    let mut true_offset = 0;
+    let mut block_offset: u64 = 0;
     for (_, b) in all_blocks.iter() {
-        if true_offset >= end {
+        if block_offset >= end {
             break;
         }
         // Keep only blocks that have an intersection with the requested range
-        if true_offset < end && true_offset + b.size > begin {
-            blocks.push((*b, true_offset));
+        if block_offset < end && block_offset + b.size > begin {
+            blocks.push((*b, block_offset));
         }
-        true_offset += b.size;
+        block_offset += b.size as u64;
     }
 
     let order_stream = OrderTag::stream();
     let body_stream = futures::stream::iter(blocks)
         .enumerate()
-        .map(move |(i, (block, true_offset))| {
+        .map(move |(i, (block, block_offset))| {
             let garage = garage.clone();
             async move {
-                let data = garage
+                garage
                     .block_manager
-                    .rpc_get_block(&block.hash, Some(order_stream.order(i as u64)))
-                    .await?;
-                let start_in_block = if true_offset > begin {
-                    0
-                } else {
-                    begin - true_offset
-                };
-                let end_in_block = if true_offset + block.size < end {
-                    block.size
-                } else {
-                    end - true_offset
-                };
-                Result::<Bytes, Error>::Ok(
-                    data.slice(start_in_block as usize..end_in_block as usize),
-                )
+                    .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
+                    .await
+                    .unwrap_or_else(|e| error_stream(i, e))
+                    .scan(block_offset, move |chunk_offset, chunk| {
+                        let r = match chunk {
+                            Ok(chunk_bytes) => {
+                                let chunk_len = chunk_bytes.len() as u64;
+                                let r = if *chunk_offset >= end {
+                                    // The current chunk is after the part we want to read.
+                                    // Returning None here will stop the scan, the rest of the
+                                    // stream will be ignored
+                                    None
+                                } else if *chunk_offset + chunk_len <= begin {
+                                    // The current chunk is before the part we want to read.
+                                    // We return a None that will be removed by the filter_map
+                                    // below.
+                                    Some(None)
+                                } else {
+                                    // The chunk has an intersection with the requested range
+                                    let start_in_chunk = if *chunk_offset > begin {
+                                        0
+                                    } else {
+                                        begin - *chunk_offset
+                                    };
+                                    let end_in_chunk = if *chunk_offset + chunk_len < end {
+                                        chunk_len
+                                    } else {
+                                        end - *chunk_offset
+                                    };
+                                    Some(Some(Ok(chunk_bytes
+                                        .slice(start_in_chunk as usize..end_in_chunk as usize))))
+                                };
+                                *chunk_offset += chunk_bytes.len() as u64;
+                                r
+                            }
+                            Err(e) => Some(Some(Err(e))),
+                        };
+                        futures::future::ready(r)
+                    })
+                    .filter_map(futures::future::ready)
             }
         })
-        .buffered(2);
+        .buffered(2)
+        .flatten();
 
     hyper::body::Body::wrap_stream(body_stream)
 }
+
+fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream {
+    Box::pin(futures::stream::once(async move {
+        Err(std::io::Error::new(
+            std::io::ErrorKind::Other,
+            format!("Could not get block {}: {}", i, e),
+        ))
+    }))
+}

-- cgit v1.2.3
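Note: for a Range request, each block now arrives as a stream of chunks, so the begin/end window has to be applied per chunk: scan threads the absolute byte offset through the stream, chunks entirely before the range are skipped (Some(None), removed by filter_map), chunks overlapping it are sliced, and the first chunk at or past end stops the scan by returning None. A self-contained sketch of this scan/filter_map windowing with simplified types but the same logic as the hunk above:

    use futures::stream::{self, StreamExt};

    fn main() {
        futures::executor::block_on(async {
            // Keep only bytes [begin, end) of a chunked stream.
            let (begin, end) = (4u64, 9u64);
            let chunks = vec![b"abcd".to_vec(), b"efgh".to_vec(), b"ijkl".to_vec()];
            let out: Vec<Vec<u8>> = stream::iter(chunks)
                .scan(0u64, move |offset, chunk| {
                    let len = chunk.len() as u64;
                    let r = if *offset >= end {
                        None // past the range: stop the stream entirely
                    } else if *offset + len <= begin {
                        Some(None) // before the range: drop this chunk
                    } else {
                        // Overlap: slice the chunk to the requested window.
                        let s = begin.saturating_sub(*offset) as usize;
                        let e = (end - *offset).min(len) as usize;
                        Some(Some(chunk[s..e].to_vec()))
                    };
                    *offset += len;
                    futures::future::ready(r)
                })
                .filter_map(futures::future::ready)
                .collect()
                .await;
            assert_eq!(out, vec![b"efgh".to_vec(), b"i".to_vec()]);
        });
    }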