aboutsummaryrefslogtreecommitdiff
path: root/src/object_table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-11 18:51:11 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-11 18:51:11 +0200
commit53289b69e5037700689665b4edf20f2382ff15f6 (patch)
treee9920e1dce29e94bfddc3812b44ee2519ba14bed /src/object_table.rs
parent4a2624b76afff714a70ee7a9e4ffd97c54c7ecc4 (diff)
downloadgarage-53289b69e5037700689665b4edf20f2382ff15f6.tar.gz
garage-53289b69e5037700689665b4edf20f2382ff15f6.zip
Background task runner that replaces tokio::spawn
Diffstat (limited to 'src/object_table.rs')
-rw-r--r--src/object_table.rs40
1 files changed, 35 insertions, 5 deletions
diff --git a/src/object_table.rs b/src/object_table.rs
index 392e0dc7..7add9968 100644
--- a/src/object_table.rs
+++ b/src/object_table.rs
@@ -38,6 +38,12 @@ pub enum ObjectVersionData {
FirstBlock(Hash),
}
+impl ObjectVersion {
+ fn cmp_key(&self) -> (u64, &UUID) {
+ (self.timestamp, &self.uuid)
+ }
+}
+
impl Entry<String, String> for Object {
fn partition_key(&self) -> &String {
&self.bucket
@@ -48,9 +54,10 @@ impl Entry<String, String> for Object {
fn merge(&mut self, other: &Self) {
for other_v in other.versions.iter() {
- match self.versions.binary_search_by(|v| {
- (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid))
- }) {
+ match self
+ .versions
+ .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key()))
+ {
Ok(i) => {
let mut v = &mut self.versions[i];
if other_v.size > v.size {
@@ -91,7 +98,30 @@ impl TableFormat for ObjectTable {
type E = Object;
async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
- //unimplemented!()
- // TODO
+ let old = old.cloned();
+ let new = new.clone();
+ let garage = self.garage.read().await.as_ref().cloned().unwrap();
+ garage.clone().background.spawn(async move {
+ // Propagate deletion of old versions
+ if let Some(old_v) = old {
+ for v in old_v.versions.iter() {
+ if new
+ .versions
+ .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
+ .is_err()
+ {
+ let deleted_version = Version {
+ uuid: v.uuid.clone(),
+ deleted: true,
+ blocks: vec![],
+ bucket: old_v.bucket.clone(),
+ key: old_v.key.clone(),
+ };
+ garage.version_table.insert(&deleted_version).await?;
+ }
+ }
+ }
+ Ok(())
+ });
}
}