diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-11 18:51:11 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-11 18:51:11 +0200 |
commit | 53289b69e5037700689665b4edf20f2382ff15f6 (patch) | |
tree | e9920e1dce29e94bfddc3812b44ee2519ba14bed /src/object_table.rs | |
parent | 4a2624b76afff714a70ee7a9e4ffd97c54c7ecc4 (diff) | |
download | garage-53289b69e5037700689665b4edf20f2382ff15f6.tar.gz garage-53289b69e5037700689665b4edf20f2382ff15f6.zip |
Background task runner that replaces tokio::spawn
Diffstat (limited to 'src/object_table.rs')
-rw-r--r-- | src/object_table.rs | 40 |
1 files changed, 35 insertions, 5 deletions
diff --git a/src/object_table.rs b/src/object_table.rs index 392e0dc7..7add9968 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -38,6 +38,12 @@ pub enum ObjectVersionData { FirstBlock(Hash), } +impl ObjectVersion { + fn cmp_key(&self) -> (u64, &UUID) { + (self.timestamp, &self.uuid) + } +} + impl Entry<String, String> for Object { fn partition_key(&self) -> &String { &self.bucket @@ -48,9 +54,10 @@ impl Entry<String, String> for Object { fn merge(&mut self, other: &Self) { for other_v in other.versions.iter() { - match self.versions.binary_search_by(|v| { - (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid)) - }) { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) + { Ok(i) => { let mut v = &mut self.versions[i]; if other_v.size > v.size { @@ -91,7 +98,30 @@ impl TableFormat for ObjectTable { type E = Object; async fn updated(&self, old: Option<&Self::E>, new: &Self::E) { - //unimplemented!() - // TODO + let old = old.cloned(); + let new = new.clone(); + let garage = self.garage.read().await.as_ref().cloned().unwrap(); + garage.clone().background.spawn(async move { + // Propagate deletion of old versions + if let Some(old_v) = old { + for v in old_v.versions.iter() { + if new + .versions + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) + .is_err() + { + let deleted_version = Version { + uuid: v.uuid.clone(), + deleted: true, + blocks: vec![], + bucket: old_v.bucket.clone(), + key: old_v.key.clone(), + }; + garage.version_table.insert(&deleted_version).await?; + } + } + } + Ok(()) + }); } } |