aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api/api_server.rs8
-rw-r--r--src/api/error.rs16
-rw-r--r--src/api/s3_get.rs95
-rw-r--r--src/api/s3_put.rs38
-rw-r--r--src/web/web_server.rs6
5 files changed, 129 insertions, 34 deletions
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index e02b9204..315116c8 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -157,8 +157,12 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
let resp = match endpoint {
Endpoint::Options => handle_options(&req, &bucket).await,
- Endpoint::HeadObject { key, .. } => handle_head(garage, &req, bucket_id, &key).await,
- Endpoint::GetObject { key, .. } => handle_get(garage, &req, bucket_id, &key).await,
+ Endpoint::HeadObject {
+ key, part_number, ..
+ } => handle_head(garage, &req, bucket_id, &key, part_number).await,
+ Endpoint::GetObject {
+ key, part_number, ..
+ } => handle_get(garage, &req, bucket_id, &key, part_number).await,
Endpoint::UploadPart {
key,
part_number,
diff --git a/src/api/error.rs b/src/api/error.rs
index 8602cfdc..d945295a 100644
--- a/src/api/error.rs
+++ b/src/api/error.rs
@@ -58,6 +58,19 @@ pub enum Error {
#[error(display = "At least one of the preconditions you specified did not hold")]
PreconditionFailed,
+ /// Parts specified in CMU request do not match parts actually uploaded
+ #[error(display = "Parts given to CompleteMultipartUpload do not match uploaded parts")]
+ InvalidPart,
+
+ /// Parts given to CompleteMultipartUpload were not in ascending order
+ #[error(display = "Parts given to CompleteMultipartUpload were not in ascending order")]
+ InvalidPartOrder,
+
+ /// In CompleteMultipartUpload: not enough data
+ /// (here we are more lenient than AWS S3)
+ #[error(display = "Proposed upload is smaller than the minimum allowed object size")]
+ EntityTooSmall,
+
// Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
@@ -143,6 +156,9 @@ impl Error {
Error::BucketAlreadyExists => "BucketAlreadyExists",
Error::BucketNotEmpty => "BucketNotEmpty",
Error::PreconditionFailed => "PreconditionFailed",
+ Error::InvalidPart => "InvalidPart",
+ Error::InvalidPartOrder => "InvalidPartOrder",
+ Error::EntityTooSmall => "EntityTooSmall",
Error::Forbidden(_) => "AccessDenied",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::NotImplemented(_) => "NotImplemented",
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 67ab2b59..fdb36231 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -3,6 +3,10 @@ use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
use futures::stream::*;
+use http::header::{
+ ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
+ IF_NONE_MATCH, LAST_MODIFIED,
+};
use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode};
@@ -24,15 +28,12 @@ fn object_headers(
let date_str = httpdate::fmt_http_date(date);
let mut resp = Response::builder()
- .header(
- "Content-Type",
- version_meta.headers.content_type.to_string(),
- )
- .header("Last-Modified", date_str)
- .header("Accept-Ranges", "bytes".to_string());
+ .header(CONTENT_TYPE, version_meta.headers.content_type.to_string())
+ .header(LAST_MODIFIED, date_str)
+ .header(ACCEPT_RANGES, "bytes".to_string());
if !version_meta.etag.is_empty() {
- resp = resp.header("ETag", format!("\"{}\"", version_meta.etag));
+ resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
}
for (k, v) in version_meta.headers.other.iter() {
@@ -52,7 +53,7 @@ fn try_answer_cached(
// precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational
// being that etag based matching is more accurate, it has no issue with sub-second precision
// for instance (in case of very fast updates)
- let cached = if let Some(none_match) = req.headers().get(http::header::IF_NONE_MATCH) {
+ let cached = if let Some(none_match) = req.headers().get(IF_NONE_MATCH) {
let none_match = none_match.to_str().ok()?;
let expected = format!("\"{}\"", version_meta.etag);
let found = none_match
@@ -60,7 +61,7 @@ fn try_answer_cached(
.map(str::trim)
.any(|etag| etag == expected || etag == "\"*\"");
found
- } else if let Some(modified_since) = req.headers().get(http::header::IF_MODIFIED_SINCE) {
+ } else if let Some(modified_since) = req.headers().get(IF_MODIFIED_SINCE) {
let modified_since = modified_since.to_str().ok()?;
let client_date = httpdate::parse_http_date(modified_since).ok()?;
let server_date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
@@ -87,6 +88,7 @@ pub async fn handle_head(
req: &Request<Body>,
bucket_id: Uuid,
key: &str,
+ part_number: Option<u64>,
) -> Result<Response<Body>, Error> {
let object = garage
.object_table
@@ -94,30 +96,68 @@ pub async fn handle_head(
.await?
.ok_or(Error::NoSuchKey)?;
- let version = object
+ let object_version = object
.versions()
.iter()
.rev()
.find(|v| v.is_data())
.ok_or(Error::NoSuchKey)?;
- let version_meta = match &version.state {
- ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
- ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
+ let version_data = match &object_version.state {
+ ObjectVersionState::Complete(c) => c,
+ _ => unreachable!(),
+ };
+
+ let version_meta = match version_data {
+ ObjectVersionData::Inline(meta, _) => meta,
+ ObjectVersionData::FirstBlock(meta, _) => meta,
_ => unreachable!(),
};
- if let Some(cached) = try_answer_cached(version, version_meta, req) {
+ if let Some(cached) = try_answer_cached(object_version, version_meta, req) {
return Ok(cached);
}
- let body: Body = Body::empty();
- let response = object_headers(version, version_meta)
- .header("Content-Length", format!("{}", version_meta.size))
- .status(StatusCode::OK)
- .body(body)
- .unwrap();
- Ok(response)
+ if let Some(pn) = part_number {
+ if let ObjectVersionData::Inline(_, _) = version_data {
+ // Not a multipart upload
+ return Err(Error::BadRequest(
+ "Cannot process part_number argument: not a multipart upload".into(),
+ ));
+ }
+
+ let version = garage
+ .version_table
+ .get(&object_version.uuid, &EmptyKey)
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+ if !version.has_part_number(pn) {
+ return Err(Error::BadRequest(format!(
+ "Part number {} does not exist",
+ pn
+ )));
+ }
+
+ let part_size: u64 = version
+ .blocks
+ .items()
+ .iter()
+ .filter(|(k, _)| k.part_number == pn)
+ .map(|(_, b)| b.size)
+ .sum();
+ let n_parts = version.parts_etags.items().len();
+
+ Ok(object_headers(object_version, version_meta)
+ .header(CONTENT_LENGTH, format!("{}", part_size))
+ .header("x-amz-mp-parts-count", format!("{}", n_parts))
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+ } else {
+ Ok(object_headers(object_version, version_meta)
+ .header(CONTENT_LENGTH, format!("{}", version_meta.size))
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+ }
}
/// Handle GET request
@@ -126,7 +166,14 @@ pub async fn handle_get(
req: &Request<Body>,
bucket_id: Uuid,
key: &str,
+ part_number: Option<u64>,
) -> Result<Response<Body>, Error> {
+ if part_number.is_some() {
+ return Err(Error::NotImplemented(
+ "part_number not supported for GetObject".into(),
+ ));
+ }
+
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
@@ -182,7 +229,7 @@ pub async fn handle_get(
}
let resp_builder = object_headers(last_v, last_v_meta)
- .header("Content-Length", format!("{}", last_v_meta.size))
+ .header(CONTENT_LENGTH, format!("{}", last_v_meta.size))
.status(StatusCode::OK);
match &last_v_data {
@@ -238,9 +285,9 @@ async fn handle_get_range(
end: u64,
) -> Result<Response<Body>, Error> {
let resp_builder = object_headers(version, version_meta)
- .header("Content-Length", format!("{}", end - begin))
+ .header(CONTENT_LENGTH, format!("{}", end - begin))
.header(
- "Content-Range",
+ CONTENT_RANGE,
format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
)
.status(StatusCode::PARTIAL_CONTENT);
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index a3ae290d..f52080a6 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime, Utc};
@@ -520,6 +520,7 @@ pub async fn handle_complete_multipart_upload(
let version_uuid = decode_upload_id(upload_id)?;
+ // Get object and version
let key = key.to_string();
let (object, version) = futures::try_join!(
garage.object_table.get(&bucket_id, &key),
@@ -544,6 +545,20 @@ pub async fn handle_complete_multipart_upload(
_ => unreachable!(),
};
+ // Check that part numbers are an increasing sequence.
+ // (it doesn't need to start at 1 nor to be a continuous sequence,
+ // see discussion in #192)
+ if body_list_of_parts.is_empty() {
+ return Err(Error::EntityTooSmall);
+ }
+ if !body_list_of_parts
+ .iter()
+ .zip(body_list_of_parts.iter().skip(1))
+ .all(|(p1, p2)| p1.part_number < p2.part_number)
+ {
+ return Err(Error::InvalidPartOrder);
+ }
+
// Check that the list of parts they gave us corresponds to the parts we have here
debug!("Expected parts from request: {:?}", body_list_of_parts);
debug!("Parts stored in version: {:?}", version.parts_etags.items());
@@ -557,17 +572,30 @@ pub async fn handle_complete_multipart_upload(
.map(|x| (&x.part_number, &x.etag))
.eq(parts);
if !same_parts {
+ return Err(Error::InvalidPart);
+ }
+
+ // Check that all blocks belong to one of the parts
+ let block_parts = version
+ .blocks
+ .items()
+ .iter()
+ .map(|(bk, _)| bk.part_number)
+ .collect::<BTreeSet<_>>();
+ let same_parts = body_list_of_parts
+ .iter()
+ .map(|x| x.part_number)
+ .eq(block_parts.into_iter());
+ if !same_parts {
return Err(Error::BadRequest(
- "We don't have the same parts".to_string(),
+ "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again.".into(),
));
}
// Calculate etag of final object
// To understand how etags are calculated, read more here:
// https://teppen.io/2018/06/23/aws_s3_etags/
- let num_parts = version.blocks.items().last().unwrap().0.part_number
- - version.blocks.items().first().unwrap().0.part_number
- + 1;
+ let num_parts = body_list_of_parts.len();
let mut etag_md5_hasher = Md5::new();
for (_, etag) in version.parts_etags.items().iter() {
etag_md5_hasher.update(etag.as_bytes());
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 64112feb..6c7d7c35 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -134,8 +134,8 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
let ret_doc = match *req.method() {
Method::OPTIONS => handle_options(req, &bucket).await,
- Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key).await,
- Method::GET => handle_get(garage.clone(), req, bucket_id, &key).await,
+ Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await,
+ Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await,
_ => Err(ApiError::BadRequest("HTTP method not supported".into())),
}
.map_err(Error::from);
@@ -166,7 +166,7 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
.body(Body::empty())
.unwrap();
- match handle_get(garage, &req2, bucket_id, &error_document).await {
+ match handle_get(garage, &req2, bucket_id, &error_document, None).await {
Ok(mut error_doc) => {
// The error won't be logged back in handle_request,
// so log it here