diff options
author | Alex Auvolat <alex@adnab.me> | 2021-03-18 19:27:02 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-03-18 19:27:02 +0100 |
commit | 4348bde180887f5185ca6da6024476e8e8fb2fe6 (patch) | |
tree | 8a35f0e4229d15665af7673eebcaa1b3d75a73cb /src/model | |
parent | 5b659b28ce6bef15072d2fc93f777aa8ff73b2d8 (diff) | |
parent | 4eb16e886388f35d2bdee52b16922421004cf132 (diff) | |
download | garage-4348bde180887f5185ca6da6024476e8e8fb2fe6.tar.gz garage-4348bde180887f5185ca6da6024476e8e8fb2fe6.zip |
Merge branch 'dev-0.2'
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/Cargo.toml | 15 | ||||
-rw-r--r-- | src/model/block.rs | 241 | ||||
-rw-r--r-- | src/model/block_ref_table.rs | 20 | ||||
-rw-r--r-- | src/model/bucket_table.rs | 42 | ||||
-rw-r--r-- | src/model/garage.rs | 31 | ||||
-rw-r--r-- | src/model/key_table.rs | 65 | ||||
-rw-r--r-- | src/model/object_table.rs | 97 | ||||
-rw-r--r-- | src/model/version_table.rs | 107 |
8 files changed, 260 insertions, 358 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 48b75d24..98656ea9 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -16,23 +16,18 @@ path = "lib.rs" garage_util = { version = "0.1.1", path = "../util" } garage_rpc = { version = "0.1.1", path = "../rpc" } garage_table = { version = "0.1.1", path = "../table" } -model010 = { package = "garage_model_010b", version = "0.0.1" } -bytes = "0.4" -rand = "0.7" -hex = "0.3" -sha2 = "0.8" -arc-swap = "0.4" +rand = "0.8" +hex = "0.4" +arc-swap = "1.0" log = "0.4" sled = "0.34" -rmp-serde = "0.14.3" +rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" -async-trait = "0.1.30" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } - +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/model/block.rs b/src/model/block.rs index 56c85c6a..0d9af38f 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -5,22 +5,20 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use futures::future::*; use futures::select; -use futures::stream::*; use serde::{Deserialize, Serialize}; use tokio::fs; -use tokio::prelude::*; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; -use garage_util::data; use garage_util::data::*; use garage_util::error::Error; +use garage_util::time::*; use garage_rpc::membership::System; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; -use garage_table::table_sharded::TableShardedReplication; -use garage_table::TableReplication; +use garage_table::replication::{sharded::TableShardedReplication, TableReplication}; use crate::block_ref_table::*; @@ -28,7 +26,10 @@ use crate::garage::Garage; pub const INLINE_THRESHOLD: usize = 3072; +pub const BACKGROUND_WORKERS: u64 = 1; + const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); +const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); @@ -56,14 +57,14 @@ pub struct BlockManager { pub data_dir: PathBuf, pub data_dir_lock: Mutex<()>, - pub rc: sled::Tree, + rc: sled::Tree, - pub resync_queue: sled::Tree, - pub resync_notify: Notify, + resync_queue: sled::Tree, + resync_notify: Notify, - pub system: Arc<System>, + system: Arc<System>, rpc_client: Arc<RpcClient<Message>>, - pub garage: ArcSwapOption<Garage>, + pub(crate) garage: ArcSwapOption<Garage>, } impl BlockManager { @@ -77,7 +78,6 @@ impl BlockManager { let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); - rc.set_merge_operator(rc_merge); let resync_queue = db .open_tree("block_local_resync_queue") @@ -127,18 +127,16 @@ impl BlockManager { } } - pub async fn spawn_background_worker(self: Arc<Self>) { + pub fn spawn_background_worker(self: Arc<Self>) { // Launch 2 simultaneous workers for background resync loop preprocessing - for i in 0..2usize { + for i in 0..BACKGROUND_WORKERS { let bm2 = self.clone(); let background = self.system.background.clone(); tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(10)).await; - background - .spawn_worker(format!("block resync worker {}", i), move |must_exit| { - bm2.resync_loop(must_exit) - }) - .await; + tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await; + background.spawn_worker(format!("block resync worker {}", i), move |must_exit| { + bm2.resync_loop(must_exit) + }); }); } } @@ -168,7 +166,7 @@ impl BlockManager { Ok(f) => f, Err(e) => { // Not found but maybe we should have had it ?? - self.put_to_resync(hash, 0)?; + self.put_to_resync(hash, Duration::from_millis(0))?; return Err(Into::into(e)); } }; @@ -176,11 +174,16 @@ impl BlockManager { f.read_to_end(&mut data).await?; drop(f); - if data::sha256sum(&data[..]) != *hash { + if blake2sum(&data[..]) != *hash { let _lock = self.data_dir_lock.lock().await; - warn!("Block {:?} is corrupted. Deleting and resyncing.", hash); - fs::remove_file(path).await?; - self.put_to_resync(&hash, 0)?; + warn!( + "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", + hash + ); + let mut path2 = path.clone(); + path2.set_extension(".corrupted"); + fs::rename(path, path2).await?; + self.put_to_resync(&hash, Duration::from_millis(0))?; return Err(Error::CorruptData(*hash)); } @@ -191,7 +194,7 @@ impl BlockManager { let needed = self .rc .get(hash.as_ref())? - .map(|x| u64_from_bytes(x.as_ref()) > 0) + .map(|x| u64_from_be_bytes(x) > 0) .unwrap_or(false); if needed { let path = self.block_path(hash); @@ -215,84 +218,95 @@ impl BlockManager { } pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { - let old_rc = self.rc.get(&hash)?; - self.rc.merge(&hash, vec![1])?; - if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?; + let old_rc = self.rc.fetch_and_update(&hash, |old| { + let old_v = old.map(u64_from_be_bytes).unwrap_or(0); + Some(u64::to_be_bytes(old_v + 1).to_vec()) + })?; + let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0); + if old_rc == 0 { + self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?; } Ok(()) } pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { - let new_rc = self.rc.merge(&hash, vec![0])?; - if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, 0)?; + let new_rc = self.rc.update_and_fetch(&hash, |old| { + let old_v = old.map(u64_from_be_bytes).unwrap_or(0); + if old_v > 1 { + Some(u64::to_be_bytes(old_v - 1).to_vec()) + } else { + None + } + })?; + if new_rc.is_none() { + self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?; } Ok(()) } - fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> { - let when = now_msec() + delay_millis; + fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> { + let when = now_msec() + delay.as_millis() as u64; trace!("Put resync_queue: {} {:?}", when, hash); let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); self.resync_queue.insert(key, hash.as_ref())?; - self.resync_notify.notify(); + self.resync_notify.notify_waiters(); Ok(()) } - async fn resync_loop( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - ) -> Result<(), Error> { - let mut n_failures = 0usize; + async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { while !*must_exit.borrow() { - if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { - let time_msec = u64_from_bytes(&time_bytes[0..8]); - let now = now_msec(); - if now >= time_msec { - let mut hash = [0u8; 32]; - hash.copy_from_slice(hash_bytes.as_ref()); - let hash = Hash::from(hash); - - if let Err(e) = self.resync_iter(&hash).await { - warn!("Failed to resync block {:?}, retrying later: {}", hash, e); - self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?; - n_failures += 1; - if n_failures >= 10 { - warn!("Too many resync failures, throttling."); - tokio::time::delay_for(Duration::from_secs(1)).await; - } - } else { - n_failures = 0; - } - } else { - self.resync_queue.insert(time_bytes, hash_bytes)?; - let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now)); - select! { - _ = delay.fuse() => (), - _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), - } + if let Err(e) = self.resync_iter(&mut must_exit).await { + warn!("Error in block resync loop: {}", e); + select! { + _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => (), + _ = must_exit.changed().fuse() => (), } + } + } + } + + async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { + if let Some(first_item) = self.resync_queue.iter().next() { + let (time_bytes, hash_bytes) = first_item?; + let time_msec = u64_from_be_bytes(&time_bytes[0..8]); + let now = now_msec(); + if now >= time_msec { + let hash = Hash::try_from(&hash_bytes[..]).unwrap(); + let res = self.resync_block(&hash).await; + if let Err(e) = &res { + warn!("Error when resyncing {:?}: {}", hash, e); + self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; + } + self.resync_queue.remove(&time_bytes)?; + res?; // propagate error to delay main loop } else { + let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); select! { + _ = delay.fuse() => (), _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = must_exit.changed().fuse() => (), } } + } else { + select! { + _ = self.resync_notify.notified().fuse() => (), + _ = must_exit.changed().fuse() => (), + } } Ok(()) } - async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> { + async fn resync_block(&self, hash: &Hash) -> Result<(), Error> { + let lock = self.data_dir_lock.lock().await; + let path = self.block_path(hash); let exists = fs::metadata(&path).await.is_ok(); let needed = self .rc .get(hash.as_ref())? - .map(|x| u64_from_bytes(x.as_ref()) > 0) + .map(|x| u64_from_be_bytes(x) > 0) .unwrap_or(false); if exists != needed { @@ -305,9 +319,10 @@ impl BlockManager { if exists && !needed { trace!("Offloading block {:?}", hash); - let ring = self.system.ring.borrow().clone(); - - let mut who = self.replication.replication_nodes(&hash, &ring); + let mut who = self.replication.write_nodes(&hash); + if who.len() < self.replication.write_quorum() { + return Err(Error::Message(format!("Not trying to offload block because we don't have a quorum of nodes to write to"))); + } who.retain(|id| *id != self.system.id); let msg = Arc::new(Message::NeedBlockQuery(*hash)); @@ -340,17 +355,17 @@ impl BlockManager { need_nodes.len() ); - let put_block_message = Arc::new(self.read_block(hash).await?); - let put_resps = join_all(need_nodes.iter().map(|to| { - self.rpc_client - .call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT) - })) - .await; - for resp in put_resps { - resp?; - } + let put_block_message = self.read_block(hash).await?; + self.rpc_client + .try_call_many( + &need_nodes[..], + put_block_message, + RequestStrategy::with_quorum(need_nodes.len()) + .with_timeout(BLOCK_RW_TIMEOUT), + ) + .await?; } - trace!( + info!( "Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), @@ -358,10 +373,11 @@ impl BlockManager { ); fs::remove_file(path).await?; - self.resync_queue.remove(&hash)?; } if needed && !exists { + drop(lock); + // TODO find a way to not do this if they are sending it to us // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // between the RC being incremented and this part being called. @@ -373,7 +389,7 @@ impl BlockManager { } pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.replication.read_nodes(&hash); let resps = self .rpc_client .try_call_many( @@ -397,12 +413,12 @@ impl BlockManager { } pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.replication.write_nodes(&hash); self.rpc_client .try_call_many( &who[..], Message::PutBlock(PutBlockMessage { hash, data }), - RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) + RequestStrategy::with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; @@ -414,15 +430,15 @@ impl BlockManager { let garage = self.garage.load_full().unwrap(); let mut last_hash = None; let mut i = 0usize; - for entry in garage.block_ref_table.store.iter() { + for entry in garage.block_ref_table.data.store.iter() { let (_k, v_bytes) = entry?; let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?; if Some(&block_ref.block) == last_hash.as_ref() { continue; } - if !block_ref.deleted { + if !block_ref.deleted.get() { last_hash = Some(block_ref.block); - self.put_to_resync(&block_ref.block, 0)?; + self.put_to_resync(&block_ref.block, Duration::from_secs(0))?; } i += 1; if i & 0xFF == 0 && *must_exit.borrow() { @@ -447,8 +463,12 @@ impl BlockManager { // so that we can offload them if necessary and then delete them locally. async move { let mut ls_data_dir = fs::read_dir(path).await?; - while let Some(data_dir_ent) = ls_data_dir.next().await { - let data_dir_ent = data_dir_ent?; + loop { + let data_dir_ent = ls_data_dir.next_entry().await?; + let data_dir_ent = match data_dir_ent { + Some(x) => x, + None => break, + }; let name = data_dir_ent.file_name(); let name = match name.into_string() { Ok(x) => x, @@ -466,7 +486,7 @@ impl BlockManager { }; let mut hash = [0u8; 32]; hash.copy_from_slice(&hash_bytes[..]); - self.put_to_resync(&hash.into(), 0)?; + self.put_to_resync(&hash.into(), Duration::from_secs(0))?; } if *must_exit.borrow() { @@ -477,32 +497,19 @@ impl BlockManager { } .boxed() } + + pub fn resync_queue_len(&self) -> usize { + self.resync_queue.len() + } + + pub fn rc_len(&self) -> usize { + self.rc.len() + } } -fn u64_from_bytes(bytes: &[u8]) -> u64 { - assert!(bytes.len() == 8); +fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 { + assert!(bytes.as_ref().len() == 8); let mut x8 = [0u8; 8]; - x8.copy_from_slice(bytes); + x8.copy_from_slice(bytes.as_ref()); u64::from_be_bytes(x8) } - -fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> { - let old = old.map(u64_from_bytes).unwrap_or(0); - assert!(new.len() == 1); - let new = match new[0] { - 0 => { - if old > 0 { - old - 1 - } else { - 0 - } - } - 1 => old + 1, - _ => unreachable!(), - }; - if new == 0 { - None - } else { - Some(u64::to_be_bytes(new).to_vec()) - } -} diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs index 9ab67737..e4372717 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/block_ref_table.rs @@ -1,9 +1,9 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; -use garage_util::background::*; use garage_util::data::*; +use garage_table::crdt::CRDT; use garage_table::*; use crate::block::*; @@ -17,7 +17,7 @@ pub struct BlockRef { pub version: UUID, // Keep track of deleted status - pub deleted: bool, + pub deleted: crdt::Bool, } impl Entry<Hash, UUID> for BlockRef { @@ -27,16 +27,18 @@ impl Entry<Hash, UUID> for BlockRef { fn sort_key(&self) -> &UUID { &self.version } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} +impl CRDT for BlockRef { fn merge(&mut self, other: &Self) { - if other.deleted { - self.deleted = true; - } + self.deleted.merge(&other.deleted); } } pub struct BlockRefTable { - pub background: Arc<BackgroundRunner>, pub block_manager: Arc<BlockManager>, } @@ -48,8 +50,8 @@ impl TableSchema for BlockRefTable { fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { let block = &old.as_ref().or(new.as_ref()).unwrap().block; - let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false); - let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false); + let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); + let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); if is_after && !was_before { if let Err(e) = self.block_manager.block_incref(block) { warn!("block_incref failed for block {:?}: {}", block, e); @@ -63,6 +65,6 @@ impl TableSchema for BlockRefTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) } } diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 2878aa38..6330dced 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -5,11 +5,6 @@ use garage_table::*; use crate::key_table::PermissionSet; -// We import the same file but in its version 0.1.0. -// We can then access v0.1.0 data structures. -// We use them to perform migrations. -use model010::bucket_table as prev; - /// A bucket is a collection of objects /// /// Its parameters are not directly accessible as: @@ -89,7 +84,9 @@ impl Entry<EmptyKey, String> for Bucket { fn sort_key(&self) -> &String { &self.name } +} +impl CRDT for Bucket { fn merge(&mut self, other: &Self) { self.state.merge(&other.state); } @@ -106,39 +103,4 @@ impl TableSchema for BucketTable { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.is_deleted()) } - - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) { - Ok(x) => x, - Err(_) => return None, - }; - if old.deleted { - Some(Bucket { - name: old.name, - state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted), - }) - } else { - let mut keys = crdt::LWWMap::new(); - for ak in old.authorized_keys() { - keys.merge(&crdt::LWWMap::migrate_from_raw_item( - ak.key_id.clone(), - ak.timestamp, - PermissionSet { - allow_read: ak.allow_read, - allow_write: ak.allow_write, - }, - )); - } - - let params = BucketParams { - authorized_keys: keys, - website: crdt::LWW::new(false), - }; - - Some(Bucket { - name: old.name, - state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(params)), - }) - } - } } diff --git a/src/model/garage.rs b/src/model/garage.rs index 467d0aec..5f7a67c9 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -7,8 +7,8 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::RpcHttpClient; use garage_rpc::rpc_server::RpcServer; -use garage_table::table_fullcopy::*; -use garage_table::table_sharded::*; +use garage_table::replication::fullcopy::*; +use garage_table::replication::sharded::*; use garage_table::*; use crate::block::*; @@ -35,7 +35,7 @@ pub struct Garage { } impl Garage { - pub async fn new( + pub fn new( config: Config, db: sled::Db, background: Arc<BackgroundRunner>, @@ -54,18 +54,23 @@ impl Garage { ); let data_rep_param = TableShardedReplication { + system: system.clone(), replication_factor: config.data_replication_factor, write_quorum: (config.data_replication_factor + 1) / 2, read_quorum: 1, }; let meta_rep_param = TableShardedReplication { + system: system.clone(), replication_factor: config.meta_replication_factor, write_quorum: (config.meta_replication_factor + 1) / 2, read_quorum: (config.meta_replication_factor + 1) / 2, }; - let control_rep_param = TableFullReplication::new(config.control_write_max_faults); + let control_rep_param = TableFullReplication { + system: system.clone(), + max_faults: config.control_write_max_faults, + }; info!("Initialize block manager..."); let block_manager = BlockManager::new( @@ -79,7 +84,6 @@ impl Garage { info!("Initialize block_ref_table..."); let block_ref_table = Table::new( BlockRefTable { - background: background.clone(), block_manager: block_manager.clone(), }, data_rep_param.clone(), @@ -87,8 +91,7 @@ impl Garage { &db, "block_ref".to_string(), rpc_server, - ) - .await; + ); info!("Initialize version_table..."); let version_table = Table::new( @@ -101,8 +104,7 @@ impl Garage { &db, "version".to_string(), rpc_server, - ) - .await; + ); info!("Initialize object_table..."); let object_table = Table::new( @@ -115,8 +117,7 @@ impl Garage { &db, "object".to_string(), rpc_server, - ) - .await; + ); info!("Initialize bucket_table..."); let bucket_table = Table::new( @@ -126,8 +127,7 @@ impl Garage { &db, "bucket".to_string(), rpc_server, - ) - .await; + ); info!("Initialize key_table_table..."); let key_table = Table::new( @@ -137,8 +137,7 @@ impl Garage { &db, "key".to_string(), rpc_server, - ) - .await; + ); info!("Initialize Garage..."); let garage = Arc::new(Self { @@ -156,7 +155,7 @@ impl Garage { info!("Start block manager background thread..."); garage.block_manager.garage.swap(Some(garage.clone())); - garage.block_manager.clone().spawn_background_worker().await; + garage.block_manager.clone().spawn_background_worker(); garage } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 5942df75..fcca3835 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,10 +1,8 @@ use serde::{Deserialize, Serialize}; -use garage_table::crdt::CRDT; +use garage_table::crdt::*; use garage_table::*; -use model010::key_table as prev; - #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Key { // Primary key @@ -36,6 +34,15 @@ impl Key { authorized_buckets: crdt::LWWMap::new(), } } + pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self { + Self { + key_id: key_id.to_string(), + secret_key: secret_key.to_string(), + name: crdt::LWW::new(name.to_string()), + deleted: crdt::Bool::new(false), + authorized_buckets: crdt::LWWMap::new(), + } + } pub fn delete(key_id: String) -> Self { Self { key_id, @@ -66,6 +73,10 @@ pub struct PermissionSet { pub allow_write: bool, } +impl AutoCRDT for PermissionSet { + const WARN_IF_DIFFERENT: bool = true; +} + impl Entry<EmptyKey, String> for Key { fn partition_key(&self) -> &EmptyKey { &EmptyKey @@ -73,55 +84,43 @@ impl Entry<EmptyKey, String> for Key { fn sort_key(&self) -> &String { &self.key_id } +} +impl CRDT for Key { fn merge(&mut self, other: &Self) { self.name.merge(&other.name); self.deleted.merge(&other.deleted); if self.deleted.get() { self.authorized_buckets.clear(); - return; + } else { + self.authorized_buckets.merge(&other.authorized_buckets); } - - self.authorized_buckets.merge(&other.authorized_buckets); } } pub struct KeyTable; +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum KeyFilter { + Deleted(DeletedFilter), + Matches(String), +} + impl TableSchema for KeyTable { type P = EmptyKey; type S = String; type E = Key; - type Filter = DeletedFilter; + type Filter = KeyFilter; fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted.get()) - } - - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) { - Ok(x) => x, - Err(_) => return None, - }; - let mut new = Self::E { - key_id: old.key_id.clone(), - secret_key: old.secret_key.clone(), - name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()), - deleted: crdt::Bool::new(old.deleted), - authorized_buckets: crdt::LWWMap::new(), - }; - for ab in old.authorized_buckets() { - let it = crdt::LWWMap::migrate_from_raw_item( - ab.bucket.clone(), - ab.timestamp, - PermissionSet { - allow_read: ab.allow_read, - allow_write: ab.allow_write, - }, - ); - new.authorized_buckets.merge(&it); + match filter { + KeyFilter::Deleted(df) => df.apply(entry.deleted.get()), + KeyFilter::Matches(pat) => { + let pat = pat.to_lowercase(); + entry.key_id.to_lowercase().starts_with(&pat) + || entry.name.get().to_lowercase() == pat + } } - Some(new) } } diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 16cce72c..62606df4 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -5,13 +5,12 @@ use std::sync::Arc; use garage_util::background::BackgroundRunner; use garage_util::data::*; -use garage_table::table_sharded::*; +use garage_table::crdt::*; +use garage_table::replication::sharded::*; use garage_table::*; use crate::version_table::*; -use model010::object_table as prev; - #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { // Primary key @@ -70,7 +69,7 @@ pub enum ObjectVersionState { Aborted, } -impl ObjectVersionState { +impl CRDT for ObjectVersionState { fn merge(&mut self, other: &Self) { use ObjectVersionState::*; match other { @@ -91,37 +90,30 @@ impl ObjectVersionState { } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionData { DeleteMarker, Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), FirstBlock(ObjectVersionMeta, Hash), } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +impl AutoCRDT for ObjectVersionData { + const WARN_IF_DIFFERENT: bool = true; +} + +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionMeta { pub headers: ObjectVersionHeaders, pub size: u64, pub etag: String, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionHeaders { pub content_type: String, pub other: BTreeMap<String, String>, } -impl ObjectVersionData { - fn merge(&mut self, b: &Self) { - if *self != *b { - warn!( - "Inconsistent object version data: {:?} (local) vs {:?} (remote)", - self, b - ); - } - } -} - impl ObjectVersion { fn cmp_key(&self) -> (u64, UUID) { (self.timestamp, self.uuid) @@ -154,8 +146,14 @@ impl Entry<String, String> for Object { fn sort_key(&self) -> &String { &self.key } + fn is_tombstone(&self) -> bool { + self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) + } +} +impl CRDT for Object { fn merge(&mut self, other: &Self) { + // Merge versions from other into here for other_v in other.versions.iter() { match self .versions @@ -169,6 +167,9 @@ impl Entry<String, String> for Object { } } } + + // Remove versions which are obsolete, i.e. those that come + // before the last version which .is_complete(). let last_complete = self .versions .iter() @@ -212,13 +213,8 @@ impl TableSchema for ObjectTable { } }; if newly_deleted { - let deleted_version = Version::new( - v.uuid, - old_v.bucket.clone(), - old_v.key.clone(), - true, - vec![], - ); + let deleted_version = + Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true); version_table.insert(&deleted_version).await?; } } @@ -231,55 +227,4 @@ impl TableSchema for ObjectTable { let deleted = !entry.versions.iter().any(|v| v.is_data()); filter.apply(deleted) } - - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old = match rmp_serde::decode::from_read_ref::<_, prev::Object>(bytes) { - Ok(x) => x, - Err(_) => return None, - }; - let new_v = old - .versions() - .iter() - .map(migrate_version) - .collect::<Vec<_>>(); - let new = Object::new(old.bucket.clone(), old.key.clone(), new_v); - Some(new) - } -} - -fn migrate_version(old: &prev::ObjectVersion) -> ObjectVersion { - let headers = ObjectVersionHeaders { - content_type: old.mime_type.clone(), - other: BTreeMap::new(), - }; - let meta = ObjectVersionMeta { - headers: headers.clone(), - size: old.size, - etag: "".to_string(), - }; - let state = match old.state { - prev::ObjectVersionState::Uploading => ObjectVersionState::Uploading(headers), - prev::ObjectVersionState::Aborted => ObjectVersionState::Aborted, - prev::ObjectVersionState::Complete => match &old.data { - prev::ObjectVersionData::Uploading => ObjectVersionState::Uploading(headers), - prev::ObjectVersionData::DeleteMarker => { - ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) - } - prev::ObjectVersionData::Inline(x) => { - ObjectVersionState::Complete(ObjectVersionData::Inline(meta, x.clone())) - } - prev::ObjectVersionData::FirstBlock(h) => { - let mut hash = [0u8; 32]; - hash.copy_from_slice(h.as_ref()); - ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, Hash::from(hash))) - } - }, - }; - let mut uuid = [0u8; 32]; - uuid.copy_from_slice(old.uuid.as_ref()); - ObjectVersion { - uuid: UUID::from(uuid), - timestamp: old.timestamp, - state, - } } diff --git a/src/model/version_table.rs b/src/model/version_table.rs index cf9fbe98..841fbfea 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -4,7 +4,8 @@ use std::sync::Arc; use garage_util::background::BackgroundRunner; use garage_util::data::*; -use garage_table::table_sharded::*; +use garage_table::crdt::*; +use garage_table::replication::sharded::*; use garage_table::*; use crate::block_ref_table::*; @@ -15,8 +16,11 @@ pub struct Version { pub uuid: UUID, // Actual data: the blocks for this version - pub deleted: bool, - blocks: Vec<VersionBlock>, + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + pub deleted: crdt::Bool, + pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, + pub parts_etags: crdt::Map<u64, String>, // Back link to bucket+key so that we can figure if // this was deleted later on @@ -25,56 +29,46 @@ pub struct Version { } impl Version { - pub fn new( - uuid: UUID, - bucket: String, - key: String, - deleted: bool, - blocks: Vec<VersionBlock>, - ) -> Self { - let mut ret = Self { + pub fn new(uuid: UUID, bucket: String, key: String, deleted: bool) -> Self { + Self { uuid, - deleted, - blocks: vec![], + deleted: deleted.into(), + blocks: crdt::Map::new(), + parts_etags: crdt::Map::new(), bucket, key, - }; - for b in blocks { - ret.add_block(b) - .expect("Twice the same VersionBlock in Version constructor"); } - ret } - /// Adds a block if it wasn't already present - pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> { - match self - .blocks - .binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key())) - { - Err(i) => { - self.blocks.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlockKey { + pub part_number: u64, + pub offset: u64, +} + +impl Ord for VersionBlockKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + .then(self.offset.cmp(&other.offset)) } - pub fn blocks(&self) -> &[VersionBlock] { - &self.blocks[..] +} + +impl PartialOrd for VersionBlockKey { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlock { - pub part_number: u64, - pub offset: u64, pub hash: Hash, pub size: u64, } -impl VersionBlock { - fn cmp_key(&self) -> (u64, u64) { - (self.part_number, self.offset) - } +impl AutoCRDT for VersionBlock { + const WARN_IF_DIFFERENT: bool = true; } impl Entry<Hash, EmptyKey> for Version { @@ -84,23 +78,21 @@ impl Entry<Hash, EmptyKey> for Version { fn sort_key(&self) -> &EmptyKey { &EmptyKey } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} +impl CRDT for Version { fn merge(&mut self, other: &Self) { - if other.deleted { - self.deleted = true; + self.deleted.merge(&other.deleted); + + if self.deleted.get() { self.blocks.clear(); - } else if !self.deleted { - for bi in other.blocks.iter() { - match self - .blocks - .binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key())) - { - Ok(_) => (), - Err(pos) => { - self.blocks.insert(pos, bi.clone()); - } - } - } + self.parts_etags.clear(); + } else { + self.blocks.merge(&other.blocks); + self.parts_etags.merge(&other.parts_etags); } } } @@ -121,14 +113,15 @@ impl TableSchema for VersionTable { self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks - if new_v.deleted && !old_v.deleted { + if new_v.deleted.get() && !old_v.deleted.get() { let deleted_block_refs = old_v .blocks + .items() .iter() - .map(|vb| BlockRef { + .map(|(_k, vb)| BlockRef { block: vb.hash, version: old_v.uuid, - deleted: true, + deleted: true.into(), }) .collect::<Vec<_>>(); block_ref_table.insert_many(&deleted_block_refs[..]).await?; @@ -139,6 +132,6 @@ impl TableSchema for VersionTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) } } |