aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml1
-rw-r--r--src/api/s3_get.rs56
-rw-r--r--src/api/s3_list.rs36
-rw-r--r--src/api/s3_put.rs39
-rw-r--r--src/table/table.rs3
5 files changed, 92 insertions, 43 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index a366f9b8..079993c3 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -27,6 +27,7 @@ md-5 = "0.9.1"
sha2 = "0.8"
hmac = "0.7"
crypto-mac = "0.7"
+rand = "0.7"
futures = "0.3"
futures-util = "0.3"
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index a68c485b..1a23f476 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -24,11 +24,13 @@ fn object_headers(
"Content-Type",
version_meta.headers.content_type.to_string(),
)
- .header("Content-Length", format!("{}", version_meta.size))
- .header("ETag", version_meta.etag.to_string())
.header("Last-Modified", date_str)
.header("Accept-Ranges", format!("bytes"));
+ if !version_meta.etag.is_empty() {
+ resp = resp.header("ETag", format!("\"{}\"", version_meta.etag));
+ }
+
for (k, v) in version_meta.headers.other.iter() {
resp = resp.header(k, v.to_string());
}
@@ -63,6 +65,7 @@ pub async fn handle_head(
let body: Body = Body::from(vec![]);
let response = object_headers(&version, version_meta)
+ .header("Content-Length", format!("{}", version_meta.size))
.status(StatusCode::OK)
.body(body)
.unwrap();
@@ -123,7 +126,9 @@ pub async fn handle_get(
.await;
}
- let resp_builder = object_headers(&last_v, last_v_meta).status(StatusCode::OK);
+ let resp_builder = object_headers(&last_v, last_v_meta)
+ .header("Content-Length", format!("{}", last_v_meta.size))
+ .status(StatusCode::OK);
match &last_v_data {
ObjectVersionData::DeleteMarker => unreachable!(),
@@ -161,7 +166,7 @@ pub async fn handle_get(
}
})
.buffered(2);
- //let body: Body = Box::new(StreamBody::new(Box::pin(body_stream)));
+
let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?)
}
@@ -181,9 +186,10 @@ pub async fn handle_get_range(
}
let resp_builder = object_headers(version, version_meta)
+ .header("Content-Length", format!("{}", end - begin))
.header(
"Content-Range",
- format!("bytes {}-{}/{}", begin, end, version_meta.size),
+ format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
)
.status(StatusCode::PARTIAL_CONTENT);
@@ -206,35 +212,49 @@ pub async fn handle_get_range(
None => return Err(Error::NotFound),
};
- let blocks = version
- .blocks()
- .iter()
- .cloned()
- .filter(|block| block.offset + block.size > begin && block.offset < end)
- .collect::<Vec<_>>();
+ // We will store here the list of blocks that have an intersection with the requested
+ // range, as well as their "true offset", which is their actual offset in the complete
+ // file (whereas block.offset designates the offset of the block WITHIN THE PART
+ // block.part_number, which is not the same in the case of a multipart upload)
+ let mut blocks = Vec::with_capacity(std::cmp::min(
+ version.blocks().len(),
+ 4 + ((end - begin) / std::cmp::max(version.blocks()[0].size as u64, 1024)) as usize,
+ ));
+ let mut true_offset = 0;
+ for b in version.blocks().iter() {
+ if true_offset >= end {
+ break;
+ }
+ // Keep only blocks that have an intersection with the requested range
+ if true_offset < end && true_offset + b.size > begin {
+ blocks.push((b.clone(), true_offset));
+ }
+ true_offset += b.size;
+ }
let body_stream = futures::stream::iter(blocks)
- .map(move |block| {
+ .map(move |(block, true_offset)| {
let garage = garage.clone();
async move {
let data = garage.block_manager.rpc_get_block(&block.hash).await?;
- let start_in_block = if block.offset > begin {
+ let data = Bytes::from(data);
+ let start_in_block = if true_offset > begin {
0
} else {
- begin - block.offset
+ begin - true_offset
};
- let end_in_block = if block.offset + block.size < end {
+ let end_in_block = if true_offset + block.size < end {
block.size
} else {
- end - block.offset
+ end - true_offset
};
Result::<Bytes, Error>::Ok(Bytes::from(
- data[start_in_block as usize..end_in_block as usize].to_vec(),
+ data.slice(start_in_block as usize..end_in_block as usize),
))
}
})
.buffered(2);
- //let body: Body = Box::new(StreamBody::new(Box::pin(body_stream)));
+
let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?)
}
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index 3b739a8a..599d0d11 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -18,6 +18,7 @@ use crate::encoding::*;
struct ListResultInfo {
last_modified: u64,
size: u64,
+ etag: String,
}
pub async fn handle_list(
@@ -56,12 +57,12 @@ pub async fn handle_list(
for object in objects.iter() {
if !object.key.starts_with(prefix) {
- truncated = false;
+ truncated = None;
break 'query_loop;
}
if let Some(version) = object.versions().iter().find(|x| x.is_data()) {
if result_keys.len() + result_common_prefixes.len() >= max_keys {
- truncated = true;
+ truncated = Some(object.key.to_string());
break 'query_loop;
}
let common_prefix = if delimiter.len() > 0 {
@@ -75,19 +76,18 @@ pub async fn handle_list(
if let Some(pfx) = common_prefix {
result_common_prefixes.insert(pfx.to_string());
} else {
- let size = match &version.state {
- ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => {
- meta.size
- }
+ let meta = match &version.state {
+ ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => {
- meta.size
+ meta
}
_ => unreachable!(),
};
let info = match result_keys.get(&object.key) {
None => ListResultInfo {
last_modified: version.timestamp,
- size,
+ size: meta.size,
+ etag: meta.etag.to_string(),
},
Some(_lri) => {
return Err(Error::Message(format!("Duplicate key?? {}", object.key)))
@@ -98,7 +98,7 @@ pub async fn handle_list(
}
}
if objects.len() < max_keys + 1 {
- truncated = false;
+ truncated = None;
break 'query_loop;
}
if objects.len() > 0 {
@@ -113,11 +113,22 @@ pub async fn handle_list(
r#"<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"#
)
.unwrap();
- writeln!(&mut xml, "\t<Bucket>{}</Bucket>", bucket).unwrap();
+ writeln!(&mut xml, "\t<Name>{}</Name>", bucket).unwrap();
writeln!(&mut xml, "\t<Prefix>{}</Prefix>", prefix).unwrap();
+ if let Some(mkr) = marker {
+ writeln!(&mut xml, "\t<Marker>{}</Marker>", mkr).unwrap();
+ }
writeln!(&mut xml, "\t<KeyCount>{}</KeyCount>", result_keys.len()).unwrap();
writeln!(&mut xml, "\t<MaxKeys>{}</MaxKeys>", max_keys).unwrap();
- writeln!(&mut xml, "\t<IsTruncated>{}</IsTruncated>", truncated).unwrap();
+ writeln!(
+ &mut xml,
+ "\t<IsTruncated>{}</IsTruncated>",
+ truncated.is_some()
+ )
+ .unwrap();
+ if let Some(next_marker) = truncated {
+ writeln!(&mut xml, "\t<NextMarker>{}</NextMarker>", next_marker).unwrap();
+ }
for (key, info) in result_keys.iter() {
let last_modif = NaiveDateTime::from_timestamp(info.last_modified as i64 / 1000, 0);
let last_modif = DateTime::<Utc>::from_utc(last_modif, Utc);
@@ -132,6 +143,9 @@ pub async fn handle_list(
.unwrap();
writeln!(&mut xml, "\t\t<LastModified>{}</LastModified>", last_modif).unwrap();
writeln!(&mut xml, "\t\t<Size>{}</Size>", info.size).unwrap();
+ if !info.etag.is_empty() {
+ writeln!(&mut xml, "\t\t<ETag>\"{}\"</ETag>", info.etag).unwrap();
+ }
writeln!(&mut xml, "\t\t<StorageClass>STANDARD</StorageClass>").unwrap();
writeln!(&mut xml, "\t</Contents>").unwrap();
}
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index a528720d..c42309b2 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -51,12 +51,7 @@ pub async fn handle_put(
let md5sum_arr = md5sum.finalize();
let md5sum_hex = hex::encode(md5sum_arr);
- let mut sha256sum = Sha256::new();
- sha256sum.input(&first_block[..]);
- let sha256sum_arr = sha256sum.result();
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&sha256sum_arr[..]);
- let sha256sum_hash = Hash::from(hash);
+ let sha256sum_hash = hash(&first_block[..]);
ensure_checksum_matches(
md5sum_arr.as_slice(),
@@ -253,7 +248,7 @@ impl BodyChunker {
body,
read_all: false,
block_size,
- buf: VecDeque::new(),
+ buf: VecDeque::with_capacity(2 * block_size),
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
@@ -278,11 +273,10 @@ impl BodyChunker {
}
}
-pub fn put_response(version_uuid: UUID, etag: String) -> Response<Body> {
+pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
- .header("ETag", etag)
- // TODO ETag
+ .header("ETag", format!("\"{}\"", md5sum_hex))
.body(Body::from(vec![]))
.unwrap()
}
@@ -369,7 +363,7 @@ pub async fn handle_put_part(
}
// Copy block to store
- let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
+ let version = Version::new(version_uuid, bucket, key, false, vec![]);
let first_block_hash = hash(&first_block[..]);
let (_, md5sum_arr, sha256sum) = read_and_put_blocks(
&garage,
@@ -388,7 +382,11 @@ pub async fn handle_put_part(
content_sha256,
)?;
- Ok(Response::new(Body::from(vec![])))
+ let response = Response::builder()
+ .header("ETag", format!("\"{}\"", hex::encode(md5sum_arr)))
+ .body(Body::from(vec![]))
+ .unwrap();
+ Ok(response)
}
pub async fn handle_complete_multipart_upload(
@@ -430,6 +428,21 @@ pub async fn handle_complete_multipart_upload(
_ => unreachable!(),
};
+ // ETag calculation: we produce ETags that have the same form as
+ // those of S3 multipart uploads, but we don't use their actual
+ // calculation for the first part (we use random bytes). This
+ // shouldn't impact compatibility as the S3 docs specify that
+ // the ETag is an opaque value in case of a multipart upload.
+ // See also: https://teppen.io/2018/06/23/aws_s3_etags/
+ let num_parts = version.blocks().last().unwrap().part_number
+ - version.blocks().first().unwrap().part_number
+ + 1;
+ let etag = format!(
+ "{}-{}",
+ hex::encode(&rand::random::<[u8; 16]>()[..]),
+ num_parts
+ );
+
// TODO: check that all the parts that they pretend they gave us are indeed there
// TODO: when we read the XML from _req, remember to check the sha256 sum of the payload
// against the signed x-amz-content-sha256
@@ -444,7 +457,7 @@ pub async fn handle_complete_multipart_upload(
ObjectVersionMeta {
headers,
size: total_size,
- etag: "".to_string(), // TODO
+ etag: etag,
},
version.blocks()[0].hash,
));
diff --git a/src/table/table.rs b/src/table/table.rs
index 5dfee3c8..acb46325 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -391,7 +391,8 @@ where
let (old_entry, new_entry) = self.store.transaction(|db| {
let (old_entry, new_entry) = match db.get(&tree_key)? {
Some(prev_bytes) => {
- let old_entry = self.decode_entry(&prev_bytes)
+ let old_entry = self
+ .decode_entry(&prev_bytes)
.map_err(sled::ConflictableTransactionError::Abort)?;
let mut new_entry = old_entry.clone();
new_entry.merge(&update);