aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/block_ref_table.rs14
-rw-r--r--src/model/bucket_table.rs7
-rw-r--r--src/model/key_table.rs8
-rw-r--r--src/model/object_table.rs53
-rw-r--r--src/model/version_table.rs37
5 files changed, 51 insertions, 68 deletions
diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs
index 5a7d9aa1..9ab67737 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/block_ref_table.rs
@@ -1,10 +1,8 @@
-use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use garage_util::background::*;
use garage_util::data::*;
-use garage_util::error::Error;
use garage_table::*;
@@ -42,24 +40,26 @@ pub struct BlockRefTable {
pub block_manager: Arc<BlockManager>,
}
-#[async_trait]
impl TableSchema for BlockRefTable {
type P = Hash;
type S = UUID;
type E = BlockRef;
type Filter = DeletedFilter;
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
+ fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block = &old.as_ref().or(new.as_ref()).unwrap().block;
let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
if is_after && !was_before {
- self.block_manager.block_incref(block)?;
+ if let Err(e) = self.block_manager.block_incref(block) {
+ warn!("block_incref failed for block {:?}: {}", block, e);
+ }
}
if was_before && !is_after {
- self.block_manager.block_decref(block)?;
+ if let Err(e) = self.block_manager.block_decref(block) {
+ warn!("block_decref failed for block {:?}: {}", block, e);
+ }
}
- Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index a101555f..af42d551 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -1,10 +1,8 @@
-use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use garage_table::crdt::CRDT;
use garage_table::*;
-use garage_util::error::Error;
use crate::key_table::PermissionSet;
@@ -100,17 +98,12 @@ impl Entry<EmptyKey, String> for Bucket {
pub struct BucketTable;
-#[async_trait]
impl TableSchema for BucketTable {
type P = EmptyKey;
type S = String;
type E = Bucket;
type Filter = DeletedFilter;
- async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
- Ok(())
- }
-
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.is_deleted())
}
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 20da3cc6..5942df75 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -1,11 +1,8 @@
-use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use garage_table::crdt::CRDT;
use garage_table::*;
-use garage_util::error::Error;
-
use model010::key_table as prev;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
@@ -92,17 +89,12 @@ impl Entry<EmptyKey, String> for Key {
pub struct KeyTable;
-#[async_trait]
impl TableSchema for KeyTable {
type P = EmptyKey;
type S = String;
type E = Key;
type Filter = DeletedFilter;
- async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
- Ok(())
- }
-
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.deleted.get())
}
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 929b63f0..16cce72c 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -1,11 +1,9 @@
-use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
-use garage_util::error::Error;
use garage_table::table_sharded::*;
use garage_table::*;
@@ -191,41 +189,42 @@ pub struct ObjectTable {
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
}
-#[async_trait]
impl TableSchema for ObjectTable {
type P = String;
type S = String;
type E = Object;
type Filter = DeletedFilter;
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
+ fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let version_table = self.version_table.clone();
- if let (Some(old_v), Some(new_v)) = (old, new) {
- // Propagate deletion of old versions
- for v in old_v.versions.iter() {
- let newly_deleted = match new_v
- .versions
- .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
- {
- Err(_) => true,
- Ok(i) => {
- new_v.versions[i].state == ObjectVersionState::Aborted
- && v.state != ObjectVersionState::Aborted
+ self.background.spawn(async move {
+ if let (Some(old_v), Some(new_v)) = (old, new) {
+ // Propagate deletion of old versions
+ for v in old_v.versions.iter() {
+ let newly_deleted = match new_v
+ .versions
+ .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
+ {
+ Err(_) => true,
+ Ok(i) => {
+ new_v.versions[i].state == ObjectVersionState::Aborted
+ && v.state != ObjectVersionState::Aborted
+ }
+ };
+ if newly_deleted {
+ let deleted_version = Version::new(
+ v.uuid,
+ old_v.bucket.clone(),
+ old_v.key.clone(),
+ true,
+ vec![],
+ );
+ version_table.insert(&deleted_version).await?;
}
- };
- if newly_deleted {
- let deleted_version = Version::new(
- v.uuid,
- old_v.bucket.clone(),
- old_v.key.clone(),
- true,
- vec![],
- );
- version_table.insert(&deleted_version).await?;
}
}
- }
- Ok(())
+ Ok(())
+ })
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index 0d831998..cf9fbe98 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -1,10 +1,8 @@
-use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
-use garage_util::error::Error;
use garage_table::table_sharded::*;
use garage_table::*;
@@ -112,31 +110,32 @@ pub struct VersionTable {
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
}
-#[async_trait]
impl TableSchema for VersionTable {
type P = Hash;
type S = EmptyKey;
type E = Version;
type Filter = DeletedFilter;
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
+ fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block_ref_table = self.block_ref_table.clone();
- if let (Some(old_v), Some(new_v)) = (old, new) {
- // Propagate deletion of version blocks
- if new_v.deleted && !old_v.deleted {
- let deleted_block_refs = old_v
- .blocks
- .iter()
- .map(|vb| BlockRef {
- block: vb.hash,
- version: old_v.uuid,
- deleted: true,
- })
- .collect::<Vec<_>>();
- block_ref_table.insert_many(&deleted_block_refs[..]).await?;
+ self.background.spawn(async move {
+ if let (Some(old_v), Some(new_v)) = (old, new) {
+ // Propagate deletion of version blocks
+ if new_v.deleted && !old_v.deleted {
+ let deleted_block_refs = old_v
+ .blocks
+ .iter()
+ .map(|vb| BlockRef {
+ block: vb.hash,
+ version: old_v.uuid,
+ deleted: true,
+ })
+ .collect::<Vec<_>>();
+ block_ref_table.insert_many(&deleted_block_refs[..]).await?;
+ }
}
- }
- Ok(())
+ Ok(())
+ })
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {