aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/s3_copy.rs20
-rw-r--r--src/api/s3_delete.rs11
-rw-r--r--src/api/s3_get.rs59
-rw-r--r--src/api/s3_list.rs8
-rw-r--r--src/api/s3_put.rs72
5 files changed, 100 insertions, 70 deletions
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index 0af1b13a..b54701e2 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -39,27 +39,23 @@ pub async fn handle_copy(
Some(v) => v,
None => return Err(Error::NotFound),
};
+ let source_last_state = match &source_last_v.state {
+ ObjectVersionState::Complete(x) => x,
+ _ => unreachable!(),
+ };
let new_uuid = gen_uuid();
let dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: now_msec(),
- mime_type: source_last_v.mime_type.clone(),
- size: source_last_v.size,
- state: ObjectVersionState::Complete,
- data: source_last_v.data.clone(),
+ state: ObjectVersionState::Complete(source_last_state.clone()),
};
- match &source_last_v.data {
- ObjectVersionData::Uploading => {
- return Err(Error::Message(format!(
- "Version is_complete() but data is stil Uploading (internal error)"
- )));
- }
+ match &source_last_state {
ObjectVersionData::DeleteMarker => {
return Err(Error::NotFound);
}
- ObjectVersionData::Inline(_bytes) => {
+ ObjectVersionData::Inline(_meta, _bytes) => {
let dest_object = Object::new(
dest_bucket.to_string(),
dest_key.to_string(),
@@ -67,7 +63,7 @@ pub async fn handle_copy(
);
garage.object_table.insert(&dest_object).await?;
}
- ObjectVersionData::FirstBlock(_first_block_hash) => {
+ ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
let source_version = garage
.version_table
.get(&source_last_v.uuid, &EmptyKey)
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index 949ad6a1..c5cd5970 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -29,7 +29,11 @@ async fn handle_delete_internal(
};
let interesting_versions = object.versions().iter().filter(|v| {
- v.data != ObjectVersionData::DeleteMarker && v.state != ObjectVersionState::Aborted
+ match v.state {
+ ObjectVersionState::Aborted => false,
+ ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,
+ _ => true,
+ }
});
let mut must_delete = None;
@@ -54,10 +58,7 @@ async fn handle_delete_internal(
vec![ObjectVersion {
uuid: version_uuid,
timestamp: now_msec(),
- mime_type: "application/x-delete-marker".into(),
- size: 0,
- state: ObjectVersionState::Complete,
- data: ObjectVersionData::DeleteMarker,
+ state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 42cf55d1..25b3d3e3 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -12,13 +12,14 @@ use garage_table::EmptyKey;
use garage_model::garage::Garage;
use garage_model::object_table::*;
-fn object_headers(version: &ObjectVersion) -> http::response::Builder {
+fn object_headers(version: &ObjectVersion, version_meta: &ObjectVersionMeta) -> http::response::Builder {
let date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
let date_str = httpdate::fmt_http_date(date);
Response::builder()
- .header("Content-Type", version.mime_type.to_string())
- .header("Content-Length", format!("{}", version.size))
+ .header("Content-Type", version_meta.headers.content_type.to_string())
+ .header("Content-Length", format!("{}", version_meta.size))
+ .header("ETag", version_meta.etag.to_string())
.header("Last-Modified", date_str)
.header("Accept-Ranges", format!("bytes"))
}
@@ -47,9 +48,14 @@ pub async fn handle_head(
Some(v) => v,
None => return Err(Error::NotFound),
};
+ let version_meta = match &version.state {
+ ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
+ ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
+ _ => unreachable!(),
+ };
let body: Body = Body::from(vec![]);
- let response = object_headers(&version)
+ let response = object_headers(&version, version_meta)
.status(StatusCode::OK)
.body(body)
.unwrap();
@@ -81,13 +87,22 @@ pub async fn handle_get(
Some(v) => v,
None => return Err(Error::NotFound),
};
+ let last_v_data = match &last_v.state {
+ ObjectVersionState::Complete(x) => x,
+ _ => unreachable!(),
+ };
+ let last_v_meta = match last_v_data {
+ ObjectVersionData::DeleteMarker => return Err(Error::NotFound),
+ ObjectVersionData::Inline(meta, _) => meta,
+ ObjectVersionData::FirstBlock(meta, _) => meta,
+ };
let range = match req.headers().get("range") {
Some(range) => {
let range_str = range
.to_str()
.map_err(|e| Error::BadRequest(format!("Invalid range header: {}", e)))?;
- let mut ranges = http_range::HttpRange::parse(range_str, last_v.size)
+ let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size)
.map_err(|_e| Error::BadRequest(format!("Invalid range")))?;
if ranges.len() > 1 {
return Err(Error::BadRequest(format!("Multiple ranges not supported")));
@@ -98,21 +113,18 @@ pub async fn handle_get(
None => None,
};
if let Some(range) = range {
- return handle_get_range(garage, last_v, range.start, range.start + range.length).await;
+ return handle_get_range(garage, last_v, last_v_data, last_v_meta, range.start, range.start + range.length).await;
}
- let resp_builder = object_headers(&last_v).status(StatusCode::OK);
+ let resp_builder = object_headers(&last_v, last_v_meta).status(StatusCode::OK);
- match &last_v.data {
- ObjectVersionData::Uploading => Err(Error::Message(format!(
- "Version is_complete() but data is stil Uploading (internal error)"
- ))),
- ObjectVersionData::DeleteMarker => Err(Error::NotFound),
- ObjectVersionData::Inline(bytes) => {
+ match &last_v_data {
+ ObjectVersionData::DeleteMarker => unreachable!(),
+ ObjectVersionData::Inline(_, bytes) => {
let body: Body = Body::from(bytes.to_vec());
Ok(resp_builder.body(body)?)
}
- ObjectVersionData::FirstBlock(first_block_hash) => {
+ ObjectVersionData::FirstBlock(_, first_block_hash) => {
let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash);
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
@@ -155,26 +167,25 @@ pub async fn handle_get(
pub async fn handle_get_range(
garage: Arc<Garage>,
version: &ObjectVersion,
+ version_data: &ObjectVersionData,
+ version_meta: &ObjectVersionMeta,
begin: u64,
end: u64,
) -> Result<Response<Body>, Error> {
- if end > version.size {
+ if end > version_meta.size {
return Err(Error::BadRequest(format!("Range not included in file")));
}
- let resp_builder = object_headers(&version)
+ let resp_builder = object_headers(version, version_meta)
.header(
"Content-Range",
- format!("bytes {}-{}/{}", begin, end, version.size),
+ format!("bytes {}-{}/{}", begin, end, version_meta.size),
)
.status(StatusCode::PARTIAL_CONTENT);
- match &version.data {
- ObjectVersionData::Uploading => Err(Error::Message(format!(
- "Version is_complete() but data is stil Uploading (internal error)"
- ))),
- ObjectVersionData::DeleteMarker => Err(Error::NotFound),
- ObjectVersionData::Inline(bytes) => {
+ match &version_data {
+ ObjectVersionData::DeleteMarker => unreachable!(),
+ ObjectVersionData::Inline(_meta, bytes) => {
if end as usize <= bytes.len() {
let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec());
Ok(resp_builder.body(body)?)
@@ -182,7 +193,7 @@ pub async fn handle_get_range(
Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been")))
}
}
- ObjectVersionData::FirstBlock(_first_block_hash) => {
+ ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
let version = garage.version_table.get(&version.uuid, &EmptyKey).await?;
let version = match version {
Some(v) => v,
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index 1f0eccf5..3fca1348 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -8,6 +8,7 @@ use hyper::{Body, Response};
use garage_util::error::Error;
use garage_model::garage::Garage;
+use garage_model::object_table::*;
use crate::encoding::*;
@@ -73,10 +74,15 @@ pub async fn handle_list(
if let Some(pfx) = common_prefix {
result_common_prefixes.insert(pfx.to_string());
} else {
+ let size = match &version.state {
+ ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta.size,
+ ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size,
+ _ => unreachable!(),
+ };
let info = match result_keys.get(&object.key) {
None => ListResultInfo {
last_modified: version.timestamp,
- size: version.size,
+ size,
},
Some(_lri) => {
return Err(Error::Message(format!("Duplicate key?? {}", object.key)))
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index c5d0a31c..75622168 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -1,4 +1,4 @@
-use std::collections::VecDeque;
+use std::collections::{VecDeque, BTreeMap};
use std::fmt::Write;
use std::sync::Arc;
@@ -24,7 +24,11 @@ pub async fn handle_put(
key: &str,
) -> Result<Response<Body>, Error> {
let version_uuid = gen_uuid();
- let mime_type = get_mime_type(&req)?;
+ let headers = ObjectVersionHeaders{
+ content_type: get_mime_type(&req)?,
+ other: BTreeMap::new(), // TODO
+ };
+
let body = req.into_body();
let mut chunker = BodyChunker::new(body, garage.config.block_size);
@@ -36,15 +40,17 @@ pub async fn handle_put(
let mut object_version = ObjectVersion {
uuid: version_uuid,
timestamp: now_msec(),
- mime_type,
- size: first_block.len() as u64,
- state: ObjectVersionState::Uploading,
- data: ObjectVersionData::Uploading,
+ state: ObjectVersionState::Uploading(headers.clone()),
};
if first_block.len() < INLINE_THRESHOLD {
- object_version.data = ObjectVersionData::Inline(first_block);
- object_version.state = ObjectVersionState::Complete;
+ object_version.state = ObjectVersionState::Complete(ObjectVersionData::Inline(
+ ObjectVersionMeta{
+ headers,
+ size: first_block.len() as u64,
+ etag: "".to_string(), // TODO
+ },
+ first_block));
let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
@@ -54,7 +60,6 @@ pub async fn handle_put(
let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
let first_block_hash = hash(&first_block[..]);
- object_version.data = ObjectVersionData::FirstBlock(first_block_hash);
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
@@ -70,8 +75,13 @@ pub async fn handle_put(
// TODO: if at any step we have an error, we should undo everything we did
- object_version.state = ObjectVersionState::Complete;
- object_version.size = total_size;
+ object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
+ ObjectVersionMeta{
+ headers,
+ size: total_size,
+ etag: "".to_string(), // TODO
+ },
+ first_block_hash));
let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
@@ -197,6 +207,7 @@ impl BodyChunker {
pub fn put_response(version_uuid: UUID) -> Response<Body> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
+ // TODO ETag
.body(Body::from(vec![]))
.unwrap()
}
@@ -208,15 +219,15 @@ pub async fn handle_create_multipart_upload(
key: &str,
) -> Result<Response<Body>, Error> {
let version_uuid = gen_uuid();
- let mime_type = get_mime_type(req)?;
+ let headers = ObjectVersionHeaders{
+ content_type: get_mime_type(&req)?,
+ other: BTreeMap::new(), // TODO
+ };
let object_version = ObjectVersion {
uuid: version_uuid,
timestamp: now_msec(),
- mime_type,
- size: 0,
- state: ObjectVersionState::Uploading,
- data: ObjectVersionData::Uploading,
+ state: ObjectVersionState::Uploading(headers),
};
let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?;
@@ -276,9 +287,7 @@ pub async fn handle_put_part(
Some(x) => x,
};
if !object.versions().iter().any(|v| {
- v.uuid == version_uuid
- && v.state == ObjectVersionState::Uploading
- && v.data == ObjectVersionData::Uploading
+ v.uuid == version_uuid && v.is_uploading()
}) {
return Err(Error::BadRequest(format!(
"Multipart upload does not exist or is otherwise invalid"
@@ -322,9 +331,7 @@ pub async fn handle_complete_multipart_upload(
Some(x) => x,
};
let object_version = object.versions().iter().find(|v| {
- v.uuid == version_uuid
- && v.state == ObjectVersionState::Uploading
- && v.data == ObjectVersionData::Uploading
+ v.uuid == version_uuid && v.is_uploading()
});
let mut object_version = match object_version {
None => {
@@ -341,6 +348,10 @@ pub async fn handle_complete_multipart_upload(
if version.blocks().len() == 0 {
return Err(Error::BadRequest(format!("No data was uploaded")));
}
+ let headers = match object_version.state {
+ ObjectVersionState::Uploading(headers) => headers.clone(),
+ _ => unreachable!(),
+ };
// TODO: check that all the parts that they pretend they gave us are indeed there
// TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere...
@@ -350,9 +361,16 @@ pub async fn handle_complete_multipart_upload(
.iter()
.map(|x| x.size)
.fold(0, |x, y| x + y);
- object_version.size = total_size;
- object_version.state = ObjectVersionState::Complete;
- object_version.data = ObjectVersionData::FirstBlock(version.blocks()[0].hash);
+ object_version.state = ObjectVersionState::Complete(
+ ObjectVersionData::FirstBlock(
+ ObjectVersionMeta{
+ headers,
+ size: total_size,
+ etag: "".to_string(),// TODO
+ },
+ version.blocks()[0].hash)
+ );
+
let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
@@ -394,9 +412,7 @@ pub async fn handle_abort_multipart_upload(
Some(x) => x,
};
let object_version = object.versions().iter().find(|v| {
- v.uuid == version_uuid
- && v.state == ObjectVersionState::Uploading
- && v.data == ObjectVersionData::Uploading
+ v.uuid == version_uuid && v.is_uploading()
});
let mut object_version = match object_version {
None => {