From f319a7d3740ba8b83c9c0eae27edfda1c1d14c03 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 10 Mar 2021 16:21:56 +0100 Subject: Refactor model stuff, including cleaner CRDTs --- src/model/block.rs | 2 +- src/model/block_ref_table.rs | 17 ++++---- src/model/bucket_table.rs | 2 + src/model/garage.rs | 1 - src/model/key_table.rs | 13 ++++-- src/model/object_table.rs | 39 ++++++++---------- src/model/version_table.rs | 96 ++++++++++++++++++-------------------------- 7 files changed, 77 insertions(+), 93 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 56c85c6a..d3957403 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -420,7 +420,7 @@ impl BlockManager { if Some(&block_ref.block) == last_hash.as_ref() { continue; } - if !block_ref.deleted { + if !block_ref.deleted.get() { last_hash = Some(block_ref.block); self.put_to_resync(&block_ref.block, 0)?; } diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs index 9ab67737..07fa5144 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/block_ref_table.rs @@ -1,9 +1,9 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; -use garage_util::background::*; use garage_util::data::*; +use garage_table::crdt::CRDT; use garage_table::*; use crate::block::*; @@ -17,7 +17,7 @@ pub struct BlockRef { pub version: UUID, // Keep track of deleted status - pub deleted: bool, + pub deleted: crdt::Bool, } impl Entry for BlockRef { @@ -27,16 +27,15 @@ impl Entry for BlockRef { fn sort_key(&self) -> &UUID { &self.version } +} +impl CRDT for BlockRef { fn merge(&mut self, other: &Self) { - if other.deleted { - self.deleted = true; - } + self.deleted.merge(&other.deleted); } } pub struct BlockRefTable { - pub background: Arc, pub block_manager: Arc, } @@ -48,8 +47,8 @@ impl TableSchema for BlockRefTable { fn updated(&self, old: Option, new: Option) { let block = &old.as_ref().or(new.as_ref()).unwrap().block; - let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false); - let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false); + let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); + let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); if is_after && !was_before { if let Err(e) = self.block_manager.block_incref(block) { warn!("block_incref failed for block {:?}: {}", block, e); @@ -63,6 +62,6 @@ impl TableSchema for BlockRefTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) } } diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 2878aa38..5bc8b7f9 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -89,7 +89,9 @@ impl Entry for Bucket { fn sort_key(&self) -> &String { &self.name } +} +impl CRDT for Bucket { fn merge(&mut self, other: &Self) { self.state.merge(&other.state); } diff --git a/src/model/garage.rs b/src/model/garage.rs index 467d0aec..d109fdaa 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -79,7 +79,6 @@ impl Garage { info!("Initialize block_ref_table..."); let block_ref_table = Table::new( BlockRefTable { - background: background.clone(), block_manager: block_manager.clone(), }, data_rep_param.clone(), diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 5942df75..6d8cc6c0 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use garage_table::crdt::CRDT; +use garage_table::crdt::*; use garage_table::*; use model010::key_table as prev; @@ -66,6 +66,10 @@ pub struct PermissionSet { pub allow_write: bool, } +impl AutoCRDT for PermissionSet { + const WARN_IF_DIFFERENT: bool = true; +} + impl Entry for Key { fn partition_key(&self) -> &EmptyKey { &EmptyKey @@ -73,17 +77,18 @@ impl Entry for Key { fn sort_key(&self) -> &String { &self.key_id } +} +impl CRDT for Key { fn merge(&mut self, other: &Self) { self.name.merge(&other.name); self.deleted.merge(&other.deleted); if self.deleted.get() { self.authorized_buckets.clear(); - return; + } else { + self.authorized_buckets.merge(&other.authorized_buckets); } - - self.authorized_buckets.merge(&other.authorized_buckets); } } diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 16cce72c..75c37f6d 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use garage_util::background::BackgroundRunner; use garage_util::data::*; +use garage_table::crdt::*; use garage_table::table_sharded::*; use garage_table::*; @@ -70,7 +71,7 @@ pub enum ObjectVersionState { Aborted, } -impl ObjectVersionState { +impl CRDT for ObjectVersionState { fn merge(&mut self, other: &Self) { use ObjectVersionState::*; match other { @@ -91,37 +92,30 @@ impl ObjectVersionState { } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionData { DeleteMarker, Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), FirstBlock(ObjectVersionMeta, Hash), } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +impl AutoCRDT for ObjectVersionData { + const WARN_IF_DIFFERENT: bool = true; +} + +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionMeta { pub headers: ObjectVersionHeaders, pub size: u64, pub etag: String, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionHeaders { pub content_type: String, pub other: BTreeMap, } -impl ObjectVersionData { - fn merge(&mut self, b: &Self) { - if *self != *b { - warn!( - "Inconsistent object version data: {:?} (local) vs {:?} (remote)", - self, b - ); - } - } -} - impl ObjectVersion { fn cmp_key(&self) -> (u64, UUID) { (self.timestamp, self.uuid) @@ -154,8 +148,11 @@ impl Entry for Object { fn sort_key(&self) -> &String { &self.key } +} +impl CRDT for Object { fn merge(&mut self, other: &Self) { + // Merge versions from other into here for other_v in other.versions.iter() { match self .versions @@ -169,6 +166,9 @@ impl Entry for Object { } } } + + // Remove versions which are obsolete, i.e. those that come + // before the last version which .is_complete(). let last_complete = self .versions .iter() @@ -212,13 +212,8 @@ impl TableSchema for ObjectTable { } }; if newly_deleted { - let deleted_version = Version::new( - v.uuid, - old_v.bucket.clone(), - old_v.key.clone(), - true, - vec![], - ); + let deleted_version = + Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true); version_table.insert(&deleted_version).await?; } } diff --git a/src/model/version_table.rs b/src/model/version_table.rs index cf9fbe98..26abb64e 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use garage_util::background::BackgroundRunner; use garage_util::data::*; +use garage_table::crdt::*; use garage_table::table_sharded::*; use garage_table::*; @@ -15,8 +16,8 @@ pub struct Version { pub uuid: UUID, // Actual data: the blocks for this version - pub deleted: bool, - blocks: Vec, + pub deleted: crdt::Bool, + pub blocks: crdt::Map, // Back link to bucket+key so that we can figure if // this was deleted later on @@ -25,56 +26,45 @@ pub struct Version { } impl Version { - pub fn new( - uuid: UUID, - bucket: String, - key: String, - deleted: bool, - blocks: Vec, - ) -> Self { - let mut ret = Self { + pub fn new(uuid: UUID, bucket: String, key: String, deleted: bool) -> Self { + Self { uuid, - deleted, - blocks: vec![], + deleted: deleted.into(), + blocks: crdt::Map::new(), bucket, key, - }; - for b in blocks { - ret.add_block(b) - .expect("Twice the same VersionBlock in Version constructor"); } - ret } - /// Adds a block if it wasn't already present - pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> { - match self - .blocks - .binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key())) - { - Err(i) => { - self.blocks.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlockKey { + pub part_number: u64, + pub offset: u64, +} + +impl Ord for VersionBlockKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + .then(self.offset.cmp(&other.offset)) } - pub fn blocks(&self) -> &[VersionBlock] { - &self.blocks[..] +} + +impl PartialOrd for VersionBlockKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlock { - pub part_number: u64, - pub offset: u64, pub hash: Hash, pub size: u64, } -impl VersionBlock { - fn cmp_key(&self) -> (u64, u64) { - (self.part_number, self.offset) - } +impl AutoCRDT for VersionBlock { + const WARN_IF_DIFFERENT: bool = true; } impl Entry for Version { @@ -84,23 +74,16 @@ impl Entry for Version { fn sort_key(&self) -> &EmptyKey { &EmptyKey } +} +impl CRDT for Version { fn merge(&mut self, other: &Self) { - if other.deleted { - self.deleted = true; + self.deleted.merge(&other.deleted); + + if self.deleted.get() { self.blocks.clear(); - } else if !self.deleted { - for bi in other.blocks.iter() { - match self - .blocks - .binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key())) - { - Ok(_) => (), - Err(pos) => { - self.blocks.insert(pos, bi.clone()); - } - } - } + } else { + self.blocks.merge(&other.blocks); } } } @@ -121,14 +104,15 @@ impl TableSchema for VersionTable { self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks - if new_v.deleted && !old_v.deleted { + if new_v.deleted.get() && !old_v.deleted.get() { let deleted_block_refs = old_v .blocks + .items() .iter() - .map(|vb| BlockRef { + .map(|(_k, vb)| BlockRef { block: vb.hash, version: old_v.uuid, - deleted: true, + deleted: true.into(), }) .collect::>(); block_ref_table.insert_many(&deleted_block_refs[..]).await?; @@ -139,6 +123,6 @@ impl TableSchema for VersionTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) } } -- cgit v1.2.3 From 0fd7df8fa0741caa622b2bae979aac867f443154 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 10 Mar 2021 16:33:31 +0100 Subject: Switch to blake2 sum for identifying blocks by their data --- src/model/block.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index d3957403..57f4c077 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -176,7 +176,7 @@ impl BlockManager { f.read_to_end(&mut data).await?; drop(f); - if data::sha256sum(&data[..]) != *hash { + if data::blake2sum(&data[..]) != *hash { let _lock = self.data_dir_lock.lock().await; warn!("Block {:?} is corrupted. Deleting and resyncing.", hash); fs::remove_file(path).await?; -- cgit v1.2.3 From 445912dc6a3b65d0726b9378b3542b4061272cf4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 10 Mar 2021 16:38:31 +0100 Subject: Remove migration paths from 0.1 branch --- src/model/Cargo.toml | 1 - src/model/bucket_table.rs | 40 ------------------------------------ src/model/key_table.rs | 28 ------------------------- src/model/object_table.rs | 52 ----------------------------------------------- 4 files changed, 121 deletions(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 48b75d24..caeed66c 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -16,7 +16,6 @@ path = "lib.rs" garage_util = { version = "0.1.1", path = "../util" } garage_rpc = { version = "0.1.1", path = "../rpc" } garage_table = { version = "0.1.1", path = "../table" } -model010 = { package = "garage_model_010b", version = "0.0.1" } bytes = "0.4" rand = "0.7" diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 5bc8b7f9..6330dced 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -5,11 +5,6 @@ use garage_table::*; use crate::key_table::PermissionSet; -// We import the same file but in its version 0.1.0. -// We can then access v0.1.0 data structures. -// We use them to perform migrations. -use model010::bucket_table as prev; - /// A bucket is a collection of objects /// /// Its parameters are not directly accessible as: @@ -108,39 +103,4 @@ impl TableSchema for BucketTable { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.is_deleted()) } - - fn try_migrate(bytes: &[u8]) -> Option { - let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) { - Ok(x) => x, - Err(_) => return None, - }; - if old.deleted { - Some(Bucket { - name: old.name, - state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted), - }) - } else { - let mut keys = crdt::LWWMap::new(); - for ak in old.authorized_keys() { - keys.merge(&crdt::LWWMap::migrate_from_raw_item( - ak.key_id.clone(), - ak.timestamp, - PermissionSet { - allow_read: ak.allow_read, - allow_write: ak.allow_write, - }, - )); - } - - let params = BucketParams { - authorized_keys: keys, - website: crdt::LWW::new(false), - }; - - Some(Bucket { - name: old.name, - state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(params)), - }) - } - } } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 6d8cc6c0..b4ab65b6 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -3,8 +3,6 @@ use serde::{Deserialize, Serialize}; use garage_table::crdt::*; use garage_table::*; -use model010::key_table as prev; - #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Key { // Primary key @@ -103,30 +101,4 @@ impl TableSchema for KeyTable { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.deleted.get()) } - - fn try_migrate(bytes: &[u8]) -> Option { - let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) { - Ok(x) => x, - Err(_) => return None, - }; - let mut new = Self::E { - key_id: old.key_id.clone(), - secret_key: old.secret_key.clone(), - name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()), - deleted: crdt::Bool::new(old.deleted), - authorized_buckets: crdt::LWWMap::new(), - }; - for ab in old.authorized_buckets() { - let it = crdt::LWWMap::migrate_from_raw_item( - ab.bucket.clone(), - ab.timestamp, - PermissionSet { - allow_read: ab.allow_read, - allow_write: ab.allow_write, - }, - ); - new.authorized_buckets.merge(&it); - } - Some(new) - } } diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 75c37f6d..3280e7b5 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -11,8 +11,6 @@ use garage_table::*; use crate::version_table::*; -use model010::object_table as prev; - #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { // Primary key @@ -226,55 +224,5 @@ impl TableSchema for ObjectTable { let deleted = !entry.versions.iter().any(|v| v.is_data()); filter.apply(deleted) } - - fn try_migrate(bytes: &[u8]) -> Option { - let old = match rmp_serde::decode::from_read_ref::<_, prev::Object>(bytes) { - Ok(x) => x, - Err(_) => return None, - }; - let new_v = old - .versions() - .iter() - .map(migrate_version) - .collect::>(); - let new = Object::new(old.bucket.clone(), old.key.clone(), new_v); - Some(new) - } } -fn migrate_version(old: &prev::ObjectVersion) -> ObjectVersion { - let headers = ObjectVersionHeaders { - content_type: old.mime_type.clone(), - other: BTreeMap::new(), - }; - let meta = ObjectVersionMeta { - headers: headers.clone(), - size: old.size, - etag: "".to_string(), - }; - let state = match old.state { - prev::ObjectVersionState::Uploading => ObjectVersionState::Uploading(headers), - prev::ObjectVersionState::Aborted => ObjectVersionState::Aborted, - prev::ObjectVersionState::Complete => match &old.data { - prev::ObjectVersionData::Uploading => ObjectVersionState::Uploading(headers), - prev::ObjectVersionData::DeleteMarker => { - ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) - } - prev::ObjectVersionData::Inline(x) => { - ObjectVersionState::Complete(ObjectVersionData::Inline(meta, x.clone())) - } - prev::ObjectVersionData::FirstBlock(h) => { - let mut hash = [0u8; 32]; - hash.copy_from_slice(h.as_ref()); - ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, Hash::from(hash))) - } - }, - }; - let mut uuid = [0u8; 32]; - uuid.copy_from_slice(old.uuid.as_ref()); - ObjectVersion { - uuid: UUID::from(uuid), - timestamp: old.timestamp, - state, - } -} -- cgit v1.2.3 From af7600f989d79d07253405647973828435f9d16c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 10 Mar 2021 17:01:05 +0100 Subject: Correctly implement CompleteMultipartUpload with etag check of parts --- src/model/version_table.rs | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src/model') diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 26abb64e..7ccc6a33 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -16,8 +16,11 @@ pub struct Version { pub uuid: UUID, // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload pub deleted: crdt::Bool, pub blocks: crdt::Map, + pub parts_etags: crdt::Map, // Back link to bucket+key so that we can figure if // this was deleted later on @@ -31,6 +34,7 @@ impl Version { uuid, deleted: deleted.into(), blocks: crdt::Map::new(), + parts_etags: crdt::Map::new(), bucket, key, } @@ -82,8 +86,10 @@ impl CRDT for Version { if self.deleted.get() { self.blocks.clear(); + self.parts_etags.clear(); } else { self.blocks.merge(&other.blocks); + self.parts_etags.merge(&other.parts_etags); } } } -- cgit v1.2.3 From 3214dd52dd144c99353830d7340ea158e262b06f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 10 Mar 2021 21:50:09 +0100 Subject: Very minor changes --- src/model/object_table.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'src/model') diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 3280e7b5..cd09f678 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -225,4 +225,3 @@ impl TableSchema for ObjectTable { filter.apply(deleted) } } - -- cgit v1.2.3 From 8d63738cb062e816fc01c6aa2b32936ad31ff65b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 13:47:21 +0100 Subject: Checkpoint: add merkle tree in data table --- src/model/block.rs | 14 ++++++-------- src/model/garage.rs | 19 +++++++------------ 2 files changed, 13 insertions(+), 20 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 57f4c077..ba5544a3 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -127,18 +127,16 @@ impl BlockManager { } } - pub async fn spawn_background_worker(self: Arc) { + pub fn spawn_background_worker(self: Arc) { // Launch 2 simultaneous workers for background resync loop preprocessing - for i in 0..2usize { + for i in 0..2u64 { let bm2 = self.clone(); let background = self.system.background.clone(); tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(10)).await; - background - .spawn_worker(format!("block resync worker {}", i), move |must_exit| { - bm2.resync_loop(must_exit) - }) - .await; + tokio::time::delay_for(Duration::from_secs(10 * (i + 1))).await; + background.spawn_worker(format!("block resync worker {}", i), move |must_exit| { + bm2.resync_loop(must_exit) + }); }); } } diff --git a/src/model/garage.rs b/src/model/garage.rs index d109fdaa..aac79a85 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -35,7 +35,7 @@ pub struct Garage { } impl Garage { - pub async fn new( + pub fn new( config: Config, db: sled::Db, background: Arc, @@ -86,8 +86,7 @@ impl Garage { &db, "block_ref".to_string(), rpc_server, - ) - .await; + ); info!("Initialize version_table..."); let version_table = Table::new( @@ -100,8 +99,7 @@ impl Garage { &db, "version".to_string(), rpc_server, - ) - .await; + ); info!("Initialize object_table..."); let object_table = Table::new( @@ -114,8 +112,7 @@ impl Garage { &db, "object".to_string(), rpc_server, - ) - .await; + ); info!("Initialize bucket_table..."); let bucket_table = Table::new( @@ -125,8 +122,7 @@ impl Garage { &db, "bucket".to_string(), rpc_server, - ) - .await; + ); info!("Initialize key_table_table..."); let key_table = Table::new( @@ -136,8 +132,7 @@ impl Garage { &db, "key".to_string(), rpc_server, - ) - .await; + ); info!("Initialize Garage..."); let garage = Arc::new(Self { @@ -155,7 +150,7 @@ impl Garage { info!("Start block manager background thread..."); garage.block_manager.garage.swap(Some(garage.clone())); - garage.block_manager.clone().spawn_background_worker().await; + garage.block_manager.clone().spawn_background_worker(); garage } -- cgit v1.2.3 From 94f3d287742ff90f179f528421c690b00b71a912 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 16:54:15 +0100 Subject: WIP big refactoring --- src/model/block.rs | 5 ++--- src/model/garage.rs | 4 ++-- src/model/object_table.rs | 2 +- src/model/version_table.rs | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index ba5544a3..987ec9e4 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -19,8 +19,7 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; -use garage_table::table_sharded::TableShardedReplication; -use garage_table::TableReplication; +use garage_table::replication::{sharded::TableShardedReplication, TableReplication}; use crate::block_ref_table::*; @@ -412,7 +411,7 @@ impl BlockManager { let garage = self.garage.load_full().unwrap(); let mut last_hash = None; let mut i = 0usize; - for entry in garage.block_ref_table.store.iter() { + for entry in garage.block_ref_table.data.store.iter() { let (_k, v_bytes) = entry?; let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?; if Some(&block_ref.block) == last_hash.as_ref() { diff --git a/src/model/garage.rs b/src/model/garage.rs index aac79a85..193c71d2 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -7,8 +7,8 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::RpcHttpClient; use garage_rpc::rpc_server::RpcServer; -use garage_table::table_fullcopy::*; -use garage_table::table_sharded::*; +use garage_table::replication::sharded::*; +use garage_table::replication::fullcopy::*; use garage_table::*; use crate::block::*; diff --git a/src/model/object_table.rs b/src/model/object_table.rs index cd09f678..99fad3ce 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -6,7 +6,7 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; -use garage_table::table_sharded::*; +use garage_table::replication::sharded::*; use garage_table::*; use crate::version_table::*; diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 7ccc6a33..cdc73a85 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -5,7 +5,7 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; -use garage_table::table_sharded::*; +use garage_table::replication::sharded::*; use garage_table::*; use crate::block_ref_table::*; -- cgit v1.2.3 From 046b649bcc3b147140fc2b0af0e071d3dd1b2c8d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 18:28:03 +0100 Subject: (not well tested) use merkle tree for sync --- src/model/garage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/garage.rs b/src/model/garage.rs index 193c71d2..ced3c29e 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -7,8 +7,8 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::RpcHttpClient; use garage_rpc::rpc_server::RpcServer; -use garage_table::replication::sharded::*; use garage_table::replication::fullcopy::*; +use garage_table::replication::sharded::*; use garage_table::*; use crate::block::*; -- cgit v1.2.3 From 3f7a496355bdbeeeee859912fa6fa7a95cb47f3b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 19:06:27 +0100 Subject: More security: don't delete stuff too easily --- src/model/block.rs | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 987ec9e4..a3bbe43d 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -305,6 +305,9 @@ impl BlockManager { let ring = self.system.ring.borrow().clone(); let mut who = self.replication.replication_nodes(&hash, &ring); + if who.len() < self.replication.write_quorum(&self.system) { + return Err(Error::Message(format!("Not trying to offload block because we don't have a quorum of nodes to write to"))); + } who.retain(|id| *id != self.system.id); let msg = Arc::new(Message::NeedBlockQuery(*hash)); -- cgit v1.2.3 From c475471e7a8e7544f2be490898f4249cf27a17e9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 19:57:37 +0100 Subject: Implement table gc, currently for block_ref and version only --- src/model/block.rs | 18 ++++++------------ src/model/block_ref_table.rs | 3 +++ src/model/version_table.rs | 3 +++ 3 files changed, 12 insertions(+), 12 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index a3bbe43d..8b065c04 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -248,9 +248,7 @@ impl BlockManager { let time_msec = u64_from_bytes(&time_bytes[0..8]); let now = now_msec(); if now >= time_msec { - let mut hash = [0u8; 32]; - hash.copy_from_slice(hash_bytes.as_ref()); - let hash = Hash::from(hash); + let hash = Hash::try_from(&hash_bytes[..]).unwrap(); if let Err(e) = self.resync_iter(&hash).await { warn!("Failed to resync block {:?}, retrying later: {}", hash, e); @@ -340,15 +338,11 @@ impl BlockManager { need_nodes.len() ); - let put_block_message = Arc::new(self.read_block(hash).await?); - let put_resps = join_all(need_nodes.iter().map(|to| { - self.rpc_client - .call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT) - })) - .await; - for resp in put_resps { - resp?; - } + let put_block_message = self.read_block(hash).await?; + self.rpc_client.try_call_many( + &need_nodes[..], + put_block_message, + RequestStrategy::with_quorum(need_nodes.len()).with_timeout(BLOCK_RW_TIMEOUT)).await?; } trace!( "Deleting block {:?}, offload finished ({} / {})", diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs index 07fa5144..e4372717 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/block_ref_table.rs @@ -27,6 +27,9 @@ impl Entry for BlockRef { fn sort_key(&self) -> &UUID { &self.version } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } } impl CRDT for BlockRef { diff --git a/src/model/version_table.rs b/src/model/version_table.rs index cdc73a85..841fbfea 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -78,6 +78,9 @@ impl Entry for Version { fn sort_key(&self) -> &EmptyKey { &EmptyKey } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } } impl CRDT for Version { -- cgit v1.2.3 From 831eb35763fdaeecb7b6d6aa13ebd78da14db04e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 12 Mar 2021 21:52:19 +0100 Subject: cargo fmt --- src/model/block.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 8b065c04..eccc2cbd 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -339,10 +339,14 @@ impl BlockManager { ); let put_block_message = self.read_block(hash).await?; - self.rpc_client.try_call_many( + self.rpc_client + .try_call_many( &need_nodes[..], put_block_message, - RequestStrategy::with_quorum(need_nodes.len()).with_timeout(BLOCK_RW_TIMEOUT)).await?; + RequestStrategy::with_quorum(need_nodes.len()) + .with_timeout(BLOCK_RW_TIMEOUT), + ) + .await?; } trace!( "Deleting block {:?}, offload finished ({} / {})", -- cgit v1.2.3 From bdcbdd1cd854bd8125458af0ac20b8682f810967 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 14:46:37 +0100 Subject: Fix list API bug --- src/model/block.rs | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index eccc2cbd..9426f683 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -165,7 +165,7 @@ impl BlockManager { Ok(f) => f, Err(e) => { // Not found but maybe we should have had it ?? - self.put_to_resync(hash, 0)?; + self.put_to_resync(hash, Duration::from_millis(0))?; return Err(Into::into(e)); } }; @@ -175,9 +175,11 @@ impl BlockManager { if data::blake2sum(&data[..]) != *hash { let _lock = self.data_dir_lock.lock().await; - warn!("Block {:?} is corrupted. Deleting and resyncing.", hash); - fs::remove_file(path).await?; - self.put_to_resync(&hash, 0)?; + warn!("Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash); + let mut path2 = path.clone(); + path2.set_extension(".corrupted"); + fs::rename(path, path2).await?; + self.put_to_resync(&hash, Duration::from_millis(0))?; return Err(Error::CorruptData(*hash)); } @@ -215,7 +217,7 @@ impl BlockManager { let old_rc = self.rc.get(&hash)?; self.rc.merge(&hash, vec![1])?; if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?; + self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?; } Ok(()) } @@ -223,13 +225,13 @@ impl BlockManager { pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { let new_rc = self.rc.merge(&hash, vec![0])?; if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, 0)?; + self.put_to_resync(&hash, Duration::from_secs(0))?; } Ok(()) } - fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> { - let when = now_msec() + delay_millis; + fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> { + let when = now_msec() + delay.as_millis() as u64; trace!("Put resync_queue: {} {:?}", when, hash); let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); @@ -252,7 +254,7 @@ impl BlockManager { if let Err(e) = self.resync_iter(&hash).await { warn!("Failed to resync block {:?}, retrying later: {}", hash, e); - self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?; + self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; n_failures += 1; if n_failures >= 10 { warn!("Too many resync failures, throttling."); @@ -281,6 +283,8 @@ impl BlockManager { } async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> { + let lock = self.data_dir_lock.lock().await; + let path = self.block_path(hash); let exists = fs::metadata(&path).await.is_ok(); @@ -360,6 +364,8 @@ impl BlockManager { } if needed && !exists { + drop(lock); + // TODO find a way to not do this if they are sending it to us // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // between the RC being incremented and this part being called. @@ -420,7 +426,7 @@ impl BlockManager { } if !block_ref.deleted.get() { last_hash = Some(block_ref.block); - self.put_to_resync(&block_ref.block, 0)?; + self.put_to_resync(&block_ref.block, Duration::from_secs(0))?; } i += 1; if i & 0xFF == 0 && *must_exit.borrow() { @@ -464,7 +470,7 @@ impl BlockManager { }; let mut hash = [0u8; 32]; hash.copy_from_slice(&hash_bytes[..]); - self.put_to_resync(&hash.into(), 0)?; + self.put_to_resync(&hash.into(),Duration::from_secs(0))?; } if *must_exit.borrow() { -- cgit v1.2.3 From 097c339d981dba0420af17d30d1221181d8bf1d7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 15:26:29 +0100 Subject: Fix race condition --- src/model/block.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 9426f683..5934f20c 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -28,6 +28,7 @@ use crate::garage::Garage; pub const INLINE_THRESHOLD: usize = 3072; const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); +const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); @@ -175,7 +176,10 @@ impl BlockManager { if data::blake2sum(&data[..]) != *hash { let _lock = self.data_dir_lock.lock().await; - warn!("Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash); + warn!( + "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", + hash + ); let mut path2 = path.clone(); path2.set_extension(".corrupted"); fs::rename(path, path2).await?; @@ -225,7 +229,7 @@ impl BlockManager { pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { let new_rc = self.rc.merge(&hash, vec![0])?; if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, Duration::from_secs(0))?; + self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?; } Ok(()) } @@ -470,7 +474,7 @@ impl BlockManager { }; let mut hash = [0u8; 32]; hash.copy_from_slice(&hash_bytes[..]); - self.put_to_resync(&hash.into(),Duration::from_secs(0))?; + self.put_to_resync(&hash.into(), Duration::from_secs(0))?; } if *must_exit.borrow() { -- cgit v1.2.3 From 3bf2df622a070fe8f233bec6d60bd5cca995fbfc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 16:21:41 +0100 Subject: Time and metadata improvements --- src/model/block.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 5934f20c..36ad867a 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -11,9 +11,9 @@ use tokio::fs; use tokio::prelude::*; use tokio::sync::{watch, Mutex, Notify}; -use garage_util::data; use garage_util::data::*; use garage_util::error::Error; +use garage_util::time::*; use garage_rpc::membership::System; use garage_rpc::rpc_client::*; @@ -174,7 +174,7 @@ impl BlockManager { f.read_to_end(&mut data).await?; drop(f); - if data::blake2sum(&data[..]) != *hash { + if blake2sum(&data[..]) != *hash { let _lock = self.data_dir_lock.lock().await; warn!( "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", -- cgit v1.2.3 From 0290afe1f8eafabf71695d677807e07658d078ab Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 18:27:26 +0100 Subject: Make block rc code more understandable --- src/model/block.rs | 54 +++++++++++++++++++++--------------------------------- 1 file changed, 21 insertions(+), 33 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 36ad867a..8523474a 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -77,7 +77,6 @@ impl BlockManager { let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); - rc.set_merge_operator(rc_merge); let resync_queue = db .open_tree("block_local_resync_queue") @@ -194,7 +193,7 @@ impl BlockManager { let needed = self .rc .get(hash.as_ref())? - .map(|x| u64_from_bytes(x.as_ref()) > 0) + .map(|x| u64_from_be_bytes(x) > 0) .unwrap_or(false); if needed { let path = self.block_path(hash); @@ -218,17 +217,27 @@ impl BlockManager { } pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { - let old_rc = self.rc.get(&hash)?; - self.rc.merge(&hash, vec![1])?; - if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { + let old_rc = self.rc.fetch_and_update(&hash, |old| { + let old_v = old.map(u64_from_be_bytes).unwrap_or(0); + Some(u64::to_be_bytes(old_v + 1).to_vec()) + })?; + let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0); + if old_rc == 0 { self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?; } Ok(()) } pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { - let new_rc = self.rc.merge(&hash, vec![0])?; - if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { + let new_rc = self.rc.update_and_fetch(&hash, |old| { + let old_v = old.map(u64_from_be_bytes).unwrap_or(0); + if old_v > 1 { + Some(u64::to_be_bytes(old_v - 1).to_vec()) + } else { + None + } + })?; + if new_rc.is_none() { self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?; } Ok(()) @@ -251,7 +260,7 @@ impl BlockManager { let mut n_failures = 0usize; while !*must_exit.borrow() { if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { - let time_msec = u64_from_bytes(&time_bytes[0..8]); + let time_msec = u64_from_be_bytes(&time_bytes[0..8]); let now = now_msec(); if now >= time_msec { let hash = Hash::try_from(&hash_bytes[..]).unwrap(); @@ -295,7 +304,7 @@ impl BlockManager { let needed = self .rc .get(hash.as_ref())? - .map(|x| u64_from_bytes(x.as_ref()) > 0) + .map(|x| u64_from_be_bytes(x) > 0) .unwrap_or(false); if exists != needed { @@ -487,30 +496,9 @@ impl BlockManager { } } -fn u64_from_bytes(bytes: &[u8]) -> u64 { - assert!(bytes.len() == 8); +fn u64_from_be_bytes>(bytes: T) -> u64 { + assert!(bytes.as_ref().len() == 8); let mut x8 = [0u8; 8]; - x8.copy_from_slice(bytes); + x8.copy_from_slice(bytes.as_ref()); u64::from_be_bytes(x8) } - -fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option> { - let old = old.map(u64_from_bytes).unwrap_or(0); - assert!(new.len() == 1); - let new = match new[0] { - 0 => { - if old > 0 { - old - 1 - } else { - 0 - } - } - 1 => old + 1, - _ => unreachable!(), - }; - if new == 0 { - None - } else { - Some(u64::to_be_bytes(new).to_vec()) - } -} -- cgit v1.2.3 From 5ee1d956b6c7dd847a304ef524253b2e067e1245 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 19:14:26 +0100 Subject: Allow manipulation of keys by their shorthand in the CLI --- src/model/key_table.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) (limited to 'src/model') diff --git a/src/model/key_table.rs b/src/model/key_table.rs index b4ab65b6..ce5888ce 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -92,13 +92,24 @@ impl CRDT for Key { pub struct KeyTable; +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum KeyFilter { + Deleted(DeletedFilter), + Matches(String), +} + impl TableSchema for KeyTable { type P = EmptyKey; type S = String; type E = Key; - type Filter = DeletedFilter; + type Filter = KeyFilter; fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted.get()) + match filter { + KeyFilter::Deleted(df) => df.apply(entry.deleted.get()), + KeyFilter::Matches(pat) => { + entry.key_id.starts_with(pat) || entry.name.get().to_lowercase() == pat.to_lowercase() + } + } } } -- cgit v1.2.3 From 642bed601f9b5f8a0cf3e99223235e6b3eb33c0a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 19:16:42 +0100 Subject: Make it case-insensitive --- src/model/key_table.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/key_table.rs b/src/model/key_table.rs index ce5888ce..88d7b4ff 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -108,7 +108,8 @@ impl TableSchema for KeyTable { match filter { KeyFilter::Deleted(df) => df.apply(entry.deleted.get()), KeyFilter::Matches(pat) => { - entry.key_id.starts_with(pat) || entry.name.get().to_lowercase() == pat.to_lowercase() + let pat = pat.to_lowercase(); + entry.key_id.to_lowercase().starts_with(&pat) || entry.name.get().to_lowercase() == pat } } } -- cgit v1.2.3 From 667e4e72a8e64a094d57ceeb6442cef08f1ef0e1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 19:51:16 +0100 Subject: Small fixes --- src/model/block.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 8523474a..9fe6c76b 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -27,6 +27,8 @@ use crate::garage::Garage; pub const INLINE_THRESHOLD: usize = 3072; +pub const BACKGROUND_WORKERS: u64 = 1; + const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); @@ -56,14 +58,14 @@ pub struct BlockManager { pub data_dir: PathBuf, pub data_dir_lock: Mutex<()>, - pub rc: sled::Tree, + rc: sled::Tree, - pub resync_queue: sled::Tree, - pub resync_notify: Notify, + resync_queue: sled::Tree, + resync_notify: Notify, - pub system: Arc, + system: Arc, rpc_client: Arc>, - pub garage: ArcSwapOption, + pub(crate) garage: ArcSwapOption, } impl BlockManager { @@ -128,7 +130,7 @@ impl BlockManager { pub fn spawn_background_worker(self: Arc) { // Launch 2 simultaneous workers for background resync loop preprocessing - for i in 0..2u64 { + for i in 0..BACKGROUND_WORKERS { let bm2 = self.clone(); let background = self.system.background.clone(); tokio::spawn(async move { @@ -373,7 +375,6 @@ impl BlockManager { ); fs::remove_file(path).await?; - self.resync_queue.remove(&hash)?; } if needed && !exists { @@ -494,6 +495,14 @@ impl BlockManager { } .boxed() } + + pub fn resync_queue_len(&self) -> usize { + self.resync_queue.len() + } + + pub fn rc_len(&self) -> usize { + self.rc.len() + } } fn u64_from_be_bytes>(bytes: T) -> u64 { -- cgit v1.2.3 From 4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 20:09:44 +0100 Subject: Refactor block resync loop; make workers infaillible --- src/model/block.rs | 58 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 30 insertions(+), 28 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 9fe6c76b..023ed3ab 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -258,46 +258,48 @@ impl BlockManager { async fn resync_loop( self: Arc, mut must_exit: watch::Receiver, - ) -> Result<(), Error> { - let mut n_failures = 0usize; + ) { while !*must_exit.borrow() { - if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { - let time_msec = u64_from_be_bytes(&time_bytes[0..8]); - let now = now_msec(); - if now >= time_msec { - let hash = Hash::try_from(&hash_bytes[..]).unwrap(); - - if let Err(e) = self.resync_iter(&hash).await { - warn!("Failed to resync block {:?}, retrying later: {}", hash, e); - self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; - n_failures += 1; - if n_failures >= 10 { - warn!("Too many resync failures, throttling."); - tokio::time::delay_for(Duration::from_secs(1)).await; - } - } else { - n_failures = 0; - } - } else { - self.resync_queue.insert(time_bytes, hash_bytes)?; - let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now)); - select! { - _ = delay.fuse() => (), - _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), - } + if let Err(e) = self.resync_iter(&mut must_exit).await { + warn!("Error in block resync loop: {}", e); + tokio::time::delay_for(Duration::from_secs(10)).await; + } + } + } + + + async fn resync_iter(&self, must_exit: &mut watch::Receiver) -> Result<(), Error> { + if let Some(first_item) = self.resync_queue.iter().next() { + let (time_bytes, hash_bytes) = first_item?; + let time_msec = u64_from_be_bytes(&time_bytes[0..8]); + let now = now_msec(); + if now >= time_msec { + let hash = Hash::try_from(&hash_bytes[..]).unwrap(); + let res = self.resync_block(&hash).await; + if let Err(e) = &res { + warn!("Error when resyncing {:?}: {}", hash, e); + self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; } + self.resync_queue.remove(&time_bytes)?; + res?; // propagate error to delay main loop } else { + let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now)); select! { + _ = delay.fuse() => (), _ = self.resync_notify.notified().fuse() => (), _ = must_exit.recv().fuse() => (), } } + } else { + select! { + _ = self.resync_notify.notified().fuse() => (), + _ = must_exit.recv().fuse() => (), + } } Ok(()) } - async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> { + async fn resync_block(&self, hash: &Hash) -> Result<(), Error> { let lock = self.data_dir_lock.lock().await; let path = self.block_path(hash); -- cgit v1.2.3 From 0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 22:36:41 +0100 Subject: WIP migrate to tokio 1 --- src/model/Cargo.toml | 3 +-- src/model/block.rs | 23 +++++++++++++---------- src/model/object_table.rs | 3 ++- src/model/version_table.rs | 3 ++- 4 files changed, 18 insertions(+), 14 deletions(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index caeed66c..8f36cf2e 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -33,5 +33,4 @@ serde_bytes = "0.11" async-trait = "0.1.30" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } - +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/model/block.rs b/src/model/block.rs index 023ed3ab..7185372c 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -5,10 +5,9 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use futures::future::*; use futures::select; -use futures::stream::*; use serde::{Deserialize, Serialize}; use tokio::fs; -use tokio::prelude::*; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; use garage_util::data::*; @@ -134,7 +133,7 @@ impl BlockManager { let bm2 = self.clone(); let background = self.system.background.clone(); tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(10 * (i + 1))).await; + tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await; background.spawn_worker(format!("block resync worker {}", i), move |must_exit| { bm2.resync_loop(must_exit) }); @@ -251,7 +250,7 @@ impl BlockManager { let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); self.resync_queue.insert(key, hash.as_ref())?; - self.resync_notify.notify(); + self.resync_notify.notify_waiters(); Ok(()) } @@ -262,7 +261,7 @@ impl BlockManager { while !*must_exit.borrow() { if let Err(e) = self.resync_iter(&mut must_exit).await { warn!("Error in block resync loop: {}", e); - tokio::time::delay_for(Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(10)).await; } } } @@ -283,17 +282,17 @@ impl BlockManager { self.resync_queue.remove(&time_bytes)?; res?; // propagate error to delay main loop } else { - let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now)); + let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); select! { _ = delay.fuse() => (), _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = must_exit.changed().fuse() => (), } } } else { select! { _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = must_exit.changed().fuse() => (), } } Ok(()) @@ -467,8 +466,12 @@ impl BlockManager { // so that we can offload them if necessary and then delete them locally. async move { let mut ls_data_dir = fs::read_dir(path).await?; - while let Some(data_dir_ent) = ls_data_dir.next().await { - let data_dir_ent = data_dir_ent?; + loop { + let data_dir_ent = ls_data_dir.next_entry().await?; + let data_dir_ent = match data_dir_ent { + Some(x) => x, + None => break, + }; let name = data_dir_ent.file_name(); let name = match name.into_string() { Ok(x) => x, diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 99fad3ce..d08bba70 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -195,7 +195,8 @@ impl TableSchema for ObjectTable { fn updated(&self, old: Option, new: Option) { let version_table = self.version_table.clone(); - self.background.spawn(async move { + // TODO not cancellable + self.background.spawn_cancellable(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of old versions for v in old_v.versions.iter() { diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 841fbfea..19343890 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -110,7 +110,8 @@ impl TableSchema for VersionTable { fn updated(&self, old: Option, new: Option) { let block_ref_table = self.block_ref_table.clone(); - self.background.spawn(async move { + // TODO not cancellable + self.background.spawn_cancellable(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks if new_v.deleted.get() && !old_v.deleted.get() { -- cgit v1.2.3 From 6a8439fd1345ecae7414386f76dda7a03eb14df2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 23:14:12 +0100 Subject: Some improvements in background worker but we terminate late --- src/model/block.rs | 13 ++++++------- src/model/key_table.rs | 3 ++- src/model/object_table.rs | 3 +-- src/model/version_table.rs | 3 +-- 4 files changed, 10 insertions(+), 12 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 7185372c..a3958866 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -254,19 +254,18 @@ impl BlockManager { Ok(()) } - async fn resync_loop( - self: Arc, - mut must_exit: watch::Receiver, - ) { + async fn resync_loop(self: Arc, mut must_exit: watch::Receiver) { while !*must_exit.borrow() { if let Err(e) = self.resync_iter(&mut must_exit).await { warn!("Error in block resync loop: {}", e); - tokio::time::sleep(Duration::from_secs(10)).await; + select! { + _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (), + _ = must_exit.changed().fuse() => (), + } } } } - async fn resync_iter(&self, must_exit: &mut watch::Receiver) -> Result<(), Error> { if let Some(first_item) = self.resync_queue.iter().next() { let (time_bytes, hash_bytes) = first_item?; @@ -280,7 +279,7 @@ impl BlockManager { self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; } self.resync_queue.remove(&time_bytes)?; - res?; // propagate error to delay main loop + res?; // propagate error to delay main loop } else { let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); select! { diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 88d7b4ff..02dcf68c 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -109,7 +109,8 @@ impl TableSchema for KeyTable { KeyFilter::Deleted(df) => df.apply(entry.deleted.get()), KeyFilter::Matches(pat) => { let pat = pat.to_lowercase(); - entry.key_id.to_lowercase().starts_with(&pat) || entry.name.get().to_lowercase() == pat + entry.key_id.to_lowercase().starts_with(&pat) + || entry.name.get().to_lowercase() == pat } } } diff --git a/src/model/object_table.rs b/src/model/object_table.rs index d08bba70..99fad3ce 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -195,8 +195,7 @@ impl TableSchema for ObjectTable { fn updated(&self, old: Option, new: Option) { let version_table = self.version_table.clone(); - // TODO not cancellable - self.background.spawn_cancellable(async move { + self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of old versions for v in old_v.versions.iter() { diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 19343890..841fbfea 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -110,8 +110,7 @@ impl TableSchema for VersionTable { fn updated(&self, old: Option, new: Option) { let block_ref_table = self.block_ref_table.clone(); - // TODO not cancellable - self.background.spawn_cancellable(async move { + self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks if new_v.deleted.get() && !old_v.deleted.get() { -- cgit v1.2.3 From 1d9961e4118af0e26068e1d6c5c6c009a1292a88 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 11:14:27 +0100 Subject: Simplify replication logic --- src/model/block.rs | 14 ++++++-------- src/model/garage.rs | 7 ++++++- 2 files changed, 12 insertions(+), 9 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index a3958866..41729685 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -319,10 +319,8 @@ impl BlockManager { if exists && !needed { trace!("Offloading block {:?}", hash); - let ring = self.system.ring.borrow().clone(); - - let mut who = self.replication.replication_nodes(&hash, &ring); - if who.len() < self.replication.write_quorum(&self.system) { + let mut who = self.replication.write_nodes(&hash); + if who.len() < self.replication.write_quorum() { return Err(Error::Message(format!("Not trying to offload block because we don't have a quorum of nodes to write to"))); } who.retain(|id| *id != self.system.id); @@ -367,7 +365,7 @@ impl BlockManager { ) .await?; } - trace!( + info!( "Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), @@ -391,7 +389,7 @@ impl BlockManager { } pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.replication.read_nodes(&hash); let resps = self .rpc_client .try_call_many( @@ -415,12 +413,12 @@ impl BlockManager { } pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.replication.write_nodes(&hash); self.rpc_client .try_call_many( &who[..], Message::PutBlock(PutBlockMessage { hash, data }), - RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) + RequestStrategy::with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; diff --git a/src/model/garage.rs b/src/model/garage.rs index ced3c29e..5f7a67c9 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -54,18 +54,23 @@ impl Garage { ); let data_rep_param = TableShardedReplication { + system: system.clone(), replication_factor: config.data_replication_factor, write_quorum: (config.data_replication_factor + 1) / 2, read_quorum: 1, }; let meta_rep_param = TableShardedReplication { + system: system.clone(), replication_factor: config.meta_replication_factor, write_quorum: (config.meta_replication_factor + 1) / 2, read_quorum: (config.meta_replication_factor + 1) / 2, }; - let control_rep_param = TableFullReplication::new(config.control_write_max_faults); + let control_rep_param = TableFullReplication { + system: system.clone(), + max_faults: config.control_write_max_faults, + }; info!("Initialize block manager..."); let block_manager = BlockManager::new( -- cgit v1.2.3 From f4346cc5f45839ace93d2d11ce6beea632fd8f2c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 15:58:40 +0100 Subject: Update dependencies --- src/model/Cargo.toml | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 8f36cf2e..98656ea9 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -17,20 +17,17 @@ garage_util = { version = "0.1.1", path = "../util" } garage_rpc = { version = "0.1.1", path = "../rpc" } garage_table = { version = "0.1.1", path = "../table" } -bytes = "0.4" -rand = "0.7" -hex = "0.3" -sha2 = "0.8" -arc-swap = "0.4" +rand = "0.8" +hex = "0.4" +arc-swap = "1.0" log = "0.4" sled = "0.34" -rmp-serde = "0.14.3" +rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" -async-trait = "0.1.30" futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -- cgit v1.2.3 From 08bcd5195654ae073afe31c341f0ce4b36094da0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 16:51:15 +0100 Subject: GC object table in a specific case --- src/model/block.rs | 2 +- src/model/object_table.rs | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 41729685..0d9af38f 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -259,7 +259,7 @@ impl BlockManager { if let Err(e) = self.resync_iter(&mut must_exit).await { warn!("Error in block resync loop: {}", e); select! { - _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (), + _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => (), _ = must_exit.changed().fuse() => (), } } diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 99fad3ce..62606df4 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -146,6 +146,9 @@ impl Entry for Object { fn sort_key(&self) -> &String { &self.key } + fn is_tombstone(&self) -> bool { + self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) + } } impl CRDT for Object { -- cgit v1.2.3 From 4eb16e886388f35d2bdee52b16922421004cf132 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 18 Mar 2021 19:24:59 +0100 Subject: Allow to import keys from previous Garage instance --- src/model/key_table.rs | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'src/model') diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 02dcf68c..fcca3835 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -34,6 +34,15 @@ impl Key { authorized_buckets: crdt::LWWMap::new(), } } + pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self { + Self { + key_id: key_id.to_string(), + secret_key: secret_key.to_string(), + name: crdt::LWW::new(name.to_string()), + deleted: crdt::Bool::new(false), + authorized_buckets: crdt::LWWMap::new(), + } + } pub fn delete(key_id: String) -> Self { Self { key_id, -- cgit v1.2.3