aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/s3_copy.rs56
-rw-r--r--src/api/s3_delete.rs8
-rw-r--r--src/api/s3_put.rs43
-rw-r--r--src/garage/repair.rs20
-rw-r--r--src/model/block.rs10
5 files changed, 79 insertions, 58 deletions
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index c6c30095..8407faee 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -41,45 +41,64 @@ pub async fn handle_copy(
};
let new_uuid = gen_uuid();
+ let new_timestamp = now_msec();
let dest_object_version = ObjectVersion {
uuid: new_uuid,
- timestamp: now_msec(),
+ timestamp: new_timestamp,
state: ObjectVersionState::Complete(source_last_state.clone()),
};
+ let dest_object = Object::new(
+ dest_bucket.to_string(),
+ dest_key.to_string(),
+ vec![dest_object_version],
+ );
- match &source_last_state {
+ match source_last_state {
ObjectVersionData::DeleteMarker => {
return Err(Error::NotFound);
}
ObjectVersionData::Inline(_meta, _bytes) => {
- let dest_object = Object::new(
- dest_bucket.to_string(),
- dest_key.to_string(),
- vec![dest_object_version],
- );
garage.object_table.insert(&dest_object).await?;
}
- ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
+ ObjectVersionData::FirstBlock(meta, _first_block_hash) => {
+ // Get block list from source version
let source_version = garage
.version_table
.get(&source_last_v.uuid, &EmptyKey)
.await?;
let source_version = source_version.ok_or(Error::NotFound)?;
+ // Write an "uploading" marker in Object table
+ // This holds a reference to the object in the Version table
+ // so that it won't be deleted, e.g. by repair_versions.
+ let tmp_dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Uploading(meta.headers.clone()),
+ };
+ let tmp_dest_object = Object::new(
+ dest_bucket.to_string(),
+ dest_key.to_string(),
+ vec![tmp_dest_object_version],
+ );
+ garage.object_table.insert(&tmp_dest_object).await?;
+
+ // Write version in the version table. Even with empty block list,
+ // this means that the BlockRef entries linked to this version cannot be
+ // marked as deleted (they are marked as deleted only if the Version
+ // doesn't exist or is marked as deleted).
let mut dest_version = Version::new(
new_uuid,
dest_bucket.to_string(),
dest_key.to_string(),
false,
);
+ garage.version_table.insert(&dest_version).await?;
+
+ // Fill in block list for version and insert block refs
for (bk, bv) in source_version.blocks.items().iter() {
dest_version.blocks.put(*bk, *bv);
}
- let dest_object = Object::new(
- dest_bucket.to_string(),
- dest_key.to_string(),
- vec![dest_object_version],
- );
let dest_block_refs = dest_version
.blocks
.items()
@@ -91,14 +110,21 @@ pub async fn handle_copy(
})
.collect::<Vec<_>>();
futures::try_join!(
- garage.object_table.insert(&dest_object),
garage.version_table.insert(&dest_version),
garage.block_ref_table.insert_many(&dest_block_refs[..]),
)?;
+
+ // Insert final object
+ // We do this last because otherwise there is a race condition in the case where
+ // the copy call has the same source and destination (this happens, rclone does
+ // it to update the modification timestamp for instance). If we did this concurrently
+ // with the stuff before, the block's reference counts could be decremented before
+ // they are incremented again for the new version, leading to data being deleted.
+ garage.object_table.insert(&dest_object).await?;
}
}
- let now = Utc::now();
+ let now = Utc::now(); // FIXME use the unix timestamp from above
let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true);
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index 4b6a2b18..7f752566 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -29,16 +29,16 @@ async fn handle_delete_internal(
_ => true,
});
- let mut must_delete = None;
+ let mut version_to_delete = None;
let mut timestamp = now_msec();
for v in interesting_versions {
- if v.timestamp + 1 > timestamp || must_delete.is_none() {
- must_delete = Some(v.uuid);
+ if v.timestamp + 1 > timestamp || version_to_delete.is_none() {
+ version_to_delete = Some(v.uuid);
}
timestamp = std::cmp::max(timestamp, v.timestamp + 1);
}
- let deleted_version = must_delete.ok_or(Error::NotFound)?;
+ let deleted_version = version_to_delete.ok_or(Error::NotFound)?;
let version_uuid = gen_uuid();
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index c1774d6b..6f675e37 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -87,17 +87,21 @@ pub async fn handle_put(
// that we are uploading something
let mut object_version = ObjectVersion {
uuid: version_uuid,
- timestamp: now_msec(),
+ timestamp: version_timestamp,
state: ObjectVersionState::Uploading(headers.clone()),
};
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
+ // Write this entry now, even with empty block list,
+ // to prevent block_ref entries from being deleted (they can be deleted
+ // if the reference a version that isn't found in the version table)
let version = Version::new(version_uuid, bucket.into(), key.into(), false);
- let first_block_hash = blake2sum(&first_block[..]);
+ garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
+ let first_block_hash = blake2sum(&first_block[..]);
let tx_result = read_and_put_blocks(
&garage,
&version,
@@ -173,7 +177,7 @@ fn ensure_checksum_matches(
}
async fn read_and_put_blocks(
- garage: &Arc<Garage>,
+ garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
@@ -187,7 +191,7 @@ async fn read_and_put_blocks(
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
- garage.clone(),
+ &garage,
&version,
part_number,
0,
@@ -207,7 +211,7 @@ async fn read_and_put_blocks(
let block_hash = blake2sum(&block[..]);
let block_len = block.len();
put_curr_version_block = put_block_meta(
- garage.clone(),
+ &garage,
&version,
part_number,
next_offset as u64,
@@ -231,14 +235,13 @@ async fn read_and_put_blocks(
}
async fn put_block_meta(
- garage: Arc<Garage>,
+ garage: &Garage,
version: &Version,
part_number: u64,
offset: u64,
hash: Hash,
size: u64,
) -> Result<(), GarageError> {
- // TODO: don't clone, restart from empty block list ??
let mut version = version.clone();
version.blocks.put(
VersionBlockKey {
@@ -316,6 +319,7 @@ pub async fn handle_create_multipart_upload(
let version_uuid = gen_uuid();
let headers = get_headers(req)?;
+ // Create object in object table
let object_version = ObjectVersion {
uuid: version_uuid,
timestamp: now_msec(),
@@ -324,6 +328,14 @@ pub async fn handle_create_multipart_upload(
let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?;
+ // Insert empty version so that block_ref entries refer to something
+ // (they are inserted concurrently with blocks in the version table, so
+ // there is the possibility that they are inserted before the version table
+ // is created, in which case it is allowed to delete them, e.g. in repair_*)
+ let version = Version::new(version_uuid, bucket.into(), key.into(), false);
+ garage.version_table.insert(&version).await?;
+
+ // Send success response
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
writeln!(
@@ -450,14 +462,12 @@ pub async fn handle_complete_multipart_upload(
)?;
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
- let object_version = object
+ let mut object_version = object
.versions()
.iter()
- .find(|v| v.uuid == version_uuid && v.is_uploading());
- let mut object_version = match object_version {
- None => return Err(Error::NotFound),
- Some(x) => x.clone(),
- };
+ .find(|v| v.uuid == version_uuid && v.is_uploading())
+ .cloned()
+ .ok_or(Error::BadRequest(format!("Version not found")))?;
let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
if version.blocks.len() == 0 {
@@ -498,12 +508,7 @@ pub async fn handle_complete_multipart_upload(
let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts);
// Calculate total size of final object
- let total_size = version
- .blocks
- .items()
- .iter()
- .map(|x| x.1.size)
- .fold(0, |x, y| x + y);
+ let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
// Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 47fc1ae1..599c1965 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -82,13 +82,7 @@ impl Repair {
.versions()
.iter()
.any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted),
- None => {
- warn!(
- "Repair versions: object for version {:?} not found, skipping.",
- version
- );
- continue;
- }
+ None => false,
};
if !version_exists {
info!("Repair versions: marking version as deleted: {:?}", version);
@@ -127,16 +121,8 @@ impl Repair {
.version_table
.get(&block_ref.version, &EmptyKey)
.await?;
- let ref_exists = match version {
- Some(v) => !v.deleted.get(),
- None => {
- warn!(
- "Block ref repair: version for block ref {:?} not found, skipping.",
- block_ref
- );
- continue;
- }
- };
+ // The version might not exist if it has been GC'ed
+ let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false);
if !ref_exists {
info!(
"Repair block ref: marking block_ref as deleted: {:?}",
diff --git a/src/model/block.rs b/src/model/block.rs
index 9426f683..5934f20c 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -28,6 +28,7 @@ use crate::garage::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
+const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
@@ -175,7 +176,10 @@ impl BlockManager {
if data::blake2sum(&data[..]) != *hash {
let _lock = self.data_dir_lock.lock().await;
- warn!("Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash);
+ warn!(
+ "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
+ hash
+ );
let mut path2 = path.clone();
path2.set_extension(".corrupted");
fs::rename(path, path2).await?;
@@ -225,7 +229,7 @@ impl BlockManager {
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.merge(&hash, vec![0])?;
if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
- self.put_to_resync(&hash, Duration::from_secs(0))?;
+ self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?;
}
Ok(())
}
@@ -470,7 +474,7 @@ impl BlockManager {
};
let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_bytes[..]);
- self.put_to_resync(&hash.into(),Duration::from_secs(0))?;
+ self.put_to_resync(&hash.into(), Duration::from_secs(0))?;
}
if *must_exit.borrow() {