aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3_put.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-15 15:26:29 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-15 15:26:29 +0100
commit097c339d981dba0420af17d30d1221181d8bf1d7 (patch)
tree0d91832b3fb89b95bb4341ca580619f7081378a9 /src/api/s3_put.rs
parentbdcbdd1cd854bd8125458af0ac20b8682f810967 (diff)
downloadgarage-097c339d981dba0420af17d30d1221181d8bf1d7.tar.gz
garage-097c339d981dba0420af17d30d1221181d8bf1d7.zip
Fix race condition
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r--src/api/s3_put.rs43
1 files changed, 24 insertions, 19 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index c1774d6b..6f675e37 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -87,17 +87,21 @@ pub async fn handle_put(
// that we are uploading something
let mut object_version = ObjectVersion {
uuid: version_uuid,
- timestamp: now_msec(),
+ timestamp: version_timestamp,
state: ObjectVersionState::Uploading(headers.clone()),
};
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
+ // Write this entry now, even with empty block list,
+ // to prevent block_ref entries from being deleted (they can be deleted
+ // if the reference a version that isn't found in the version table)
let version = Version::new(version_uuid, bucket.into(), key.into(), false);
- let first_block_hash = blake2sum(&first_block[..]);
+ garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
+ let first_block_hash = blake2sum(&first_block[..]);
let tx_result = read_and_put_blocks(
&garage,
&version,
@@ -173,7 +177,7 @@ fn ensure_checksum_matches(
}
async fn read_and_put_blocks(
- garage: &Arc<Garage>,
+ garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
@@ -187,7 +191,7 @@ async fn read_and_put_blocks(
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
- garage.clone(),
+ &garage,
&version,
part_number,
0,
@@ -207,7 +211,7 @@ async fn read_and_put_blocks(
let block_hash = blake2sum(&block[..]);
let block_len = block.len();
put_curr_version_block = put_block_meta(
- garage.clone(),
+ &garage,
&version,
part_number,
next_offset as u64,
@@ -231,14 +235,13 @@ async fn read_and_put_blocks(
}
async fn put_block_meta(
- garage: Arc<Garage>,
+ garage: &Garage,
version: &Version,
part_number: u64,
offset: u64,
hash: Hash,
size: u64,
) -> Result<(), GarageError> {
- // TODO: don't clone, restart from empty block list ??
let mut version = version.clone();
version.blocks.put(
VersionBlockKey {
@@ -316,6 +319,7 @@ pub async fn handle_create_multipart_upload(
let version_uuid = gen_uuid();
let headers = get_headers(req)?;
+ // Create object in object table
let object_version = ObjectVersion {
uuid: version_uuid,
timestamp: now_msec(),
@@ -324,6 +328,14 @@ pub async fn handle_create_multipart_upload(
let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?;
+ // Insert empty version so that block_ref entries refer to something
+ // (they are inserted concurrently with blocks in the version table, so
+ // there is the possibility that they are inserted before the version table
+ // is created, in which case it is allowed to delete them, e.g. in repair_*)
+ let version = Version::new(version_uuid, bucket.into(), key.into(), false);
+ garage.version_table.insert(&version).await?;
+
+ // Send success response
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
writeln!(
@@ -450,14 +462,12 @@ pub async fn handle_complete_multipart_upload(
)?;
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
- let object_version = object
+ let mut object_version = object
.versions()
.iter()
- .find(|v| v.uuid == version_uuid && v.is_uploading());
- let mut object_version = match object_version {
- None => return Err(Error::NotFound),
- Some(x) => x.clone(),
- };
+ .find(|v| v.uuid == version_uuid && v.is_uploading())
+ .cloned()
+ .ok_or(Error::BadRequest(format!("Version not found")))?;
let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
if version.blocks.len() == 0 {
@@ -498,12 +508,7 @@ pub async fn handle_complete_multipart_upload(
let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts);
// Calculate total size of final object
- let total_size = version
- .blocks
- .items()
- .iter()
- .map(|x| x.1.size)
- .fold(0, |x, y| x + y);
+ let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
// Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(