diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/s3_copy.rs | 56 | ||||
-rw-r--r-- | src/api/s3_delete.rs | 8 | ||||
-rw-r--r-- | src/api/s3_put.rs | 43 | ||||
-rw-r--r-- | src/garage/repair.rs | 20 | ||||
-rw-r--r-- | src/model/block.rs | 10 |
5 files changed, 79 insertions, 58 deletions
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index c6c30095..8407faee 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -41,45 +41,64 @@ pub async fn handle_copy( }; let new_uuid = gen_uuid(); + let new_timestamp = now_msec(); let dest_object_version = ObjectVersion { uuid: new_uuid, - timestamp: now_msec(), + timestamp: new_timestamp, state: ObjectVersionState::Complete(source_last_state.clone()), }; + let dest_object = Object::new( + dest_bucket.to_string(), + dest_key.to_string(), + vec![dest_object_version], + ); - match &source_last_state { + match source_last_state { ObjectVersionData::DeleteMarker => { return Err(Error::NotFound); } ObjectVersionData::Inline(_meta, _bytes) => { - let dest_object = Object::new( - dest_bucket.to_string(), - dest_key.to_string(), - vec![dest_object_version], - ); garage.object_table.insert(&dest_object).await?; } - ObjectVersionData::FirstBlock(_meta, _first_block_hash) => { + ObjectVersionData::FirstBlock(meta, _first_block_hash) => { + // Get block list from source version let source_version = garage .version_table .get(&source_last_v.uuid, &EmptyKey) .await?; let source_version = source_version.ok_or(Error::NotFound)?; + // Write an "uploading" marker in Object table + // This holds a reference to the object in the Version table + // so that it won't be deleted, e.g. by repair_versions. + let tmp_dest_object_version = ObjectVersion { + uuid: new_uuid, + timestamp: new_timestamp, + state: ObjectVersionState::Uploading(meta.headers.clone()), + }; + let tmp_dest_object = Object::new( + dest_bucket.to_string(), + dest_key.to_string(), + vec![tmp_dest_object_version], + ); + garage.object_table.insert(&tmp_dest_object).await?; + + // Write version in the version table. Even with empty block list, + // this means that the BlockRef entries linked to this version cannot be + // marked as deleted (they are marked as deleted only if the Version + // doesn't exist or is marked as deleted). let mut dest_version = Version::new( new_uuid, dest_bucket.to_string(), dest_key.to_string(), false, ); + garage.version_table.insert(&dest_version).await?; + + // Fill in block list for version and insert block refs for (bk, bv) in source_version.blocks.items().iter() { dest_version.blocks.put(*bk, *bv); } - let dest_object = Object::new( - dest_bucket.to_string(), - dest_key.to_string(), - vec![dest_object_version], - ); let dest_block_refs = dest_version .blocks .items() @@ -91,14 +110,21 @@ pub async fn handle_copy( }) .collect::<Vec<_>>(); futures::try_join!( - garage.object_table.insert(&dest_object), garage.version_table.insert(&dest_version), garage.block_ref_table.insert_many(&dest_block_refs[..]), )?; + + // Insert final object + // We do this last because otherwise there is a race condition in the case where + // the copy call has the same source and destination (this happens, rclone does + // it to update the modification timestamp for instance). If we did this concurrently + // with the stuff before, the block's reference counts could be decremented before + // they are incremented again for the new version, leading to data being deleted. + garage.object_table.insert(&dest_object).await?; } } - let now = Utc::now(); + let now = Utc::now(); // FIXME use the unix timestamp from above let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true); let mut xml = String::new(); writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap(); diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 4b6a2b18..7f752566 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -29,16 +29,16 @@ async fn handle_delete_internal( _ => true, }); - let mut must_delete = None; + let mut version_to_delete = None; let mut timestamp = now_msec(); for v in interesting_versions { - if v.timestamp + 1 > timestamp || must_delete.is_none() { - must_delete = Some(v.uuid); + if v.timestamp + 1 > timestamp || version_to_delete.is_none() { + version_to_delete = Some(v.uuid); } timestamp = std::cmp::max(timestamp, v.timestamp + 1); } - let deleted_version = must_delete.ok_or(Error::NotFound)?; + let deleted_version = version_to_delete.ok_or(Error::NotFound)?; let version_uuid = gen_uuid(); diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index c1774d6b..6f675e37 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -87,17 +87,21 @@ pub async fn handle_put( // that we are uploading something let mut object_version = ObjectVersion { uuid: version_uuid, - timestamp: now_msec(), + timestamp: version_timestamp, state: ObjectVersionState::Uploading(headers.clone()), }; let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; // Initialize corresponding entry in version table + // Write this entry now, even with empty block list, + // to prevent block_ref entries from being deleted (they can be deleted + // if the reference a version that isn't found in the version table) let version = Version::new(version_uuid, bucket.into(), key.into(), false); - let first_block_hash = blake2sum(&first_block[..]); + garage.version_table.insert(&version).await?; // Transfer data and verify checksum + let first_block_hash = blake2sum(&first_block[..]); let tx_result = read_and_put_blocks( &garage, &version, @@ -173,7 +177,7 @@ fn ensure_checksum_matches( } async fn read_and_put_blocks( - garage: &Arc<Garage>, + garage: &Garage, version: &Version, part_number: u64, first_block: Vec<u8>, @@ -187,7 +191,7 @@ async fn read_and_put_blocks( let mut next_offset = first_block.len(); let mut put_curr_version_block = put_block_meta( - garage.clone(), + &garage, &version, part_number, 0, @@ -207,7 +211,7 @@ async fn read_and_put_blocks( let block_hash = blake2sum(&block[..]); let block_len = block.len(); put_curr_version_block = put_block_meta( - garage.clone(), + &garage, &version, part_number, next_offset as u64, @@ -231,14 +235,13 @@ async fn read_and_put_blocks( } async fn put_block_meta( - garage: Arc<Garage>, + garage: &Garage, version: &Version, part_number: u64, offset: u64, hash: Hash, size: u64, ) -> Result<(), GarageError> { - // TODO: don't clone, restart from empty block list ?? let mut version = version.clone(); version.blocks.put( VersionBlockKey { @@ -316,6 +319,7 @@ pub async fn handle_create_multipart_upload( let version_uuid = gen_uuid(); let headers = get_headers(req)?; + // Create object in object table let object_version = ObjectVersion { uuid: version_uuid, timestamp: now_msec(), @@ -324,6 +328,14 @@ pub async fn handle_create_multipart_upload( let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]); garage.object_table.insert(&object).await?; + // Insert empty version so that block_ref entries refer to something + // (they are inserted concurrently with blocks in the version table, so + // there is the possibility that they are inserted before the version table + // is created, in which case it is allowed to delete them, e.g. in repair_*) + let version = Version::new(version_uuid, bucket.into(), key.into(), false); + garage.version_table.insert(&version).await?; + + // Send success response let mut xml = String::new(); writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap(); writeln!( @@ -450,14 +462,12 @@ pub async fn handle_complete_multipart_upload( )?; let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; - let object_version = object + let mut object_version = object .versions() .iter() - .find(|v| v.uuid == version_uuid && v.is_uploading()); - let mut object_version = match object_version { - None => return Err(Error::NotFound), - Some(x) => x.clone(), - }; + .find(|v| v.uuid == version_uuid && v.is_uploading()) + .cloned() + .ok_or(Error::BadRequest(format!("Version not found")))?; let version = version.ok_or(Error::BadRequest(format!("Version not found")))?; if version.blocks.len() == 0 { @@ -498,12 +508,7 @@ pub async fn handle_complete_multipart_upload( let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts); // Calculate total size of final object - let total_size = version - .blocks - .items() - .iter() - .map(|x| x.1.size) - .fold(0, |x, y| x + y); + let total_size = version.blocks.items().iter().map(|x| x.1.size).sum(); // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 47fc1ae1..599c1965 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -82,13 +82,7 @@ impl Repair { .versions() .iter() .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted), - None => { - warn!( - "Repair versions: object for version {:?} not found, skipping.", - version - ); - continue; - } + None => false, }; if !version_exists { info!("Repair versions: marking version as deleted: {:?}", version); @@ -127,16 +121,8 @@ impl Repair { .version_table .get(&block_ref.version, &EmptyKey) .await?; - let ref_exists = match version { - Some(v) => !v.deleted.get(), - None => { - warn!( - "Block ref repair: version for block ref {:?} not found, skipping.", - block_ref - ); - continue; - } - }; + // The version might not exist if it has been GC'ed + let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); if !ref_exists { info!( "Repair block ref: marking block_ref as deleted: {:?}", diff --git a/src/model/block.rs b/src/model/block.rs index 9426f683..5934f20c 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -28,6 +28,7 @@ use crate::garage::Garage; pub const INLINE_THRESHOLD: usize = 3072; const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); +const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); @@ -175,7 +176,10 @@ impl BlockManager { if data::blake2sum(&data[..]) != *hash { let _lock = self.data_dir_lock.lock().await; - warn!("Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash); + warn!( + "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", + hash + ); let mut path2 = path.clone(); path2.set_extension(".corrupted"); fs::rename(path, path2).await?; @@ -225,7 +229,7 @@ impl BlockManager { pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { let new_rc = self.rc.merge(&hash, vec![0])?; if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, Duration::from_secs(0))?; + self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?; } Ok(()) } @@ -470,7 +474,7 @@ impl BlockManager { }; let mut hash = [0u8; 32]; hash.copy_from_slice(&hash_bytes[..]); - self.put_to_resync(&hash.into(),Duration::from_secs(0))?; + self.put_to_resync(&hash.into(), Duration::from_secs(0))?; } if *must_exit.borrow() { |