diff options
author | Alex Auvolat <alex@adnab.me> | 2020-12-12 16:05:28 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-12-12 16:05:28 +0100 |
commit | 0b3084ca5ff7884f149f679c6dc391bab46d902d (patch) | |
tree | 9e2797fea45bb0fe5f55afc205366d45c2c3b0dd /src | |
parent | cbd10c1b0a1325fbd7cf91a43ea0044ba1aae409 (diff) | |
parent | 022b386a5085cad79d649a82846c41cad730920b (diff) | |
download | garage-0b3084ca5ff7884f149f679c6dc391bab46d902d.tar.gz garage-0b3084ca5ff7884f149f679c6dc391bab46d902d.zip |
Merge branch 'master' into doc/modeldoc/model
Diffstat (limited to 'src')
-rw-r--r-- | src/api/Cargo.toml | 1 | ||||
-rw-r--r-- | src/api/s3_get.rs | 56 | ||||
-rw-r--r-- | src/api/s3_list.rs | 36 | ||||
-rw-r--r-- | src/api/s3_put.rs | 39 | ||||
-rw-r--r-- | src/table/table.rs | 3 |
5 files changed, 92 insertions, 43 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index a366f9b8..079993c3 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -27,6 +27,7 @@ md-5 = "0.9.1" sha2 = "0.8" hmac = "0.7" crypto-mac = "0.7" +rand = "0.7" futures = "0.3" futures-util = "0.3" diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index a68c485b..1a23f476 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -24,11 +24,13 @@ fn object_headers( "Content-Type", version_meta.headers.content_type.to_string(), ) - .header("Content-Length", format!("{}", version_meta.size)) - .header("ETag", version_meta.etag.to_string()) .header("Last-Modified", date_str) .header("Accept-Ranges", format!("bytes")); + if !version_meta.etag.is_empty() { + resp = resp.header("ETag", format!("\"{}\"", version_meta.etag)); + } + for (k, v) in version_meta.headers.other.iter() { resp = resp.header(k, v.to_string()); } @@ -63,6 +65,7 @@ pub async fn handle_head( let body: Body = Body::from(vec![]); let response = object_headers(&version, version_meta) + .header("Content-Length", format!("{}", version_meta.size)) .status(StatusCode::OK) .body(body) .unwrap(); @@ -123,7 +126,9 @@ pub async fn handle_get( .await; } - let resp_builder = object_headers(&last_v, last_v_meta).status(StatusCode::OK); + let resp_builder = object_headers(&last_v, last_v_meta) + .header("Content-Length", format!("{}", last_v_meta.size)) + .status(StatusCode::OK); match &last_v_data { ObjectVersionData::DeleteMarker => unreachable!(), @@ -161,7 +166,7 @@ pub async fn handle_get( } }) .buffered(2); - //let body: Body = Box::new(StreamBody::new(Box::pin(body_stream))); + let body = hyper::body::Body::wrap_stream(body_stream); Ok(resp_builder.body(body)?) } @@ -181,9 +186,10 @@ pub async fn handle_get_range( } let resp_builder = object_headers(version, version_meta) + .header("Content-Length", format!("{}", end - begin)) .header( "Content-Range", - format!("bytes {}-{}/{}", begin, end, version_meta.size), + format!("bytes {}-{}/{}", begin, end - 1, version_meta.size), ) .status(StatusCode::PARTIAL_CONTENT); @@ -206,35 +212,49 @@ pub async fn handle_get_range( None => return Err(Error::NotFound), }; - let blocks = version - .blocks() - .iter() - .cloned() - .filter(|block| block.offset + block.size > begin && block.offset < end) - .collect::<Vec<_>>(); + // We will store here the list of blocks that have an intersection with the requested + // range, as well as their "true offset", which is their actual offset in the complete + // file (whereas block.offset designates the offset of the block WITHIN THE PART + // block.part_number, which is not the same in the case of a multipart upload) + let mut blocks = Vec::with_capacity(std::cmp::min( + version.blocks().len(), + 4 + ((end - begin) / std::cmp::max(version.blocks()[0].size as u64, 1024)) as usize, + )); + let mut true_offset = 0; + for b in version.blocks().iter() { + if true_offset >= end { + break; + } + // Keep only blocks that have an intersection with the requested range + if true_offset < end && true_offset + b.size > begin { + blocks.push((b.clone(), true_offset)); + } + true_offset += b.size; + } let body_stream = futures::stream::iter(blocks) - .map(move |block| { + .map(move |(block, true_offset)| { let garage = garage.clone(); async move { let data = garage.block_manager.rpc_get_block(&block.hash).await?; - let start_in_block = if block.offset > begin { + let data = Bytes::from(data); + let start_in_block = if true_offset > begin { 0 } else { - begin - block.offset + begin - true_offset }; - let end_in_block = if block.offset + block.size < end { + let end_in_block = if true_offset + block.size < end { block.size } else { - end - block.offset + end - true_offset }; Result::<Bytes, Error>::Ok(Bytes::from( - data[start_in_block as usize..end_in_block as usize].to_vec(), + data.slice(start_in_block as usize..end_in_block as usize), )) } }) .buffered(2); - //let body: Body = Box::new(StreamBody::new(Box::pin(body_stream))); + let body = hyper::body::Body::wrap_stream(body_stream); Ok(resp_builder.body(body)?) } diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 3b739a8a..599d0d11 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -18,6 +18,7 @@ use crate::encoding::*; struct ListResultInfo { last_modified: u64, size: u64, + etag: String, } pub async fn handle_list( @@ -56,12 +57,12 @@ pub async fn handle_list( for object in objects.iter() { if !object.key.starts_with(prefix) { - truncated = false; + truncated = None; break 'query_loop; } if let Some(version) = object.versions().iter().find(|x| x.is_data()) { if result_keys.len() + result_common_prefixes.len() >= max_keys { - truncated = true; + truncated = Some(object.key.to_string()); break 'query_loop; } let common_prefix = if delimiter.len() > 0 { @@ -75,19 +76,18 @@ pub async fn handle_list( if let Some(pfx) = common_prefix { result_common_prefixes.insert(pfx.to_string()); } else { - let size = match &version.state { - ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => { - meta.size - } + let meta = match &version.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => { - meta.size + meta } _ => unreachable!(), }; let info = match result_keys.get(&object.key) { None => ListResultInfo { last_modified: version.timestamp, - size, + size: meta.size, + etag: meta.etag.to_string(), }, Some(_lri) => { return Err(Error::Message(format!("Duplicate key?? {}", object.key))) @@ -98,7 +98,7 @@ pub async fn handle_list( } } if objects.len() < max_keys + 1 { - truncated = false; + truncated = None; break 'query_loop; } if objects.len() > 0 { @@ -113,11 +113,22 @@ pub async fn handle_list( r#"<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"# ) .unwrap(); - writeln!(&mut xml, "\t<Bucket>{}</Bucket>", bucket).unwrap(); + writeln!(&mut xml, "\t<Name>{}</Name>", bucket).unwrap(); writeln!(&mut xml, "\t<Prefix>{}</Prefix>", prefix).unwrap(); + if let Some(mkr) = marker { + writeln!(&mut xml, "\t<Marker>{}</Marker>", mkr).unwrap(); + } writeln!(&mut xml, "\t<KeyCount>{}</KeyCount>", result_keys.len()).unwrap(); writeln!(&mut xml, "\t<MaxKeys>{}</MaxKeys>", max_keys).unwrap(); - writeln!(&mut xml, "\t<IsTruncated>{}</IsTruncated>", truncated).unwrap(); + writeln!( + &mut xml, + "\t<IsTruncated>{}</IsTruncated>", + truncated.is_some() + ) + .unwrap(); + if let Some(next_marker) = truncated { + writeln!(&mut xml, "\t<NextMarker>{}</NextMarker>", next_marker).unwrap(); + } for (key, info) in result_keys.iter() { let last_modif = NaiveDateTime::from_timestamp(info.last_modified as i64 / 1000, 0); let last_modif = DateTime::<Utc>::from_utc(last_modif, Utc); @@ -132,6 +143,9 @@ pub async fn handle_list( .unwrap(); writeln!(&mut xml, "\t\t<LastModified>{}</LastModified>", last_modif).unwrap(); writeln!(&mut xml, "\t\t<Size>{}</Size>", info.size).unwrap(); + if !info.etag.is_empty() { + writeln!(&mut xml, "\t\t<ETag>\"{}\"</ETag>", info.etag).unwrap(); + } writeln!(&mut xml, "\t\t<StorageClass>STANDARD</StorageClass>").unwrap(); writeln!(&mut xml, "\t</Contents>").unwrap(); } diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index a528720d..c42309b2 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -51,12 +51,7 @@ pub async fn handle_put( let md5sum_arr = md5sum.finalize(); let md5sum_hex = hex::encode(md5sum_arr); - let mut sha256sum = Sha256::new(); - sha256sum.input(&first_block[..]); - let sha256sum_arr = sha256sum.result(); - let mut hash = [0u8; 32]; - hash.copy_from_slice(&sha256sum_arr[..]); - let sha256sum_hash = Hash::from(hash); + let sha256sum_hash = hash(&first_block[..]); ensure_checksum_matches( md5sum_arr.as_slice(), @@ -253,7 +248,7 @@ impl BodyChunker { body, read_all: false, block_size, - buf: VecDeque::new(), + buf: VecDeque::with_capacity(2 * block_size), } } async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> { @@ -278,11 +273,10 @@ impl BodyChunker { } } -pub fn put_response(version_uuid: UUID, etag: String) -> Response<Body> { +pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> { Response::builder() .header("x-amz-version-id", hex::encode(version_uuid)) - .header("ETag", etag) - // TODO ETag + .header("ETag", format!("\"{}\"", md5sum_hex)) .body(Body::from(vec![])) .unwrap() } @@ -369,7 +363,7 @@ pub async fn handle_put_part( } // Copy block to store - let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); + let version = Version::new(version_uuid, bucket, key, false, vec![]); let first_block_hash = hash(&first_block[..]); let (_, md5sum_arr, sha256sum) = read_and_put_blocks( &garage, @@ -388,7 +382,11 @@ pub async fn handle_put_part( content_sha256, )?; - Ok(Response::new(Body::from(vec![]))) + let response = Response::builder() + .header("ETag", format!("\"{}\"", hex::encode(md5sum_arr))) + .body(Body::from(vec![])) + .unwrap(); + Ok(response) } pub async fn handle_complete_multipart_upload( @@ -430,6 +428,21 @@ pub async fn handle_complete_multipart_upload( _ => unreachable!(), }; + // ETag calculation: we produce ETags that have the same form as + // those of S3 multipart uploads, but we don't use their actual + // calculation for the first part (we use random bytes). This + // shouldn't impact compatibility as the S3 docs specify that + // the ETag is an opaque value in case of a multipart upload. + // See also: https://teppen.io/2018/06/23/aws_s3_etags/ + let num_parts = version.blocks().last().unwrap().part_number + - version.blocks().first().unwrap().part_number + + 1; + let etag = format!( + "{}-{}", + hex::encode(&rand::random::<[u8; 16]>()[..]), + num_parts + ); + // TODO: check that all the parts that they pretend they gave us are indeed there // TODO: when we read the XML from _req, remember to check the sha256 sum of the payload // against the signed x-amz-content-sha256 @@ -444,7 +457,7 @@ pub async fn handle_complete_multipart_upload( ObjectVersionMeta { headers, size: total_size, - etag: "".to_string(), // TODO + etag: etag, }, version.blocks()[0].hash, )); diff --git a/src/table/table.rs b/src/table/table.rs index 5dfee3c8..acb46325 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -391,7 +391,8 @@ where let (old_entry, new_entry) = self.store.transaction(|db| { let (old_entry, new_entry) = match db.get(&tree_key)? { Some(prev_bytes) => { - let old_entry = self.decode_entry(&prev_bytes) + let old_entry = self + .decode_entry(&prev_bytes) .map_err(sled::ConflictableTransactionError::Abort)?; let mut new_entry = old_entry.clone(); new_entry.merge(&update); |