diff options
author | Alex Auvolat <alex@adnab.me> | 2021-03-15 15:26:29 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-03-15 15:26:29 +0100 |
commit | 097c339d981dba0420af17d30d1221181d8bf1d7 (patch) | |
tree | 0d91832b3fb89b95bb4341ca580619f7081378a9 /src/api/s3_put.rs | |
parent | bdcbdd1cd854bd8125458af0ac20b8682f810967 (diff) | |
download | garage-097c339d981dba0420af17d30d1221181d8bf1d7.tar.gz garage-097c339d981dba0420af17d30d1221181d8bf1d7.zip |
Fix race condition
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r-- | src/api/s3_put.rs | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index c1774d6b..6f675e37 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -87,17 +87,21 @@ pub async fn handle_put( // that we are uploading something let mut object_version = ObjectVersion { uuid: version_uuid, - timestamp: now_msec(), + timestamp: version_timestamp, state: ObjectVersionState::Uploading(headers.clone()), }; let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; // Initialize corresponding entry in version table + // Write this entry now, even with empty block list, + // to prevent block_ref entries from being deleted (they can be deleted + // if the reference a version that isn't found in the version table) let version = Version::new(version_uuid, bucket.into(), key.into(), false); - let first_block_hash = blake2sum(&first_block[..]); + garage.version_table.insert(&version).await?; // Transfer data and verify checksum + let first_block_hash = blake2sum(&first_block[..]); let tx_result = read_and_put_blocks( &garage, &version, @@ -173,7 +177,7 @@ fn ensure_checksum_matches( } async fn read_and_put_blocks( - garage: &Arc<Garage>, + garage: &Garage, version: &Version, part_number: u64, first_block: Vec<u8>, @@ -187,7 +191,7 @@ async fn read_and_put_blocks( let mut next_offset = first_block.len(); let mut put_curr_version_block = put_block_meta( - garage.clone(), + &garage, &version, part_number, 0, @@ -207,7 +211,7 @@ async fn read_and_put_blocks( let block_hash = blake2sum(&block[..]); let block_len = block.len(); put_curr_version_block = put_block_meta( - garage.clone(), + &garage, &version, part_number, next_offset as u64, @@ -231,14 +235,13 @@ async fn read_and_put_blocks( } async fn put_block_meta( - garage: Arc<Garage>, + garage: &Garage, version: &Version, part_number: u64, offset: u64, hash: Hash, size: u64, ) -> Result<(), GarageError> { - // TODO: don't clone, restart from empty block list ?? let mut version = version.clone(); version.blocks.put( VersionBlockKey { @@ -316,6 +319,7 @@ pub async fn handle_create_multipart_upload( let version_uuid = gen_uuid(); let headers = get_headers(req)?; + // Create object in object table let object_version = ObjectVersion { uuid: version_uuid, timestamp: now_msec(), @@ -324,6 +328,14 @@ pub async fn handle_create_multipart_upload( let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]); garage.object_table.insert(&object).await?; + // Insert empty version so that block_ref entries refer to something + // (they are inserted concurrently with blocks in the version table, so + // there is the possibility that they are inserted before the version table + // is created, in which case it is allowed to delete them, e.g. in repair_*) + let version = Version::new(version_uuid, bucket.into(), key.into(), false); + garage.version_table.insert(&version).await?; + + // Send success response let mut xml = String::new(); writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap(); writeln!( @@ -450,14 +462,12 @@ pub async fn handle_complete_multipart_upload( )?; let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; - let object_version = object + let mut object_version = object .versions() .iter() - .find(|v| v.uuid == version_uuid && v.is_uploading()); - let mut object_version = match object_version { - None => return Err(Error::NotFound), - Some(x) => x.clone(), - }; + .find(|v| v.uuid == version_uuid && v.is_uploading()) + .cloned() + .ok_or(Error::BadRequest(format!("Version not found")))?; let version = version.ok_or(Error::BadRequest(format!("Version not found")))?; if version.blocks.len() == 0 { @@ -498,12 +508,7 @@ pub async fn handle_complete_multipart_upload( let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts); // Calculate total size of final object - let total_size = version - .blocks - .items() - .iter() - .map(|x| x.1.size) - .fold(0, |x, y| x + y); + let total_size = version.blocks.items().iter().map(|x| x.1.size).sum(); // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( |