From bec26a13129d8215d5ff97ad0cf587f6572f67ac Mon Sep 17 00:00:00 2001
From: Alex Auvolat <alex@adnab.me>
Date: Tue, 7 Jul 2020 13:59:22 +0200
Subject: Rename garage_core to garage_model

---
 Cargo.lock                   |   6 +-
 Cargo.toml                   |   2 +-
 src/api/Cargo.toml           |   2 +-
 src/api/api_server.rs        |   2 +-
 src/api/s3_copy.rs           |   8 +-
 src/api/s3_delete.rs         |   4 +-
 src/api/s3_get.rs            |   4 +-
 src/api/s3_list.rs           |   2 +-
 src/api/s3_put.rs            |  10 +-
 src/api/signature.rs         |   4 +-
 src/core/Cargo.toml          |  34 ---
 src/core/block.rs            | 506 ------------------------------------------
 src/core/block_ref_table.rs  |  68 ------
 src/core/bucket_table.rs     | 121 ----------
 src/core/garage.rs           | 166 --------------
 src/core/key_table.rs        | 154 -------------
 src/core/lib.rs              |  10 -
 src/core/object_table.rs     | 198 -----------------
 src/core/version_table.rs    | 145 ------------
 src/garage/Cargo.toml        |   2 +-
 src/garage/admin_rpc.rs      |   6 +-
 src/garage/repair.rs         |   8 +-
 src/garage/server.rs         |   2 +-
 src/model/Cargo.toml         |  34 +++
 src/model/block.rs           | 506 ++++++++++++++++++++++++++++++++++++++++++
 src/model/block_ref_table.rs |  68 ++++++
 src/model/bucket_table.rs    | 121 ++++++++++
 src/model/garage.rs          | 166 ++++++++++++++
 src/model/key_table.rs       | 154 +++++++++++++
 src/model/lib.rs             |  10 +
 src/model/object_table.rs    | 198 +++++++++++++++++
 src/model/version_table.rs   | 145 ++++++++++++
 32 files changed, 1433 insertions(+), 1433 deletions(-)
 delete mode 100644 src/core/Cargo.toml
 delete mode 100644 src/core/block.rs
 delete mode 100644 src/core/block_ref_table.rs
 delete mode 100644 src/core/bucket_table.rs
 delete mode 100644 src/core/garage.rs
 delete mode 100644 src/core/key_table.rs
 delete mode 100644 src/core/lib.rs
 delete mode 100644 src/core/object_table.rs
 delete mode 100644 src/core/version_table.rs
 create mode 100644 src/model/Cargo.toml
 create mode 100644 src/model/block.rs
 create mode 100644 src/model/block_ref_table.rs
 create mode 100644 src/model/bucket_table.rs
 create mode 100644 src/model/garage.rs
 create mode 100644 src/model/key_table.rs
 create mode 100644 src/model/lib.rs
 create mode 100644 src/model/object_table.rs
 create mode 100644 src/model/version_table.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1b0aad78..866d90e3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -339,7 +339,7 @@ dependencies = [
  "futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "garage_api 0.1.0",
- "garage_core 0.1.0",
+ "garage_model 0.1.0",
  "garage_rpc 0.1.0",
  "garage_table 0.1.0",
  "garage_util 0.1.0",
@@ -365,7 +365,7 @@ dependencies = [
  "crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "garage_core 0.1.0",
+ "garage_model 0.1.0",
  "garage_table 0.1.0",
  "garage_util 0.1.0",
  "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -383,7 +383,7 @@ dependencies = [
 ]
 
 [[package]]
-name = "garage_core"
+name = "garage_model"
 version = "0.1.0"
 dependencies = [
  "arc-swap 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/Cargo.toml b/Cargo.toml
index b3043acd..7a8c74e9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,7 +3,7 @@ members = [
 	"src/util",
 	"src/rpc",
 	"src/table",
-	"src/core",
+	"src/model",
 	"src/api",
 	"src/garage",
 ]
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 56e0e2a8..99838a56 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -12,7 +12,7 @@ path = "lib.rs"
 [dependencies]
 garage_util = { path = "../util" }
 garage_table = { path = "../table" }
-garage_core = { path = "../core" }
+garage_model = { path = "../model" }
 
 bytes = "0.4"
 hex = "0.3"
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index 32506ccd..699dc5c4 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -9,7 +9,7 @@ use hyper::{Body, Method, Request, Response, Server};
 
 use garage_util::error::Error;
 
-use garage_core::garage::Garage;
+use garage_model::garage::Garage;
 
 use crate::http_util::*;
 use crate::signature::check_signature;
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index f8eaa0e7..6a8d8f87 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -8,10 +8,10 @@ use garage_table::*;
 use garage_util::data::*;
 use garage_util::error::Error;
 
-use garage_core::block_ref_table::*;
-use garage_core::garage::Garage;
-use garage_core::object_table::*;
-use garage_core::version_table::*;
+use garage_model::block_ref_table::*;
+use garage_model::garage::Garage;
+use garage_model::object_table::*;
+use garage_model::version_table::*;
 
 use crate::http_util::*;
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index e77ab314..60714b0c 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -6,8 +6,8 @@ use hyper::{Body, Request, Response};
 use garage_util::data::*;
 use garage_util::error::Error;
 
-use garage_core::garage::Garage;
-use garage_core::object_table::*;
+use garage_model::garage::Garage;
+use garage_model::object_table::*;
 
 use crate::encoding::*;
 use crate::http_util::*;
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 3ed0f914..63200ca3 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -9,8 +9,8 @@ use garage_util::error::Error;
 
 use garage_table::EmptyKey;
 
-use garage_core::garage::Garage;
-use garage_core::object_table::*;
+use garage_model::garage::Garage;
+use garage_model::object_table::*;
 
 use crate::http_util::*;
 
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index b8babbbf..c4fbf6f2 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -7,7 +7,7 @@ use hyper::Response;
 
 use garage_util::error::Error;
 
-use garage_core::garage::Garage;
+use garage_model::garage::Garage;
 
 use crate::encoding::*;
 use crate::http_util::*;
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index e1e4c02d..bddfa444 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -9,11 +9,11 @@ use garage_table::*;
 use garage_util::data::*;
 use garage_util::error::Error;
 
-use garage_core::block::INLINE_THRESHOLD;
-use garage_core::block_ref_table::*;
-use garage_core::garage::Garage;
-use garage_core::object_table::*;
-use garage_core::version_table::*;
+use garage_model::block::INLINE_THRESHOLD;
+use garage_model::block_ref_table::*;
+use garage_model::garage::Garage;
+use garage_model::object_table::*;
+use garage_model::version_table::*;
 
 use crate::encoding::*;
 use crate::http_util::*;
diff --git a/src/api/signature.rs b/src/api/signature.rs
index a1ccfd08..65f31f21 100644
--- a/src/api/signature.rs
+++ b/src/api/signature.rs
@@ -8,8 +8,8 @@ use sha2::{Digest, Sha256};
 use garage_table::*;
 use garage_util::error::Error;
 
-use garage_core::garage::Garage;
-use garage_core::key_table::*;
+use garage_model::garage::Garage;
+use garage_model::key_table::*;
 
 use crate::encoding::uri_encode;
 
diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml
deleted file mode 100644
index 1e482d87..00000000
--- a/src/core/Cargo.toml
+++ /dev/null
@@ -1,34 +0,0 @@
-[package]
-name = "garage_core"
-version = "0.1.0"
-authors = ["Alex Auvolat <alex@adnab.me>"]
-edition = "2018"
-
-[lib]
-path = "lib.rs"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-garage_util = { path = "../util" }
-garage_rpc = { path = "../rpc" }
-garage_table = { path = "../table" }
-
-bytes = "0.4"
-rand = "0.7"
-hex = "0.3"
-sha2 = "0.8"
-arc-swap = "0.4"
-log = "0.4"
-
-sled = "0.31"
-
-rmp-serde = "0.14.3"
-serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
-serde_bytes = "0.11"
-
-async-trait = "0.1.30"
-futures = "0.3"
-futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
-
diff --git a/src/core/block.rs b/src/core/block.rs
deleted file mode 100644
index af8b9efb..00000000
--- a/src/core/block.rs
+++ /dev/null
@@ -1,506 +0,0 @@
-use std::path::PathBuf;
-use std::sync::Arc;
-use std::time::Duration;
-
-use arc_swap::ArcSwapOption;
-use futures::future::*;
-use futures::select;
-use futures::stream::*;
-use serde::{Deserialize, Serialize};
-use tokio::fs;
-use tokio::prelude::*;
-use tokio::sync::{watch, Mutex, Notify};
-
-use garage_util::data;
-use garage_util::data::*;
-use garage_util::error::Error;
-
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
-
-use garage_table::table_sharded::TableShardedReplication;
-use garage_table::TableReplication;
-
-use crate::block_ref_table::*;
-
-use crate::garage::Garage;
-
-pub const INLINE_THRESHOLD: usize = 3072;
-
-const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
-const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
-const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
-
-#[derive(Debug, Serialize, Deserialize)]
-pub enum Message {
-	Ok,
-	GetBlock(Hash),
-	PutBlock(PutBlockMessage),
-	NeedBlockQuery(Hash),
-	NeedBlockReply(bool),
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct PutBlockMessage {
-	pub hash: Hash,
-
-	#[serde(with = "serde_bytes")]
-	pub data: Vec<u8>,
-}
-
-impl RpcMessage for Message {}
-
-pub struct BlockManager {
-	pub replication: TableShardedReplication,
-	pub data_dir: PathBuf,
-	pub data_dir_lock: Mutex<()>,
-
-	pub rc: sled::Tree,
-
-	pub resync_queue: sled::Tree,
-	pub resync_notify: Notify,
-
-	pub system: Arc<System>,
-	rpc_client: Arc<RpcClient<Message>>,
-	pub garage: ArcSwapOption<Garage>,
-}
-
-impl BlockManager {
-	pub fn new(
-		db: &sled::Db,
-		data_dir: PathBuf,
-		replication: TableShardedReplication,
-		system: Arc<System>,
-		rpc_server: &mut RpcServer,
-	) -> Arc<Self> {
-		let rc = db
-			.open_tree("block_local_rc")
-			.expect("Unable to open block_local_rc tree");
-		rc.set_merge_operator(rc_merge);
-
-		let resync_queue = db
-			.open_tree("block_local_resync_queue")
-			.expect("Unable to open block_local_resync_queue tree");
-
-		let rpc_path = "block_manager";
-		let rpc_client = system.rpc_client::<Message>(rpc_path);
-
-		let block_manager = Arc::new(Self {
-			replication,
-			data_dir,
-			data_dir_lock: Mutex::new(()),
-			rc,
-			resync_queue,
-			resync_notify: Notify::new(),
-			system,
-			rpc_client,
-			garage: ArcSwapOption::from(None),
-		});
-		block_manager
-			.clone()
-			.register_handler(rpc_server, rpc_path.into());
-		block_manager
-	}
-
-	fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
-		let self2 = self.clone();
-		rpc_server.add_handler::<Message, _, _>(path, move |msg, _addr| {
-			let self2 = self2.clone();
-			async move { self2.handle(&msg).await }
-		});
-
-		let self2 = self.clone();
-		self.rpc_client
-			.set_local_handler(self.system.id, move |msg| {
-				let self2 = self2.clone();
-				async move { self2.handle(&msg).await }
-			});
-	}
-
-	async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> {
-		match msg {
-			Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
-			Message::GetBlock(h) => self.read_block(h).await,
-			Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
-			_ => Err(Error::BadRequest(format!("Unexpected RPC message"))),
-		}
-	}
-
-	pub async fn spawn_background_worker(self: Arc<Self>) {
-		// Launch 2 simultaneous workers for background resync loop preprocessing
-		for i in 0..2usize {
-			let bm2 = self.clone();
-			let background = self.system.background.clone();
-			tokio::spawn(async move {
-				tokio::time::delay_for(Duration::from_secs(10)).await;
-				background
-					.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
-						bm2.resync_loop(must_exit)
-					})
-					.await;
-			});
-		}
-	}
-
-	pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
-		let _lock = self.data_dir_lock.lock().await;
-
-		let mut path = self.block_dir(hash);
-		fs::create_dir_all(&path).await?;
-
-		path.push(hex::encode(hash));
-		if fs::metadata(&path).await.is_ok() {
-			return Ok(Message::Ok);
-		}
-
-		let mut f = fs::File::create(path).await?;
-		f.write_all(data).await?;
-		drop(f);
-
-		Ok(Message::Ok)
-	}
-
-	pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
-		let path = self.block_path(hash);
-
-		let mut f = match fs::File::open(&path).await {
-			Ok(f) => f,
-			Err(e) => {
-				// Not found but maybe we should have had it ??
-				self.put_to_resync(hash, 0)?;
-				return Err(Into::into(e));
-			}
-		};
-		let mut data = vec![];
-		f.read_to_end(&mut data).await?;
-		drop(f);
-
-		if data::hash(&data[..]) != *hash {
-			let _lock = self.data_dir_lock.lock().await;
-			warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
-			fs::remove_file(path).await?;
-			self.put_to_resync(&hash, 0)?;
-			return Err(Error::CorruptData(*hash));
-		}
-
-		Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
-	}
-
-	pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
-		let needed = self
-			.rc
-			.get(hash.as_ref())?
-			.map(|x| u64_from_bytes(x.as_ref()) > 0)
-			.unwrap_or(false);
-		if needed {
-			let path = self.block_path(hash);
-			let exists = fs::metadata(&path).await.is_ok();
-			Ok(!exists)
-		} else {
-			Ok(false)
-		}
-	}
-
-	fn block_dir(&self, hash: &Hash) -> PathBuf {
-		let mut path = self.data_dir.clone();
-		path.push(hex::encode(&hash.as_slice()[0..1]));
-		path.push(hex::encode(&hash.as_slice()[1..2]));
-		path
-	}
-	fn block_path(&self, hash: &Hash) -> PathBuf {
-		let mut path = self.block_dir(hash);
-		path.push(hex::encode(hash.as_ref()));
-		path
-	}
-
-	pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
-		let old_rc = self.rc.get(&hash)?;
-		self.rc.merge(&hash, vec![1])?;
-		if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
-			self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?;
-		}
-		Ok(())
-	}
-
-	pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
-		let new_rc = self.rc.merge(&hash, vec![0])?;
-		if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
-			self.put_to_resync(&hash, 0)?;
-		}
-		Ok(())
-	}
-
-	fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> {
-		let when = now_msec() + delay_millis;
-		trace!("Put resync_queue: {} {:?}", when, hash);
-		let mut key = u64::to_be_bytes(when).to_vec();
-		key.extend(hash.as_ref());
-		self.resync_queue.insert(key, hash.as_ref())?;
-		self.resync_notify.notify();
-		Ok(())
-	}
-
-	async fn resync_loop(
-		self: Arc<Self>,
-		mut must_exit: watch::Receiver<bool>,
-	) -> Result<(), Error> {
-		let mut n_failures = 0usize;
-		while !*must_exit.borrow() {
-			if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
-				let time_msec = u64_from_bytes(&time_bytes[0..8]);
-				let now = now_msec();
-				if now >= time_msec {
-					let mut hash = [0u8; 32];
-					hash.copy_from_slice(hash_bytes.as_ref());
-					let hash = Hash::from(hash);
-
-					if let Err(e) = self.resync_iter(&hash).await {
-						warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
-						self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?;
-						n_failures += 1;
-						if n_failures >= 10 {
-							warn!("Too many resync failures, throttling.");
-							tokio::time::delay_for(Duration::from_secs(1)).await;
-						}
-					} else {
-						n_failures = 0;
-					}
-				} else {
-					self.resync_queue.insert(time_bytes, hash_bytes)?;
-					let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
-					select! {
-						_ = delay.fuse() => (),
-						_ = self.resync_notify.notified().fuse() => (),
-						_ = must_exit.recv().fuse() => (),
-					}
-				}
-			} else {
-				select! {
-					_ = self.resync_notify.notified().fuse() => (),
-					_ = must_exit.recv().fuse() => (),
-				}
-			}
-		}
-		Ok(())
-	}
-
-	async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> {
-		let path = self.block_path(hash);
-
-		let exists = fs::metadata(&path).await.is_ok();
-		let needed = self
-			.rc
-			.get(hash.as_ref())?
-			.map(|x| u64_from_bytes(x.as_ref()) > 0)
-			.unwrap_or(false);
-
-		if exists != needed {
-			info!(
-				"Resync block {:?}: exists {}, needed {}",
-				hash, exists, needed
-			);
-		}
-
-		if exists && !needed {
-			let garage = self.garage.load_full().unwrap();
-			let active_refs = garage
-				.block_ref_table
-				.get_range(&hash, None, Some(()), 1)
-				.await?;
-			let needed_by_others = !active_refs.is_empty();
-			if needed_by_others {
-				let ring = self.system.ring.borrow().clone();
-				let who = self.replication.replication_nodes(&hash, &ring);
-				let msg = Arc::new(Message::NeedBlockQuery(*hash));
-				let who_needs_fut = who.iter().map(|to| {
-					self.rpc_client
-						.call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
-				});
-				let who_needs = join_all(who_needs_fut).await;
-
-				let mut need_nodes = vec![];
-				for (node, needed) in who.into_iter().zip(who_needs.iter()) {
-					match needed {
-						Ok(Message::NeedBlockReply(needed)) => {
-							if *needed {
-								need_nodes.push(node);
-							}
-						}
-						Err(e) => {
-							return Err(Error::Message(format!(
-								"Should delete block, but unable to confirm that all other nodes that need it have it: {}",
-								e
-							)));
-						}
-						Ok(_) => {
-							return Err(Error::Message(format!(
-								"Unexpected response to NeedBlockQuery RPC"
-							)));
-						}
-					}
-				}
-
-				if need_nodes.len() > 0 {
-					let put_block_message = self.read_block(hash).await?;
-					self.rpc_client
-						.try_call_many(
-							&need_nodes[..],
-							put_block_message,
-							RequestStrategy::with_quorum(need_nodes.len())
-								.with_timeout(BLOCK_RW_TIMEOUT),
-						)
-						.await?;
-				}
-			}
-			fs::remove_file(path).await?;
-			self.resync_queue.remove(&hash)?;
-		}
-
-		if needed && !exists {
-			// TODO find a way to not do this if they are sending it to us
-			// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
-			// between the RC being incremented and this part being called.
-			let block_data = self.rpc_get_block(&hash).await?;
-			self.write_block(hash, &block_data[..]).await?;
-		}
-
-		Ok(())
-	}
-
-	pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
-		let who = self.replication.read_nodes(&hash, &self.system);
-		let resps = self
-			.rpc_client
-			.try_call_many(
-				&who[..],
-				Message::GetBlock(*hash),
-				RequestStrategy::with_quorum(1)
-					.with_timeout(BLOCK_RW_TIMEOUT)
-					.interrupt_after_quorum(true),
-			)
-			.await?;
-
-		for resp in resps {
-			if let Message::PutBlock(msg) = resp {
-				return Ok(msg.data);
-			}
-		}
-		Err(Error::Message(format!(
-			"Unable to read block {:?}: no valid blocks returned",
-			hash
-		)))
-	}
-
-	pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
-		let who = self.replication.write_nodes(&hash, &self.system);
-		self.rpc_client
-			.try_call_many(
-				&who[..],
-				Message::PutBlock(PutBlockMessage { hash, data }),
-				RequestStrategy::with_quorum(self.replication.write_quorum())
-					.with_timeout(BLOCK_RW_TIMEOUT),
-			)
-			.await?;
-		Ok(())
-	}
-
-	pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
-		// 1. Repair blocks from RC table
-		let garage = self.garage.load_full().unwrap();
-		let mut last_hash = None;
-		let mut i = 0usize;
-		for entry in garage.block_ref_table.store.iter() {
-			let (_k, v_bytes) = entry?;
-			let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
-			if Some(&block_ref.block) == last_hash.as_ref() {
-				continue;
-			}
-			if !block_ref.deleted {
-				last_hash = Some(block_ref.block);
-				self.put_to_resync(&block_ref.block, 0)?;
-			}
-			i += 1;
-			if i & 0xFF == 0 && *must_exit.borrow() {
-				return Ok(());
-			}
-		}
-
-		// 2. Repair blocks actually on disk
-		let mut ls_data_dir = fs::read_dir(&self.data_dir).await?;
-		while let Some(data_dir_ent) = ls_data_dir.next().await {
-			let data_dir_ent = data_dir_ent?;
-			let dir_name = data_dir_ent.file_name();
-			let dir_name = match dir_name.into_string() {
-				Ok(x) => x,
-				Err(_) => continue,
-			};
-			if dir_name.len() != 2 || hex::decode(&dir_name).is_err() {
-				continue;
-			}
-
-			let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await {
-				Err(e) => {
-					warn!(
-						"Warning: could not list dir {:?}: {}",
-						data_dir_ent.path().to_str(),
-						e
-					);
-					continue;
-				}
-				Ok(x) => x,
-			};
-			while let Some(file) = ls_data_dir_2.next().await {
-				let file = file?;
-				let file_name = file.file_name();
-				let file_name = match file_name.into_string() {
-					Ok(x) => x,
-					Err(_) => continue,
-				};
-				if file_name.len() != 64 {
-					continue;
-				}
-				let hash_bytes = match hex::decode(&file_name) {
-					Ok(h) => h,
-					Err(_) => continue,
-				};
-				let mut hash = [0u8; 32];
-				hash.copy_from_slice(&hash_bytes[..]);
-				self.put_to_resync(&hash.into(), 0)?;
-
-				if *must_exit.borrow() {
-					return Ok(());
-				}
-			}
-		}
-		Ok(())
-	}
-}
-
-fn u64_from_bytes(bytes: &[u8]) -> u64 {
-	assert!(bytes.len() == 8);
-	let mut x8 = [0u8; 8];
-	x8.copy_from_slice(bytes);
-	u64::from_be_bytes(x8)
-}
-
-fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
-	let old = old.map(u64_from_bytes).unwrap_or(0);
-	assert!(new.len() == 1);
-	let new = match new[0] {
-		0 => {
-			if old > 0 {
-				old - 1
-			} else {
-				0
-			}
-		}
-		1 => old + 1,
-		_ => unreachable!(),
-	};
-	if new == 0 {
-		None
-	} else {
-		Some(u64::to_be_bytes(new).to_vec())
-	}
-}
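The reference counter in the file above is maintained entirely through sled's merge operators: block_incref/block_decref never read-modify-write, they submit a one-byte delta (1 for incref, 0 for decref) that rc_merge folds into a stored big-endian u64, and returning None deletes the key once the count hits zero. What follows is a minimal standalone sketch of these semantics, not part of this commit, assuming sled 0.31 as pinned in the Cargo.toml above:

fn u64_from_bytes(bytes: &[u8]) -> u64 {
	let mut x8 = [0u8; 8];
	x8.copy_from_slice(bytes);
	u64::from_be_bytes(x8)
}

// Same contract as rc_merge above: a merged value of [1] means incref,
// [0] means decref, and returning None removes the key entirely.
fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
	let old = old.map(u64_from_bytes).unwrap_or(0);
	let new = match new[0] {
		0 => old.saturating_sub(1),
		1 => old + 1,
		_ => unreachable!(),
	};
	if new == 0 {
		None
	} else {
		Some(u64::to_be_bytes(new).to_vec())
	}
}

fn main() -> sled::Result<()> {
	let db = sled::Config::new().temporary(true).open()?;
	let rc = db.open_tree("rc_demo")?;
	rc.set_merge_operator(rc_merge);

	let key = b"some-block-hash";
	rc.merge(key, vec![1])?; // incref -> 1
	rc.merge(key, vec![1])?; // incref -> 2
	rc.merge(key, vec![0])?; // decref -> 1
	assert_eq!(rc.get(key)?.map(|v| u64_from_bytes(&v)), Some(1));

	rc.merge(key, vec![0])?; // decref -> 0: the entry disappears
	assert_eq!(rc.get(key)?, None);
	Ok(())
}

Because merges are applied atomically by sled, block_incref can be called from table-update callbacks without any extra locking around the counter.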
diff --git a/src/core/block_ref_table.rs b/src/core/block_ref_table.rs
deleted file mode 100644
index a00438c0..00000000
--- a/src/core/block_ref_table.rs
+++ /dev/null
@@ -1,68 +0,0 @@
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-use std::sync::Arc;
-
-use garage_util::background::*;
-use garage_util::data::*;
-use garage_util::error::Error;
-
-use garage_table::*;
-
-use crate::block::*;
-
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct BlockRef {
-	// Primary key
-	pub block: Hash,
-
-	// Sort key
-	pub version: UUID,
-
-	// Keep track of deleted status
-	pub deleted: bool,
-}
-
-impl Entry<Hash, UUID> for BlockRef {
-	fn partition_key(&self) -> &Hash {
-		&self.block
-	}
-	fn sort_key(&self) -> &UUID {
-		&self.version
-	}
-
-	fn merge(&mut self, other: &Self) {
-		if other.deleted {
-			self.deleted = true;
-		}
-	}
-}
-
-pub struct BlockRefTable {
-	pub background: Arc<BackgroundRunner>,
-	pub block_manager: Arc<BlockManager>,
-}
-
-#[async_trait]
-impl TableSchema for BlockRefTable {
-	type P = Hash;
-	type S = UUID;
-	type E = BlockRef;
-	type Filter = ();
-
-	async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
-		let block = &old.as_ref().or(new.as_ref()).unwrap().block;
-		let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
-		let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
-		if is_after && !was_before {
-			self.block_manager.block_incref(block)?;
-		}
-		if was_before && !is_after {
-			self.block_manager.block_decref(block)?;
-		}
-		Ok(())
-	}
-
-	fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
-		!entry.deleted
-	}
-}
diff --git a/src/core/bucket_table.rs b/src/core/bucket_table.rs
deleted file mode 100644
index 28234d82..00000000
--- a/src/core/bucket_table.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-
-use garage_table::*;
-use garage_util::error::Error;
-
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct Bucket {
-	// Primary key
-	pub name: String,
-
-	// Timestamp and deletion
-	// Upon version increment, all info is replaced
-	pub timestamp: u64,
-	pub deleted: bool,
-
-	// Authorized keys
-	authorized_keys: Vec<AllowedKey>,
-}
-
-impl Bucket {
-	pub fn new(
-		name: String,
-		timestamp: u64,
-		deleted: bool,
-		authorized_keys: Vec<AllowedKey>,
-	) -> Self {
-		let mut ret = Bucket {
-			name,
-			timestamp,
-			deleted,
-			authorized_keys: vec![],
-		};
-		for key in authorized_keys {
-			ret.add_key(key)
-				.expect("Duplicate AllowedKey in Bucket constructor");
-		}
-		ret
-	}
-	/// Add a key only if it is not already present
-	pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> {
-		match self
-			.authorized_keys
-			.binary_search_by(|k| k.key_id.cmp(&key.key_id))
-		{
-			Err(i) => {
-				self.authorized_keys.insert(i, key);
-				Ok(())
-			}
-			Ok(_) => Err(()),
-		}
-	}
-	pub fn authorized_keys(&self) -> &[AllowedKey] {
-		&self.authorized_keys[..]
-	}
-	pub fn clear_keys(&mut self) {
-		self.authorized_keys.clear();
-	}
-}
-
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct AllowedKey {
-	pub key_id: String,
-	pub timestamp: u64,
-	pub allow_read: bool,
-	pub allow_write: bool,
-}
-
-impl Entry<EmptyKey, String> for Bucket {
-	fn partition_key(&self) -> &EmptyKey {
-		&EmptyKey
-	}
-	fn sort_key(&self) -> &String {
-		&self.name
-	}
-
-	fn merge(&mut self, other: &Self) {
-		if other.timestamp < self.timestamp {
-			*self = other.clone();
-			return;
-		}
-		if self.timestamp > other.timestamp || self.deleted {
-			return;
-		}
-
-		for ak in other.authorized_keys.iter() {
-			match self
-				.authorized_keys
-				.binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id))
-			{
-				Ok(i) => {
-					let our_ak = &mut self.authorized_keys[i];
-					if ak.timestamp > our_ak.timestamp {
-						*our_ak = ak.clone();
-					}
-				}
-				Err(i) => {
-					self.authorized_keys.insert(i, ak.clone());
-				}
-			}
-		}
-	}
-}
-
-pub struct BucketTable;
-
-#[async_trait]
-impl TableSchema for BucketTable {
-	type P = EmptyKey;
-	type S = String;
-	type E = Bucket;
-	type Filter = ();
-
-	async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
-		Ok(())
-	}
-
-	fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
-		!entry.deleted
-	}
-}
diff --git a/src/core/garage.rs b/src/core/garage.rs
deleted file mode 100644
index 46e0d02f..00000000
--- a/src/core/garage.rs
+++ /dev/null
@@ -1,166 +0,0 @@
-use std::sync::Arc;
-
-use garage_util::background::*;
-use garage_util::config::*;
-
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::RpcHttpClient;
-use garage_rpc::rpc_server::RpcServer;
-
-use garage_table::table_fullcopy::*;
-use garage_table::table_sharded::*;
-use garage_table::*;
-
-use crate::block::*;
-use crate::block_ref_table::*;
-use crate::bucket_table::*;
-use crate::key_table::*;
-use crate::object_table::*;
-use crate::version_table::*;
-
-pub struct Garage {
-	pub config: Config,
-
-	pub db: sled::Db,
-	pub background: Arc<BackgroundRunner>,
-	pub system: Arc<System>,
-	pub block_manager: Arc<BlockManager>,
-
-	pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
-	pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
-
-	pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
-	pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
-	pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
-}
-
-impl Garage {
-	pub async fn new(
-		config: Config,
-		db: sled::Db,
-		background: Arc<BackgroundRunner>,
-		rpc_server: &mut RpcServer,
-	) -> Arc<Self> {
-		info!("Initialize membership management system...");
-		let rpc_http_client = Arc::new(
-			RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls)
-				.expect("Could not create RPC client"),
-		);
-		let system = System::new(
-			config.metadata_dir.clone(),
-			rpc_http_client,
-			background.clone(),
-			rpc_server,
-		);
-
-		let data_rep_param = TableShardedReplication {
-			replication_factor: config.data_replication_factor,
-			write_quorum: (config.data_replication_factor + 1) / 2,
-			read_quorum: 1,
-		};
-
-		let meta_rep_param = TableShardedReplication {
-			replication_factor: config.meta_replication_factor,
-			write_quorum: (config.meta_replication_factor + 1) / 2,
-			read_quorum: (config.meta_replication_factor + 1) / 2,
-		};
-
-		let control_rep_param = TableFullReplication::new(
-			config.meta_epidemic_fanout,
-			(config.meta_epidemic_fanout + 1) / 2,
-		);
-
-		info!("Initialize block manager...");
-		let block_manager = BlockManager::new(
-			&db,
-			config.data_dir.clone(),
-			data_rep_param.clone(),
-			system.clone(),
-			rpc_server,
-		);
-
-		info!("Initialize block_ref_table...");
-		let block_ref_table = Table::new(
-			BlockRefTable {
-				background: background.clone(),
-				block_manager: block_manager.clone(),
-			},
-			data_rep_param.clone(),
-			system.clone(),
-			&db,
-			"block_ref".to_string(),
-			rpc_server,
-		)
-		.await;
-
-		info!("Initialize version_table...");
-		let version_table = Table::new(
-			VersionTable {
-				background: background.clone(),
-				block_ref_table: block_ref_table.clone(),
-			},
-			meta_rep_param.clone(),
-			system.clone(),
-			&db,
-			"version".to_string(),
-			rpc_server,
-		)
-		.await;
-
-		info!("Initialize object_table...");
-		let object_table = Table::new(
-			ObjectTable {
-				background: background.clone(),
-				version_table: version_table.clone(),
-			},
-			meta_rep_param.clone(),
-			system.clone(),
-			&db,
-			"object".to_string(),
-			rpc_server,
-		)
-		.await;
-
-		info!("Initialize bucket_table...");
-		let bucket_table = Table::new(
-			BucketTable,
-			control_rep_param.clone(),
-			system.clone(),
-			&db,
-			"bucket".to_string(),
-			rpc_server,
-		)
-		.await;
-
-		info!("Initialize key_table_table...");
-		let key_table = Table::new(
-			KeyTable,
-			control_rep_param.clone(),
-			system.clone(),
-			&db,
-			"key".to_string(),
-			rpc_server,
-		)
-		.await;
-
-		info!("Initialize Garage...");
-		let garage = Arc::new(Self {
-			config,
-			db,
-			system: system.clone(),
-			block_manager,
-			background,
-			bucket_table,
-			key_table,
-			object_table,
-			version_table,
-			block_ref_table,
-		});
-
-		info!("Start block manager background thread...");
-		garage.block_manager.garage.swap(Some(garage.clone()));
-		garage.block_manager.clone().spawn_background_worker().await;
-
-		garage
-	}
-}
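The replication parameters above are derived with integer arithmetic: write_quorum = (replication_factor + 1) / 2, and metadata reads use the same formula. A small sketch, separate from the patch, of what this formula yields and of when a read quorum is guaranteed to intersect a write quorum (that is, when r + w > n):

fn main() {
	for n in 1u64..=7 {
		let q = (n + 1) / 2; // integer division, i.e. ceil(n / 2)
		let overlap = q + q > n;
		println!("n = {}: quorum = {}, guaranteed read/write overlap: {}", n, q, overlap);
	}
}

For odd replication factors the two quorums always intersect; for even ones, (n + 1) / 2 equals n / 2, so two disjoint sets of that size can exist, which is worth keeping in mind when choosing meta_replication_factor.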
diff --git a/src/core/key_table.rs b/src/core/key_table.rs
deleted file mode 100644
index 76d163b5..00000000
--- a/src/core/key_table.rs
+++ /dev/null
@@ -1,154 +0,0 @@
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-
-use garage_table::*;
-use garage_util::data::*;
-use garage_util::error::Error;
-
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct Key {
-	// Primary key
-	pub key_id: String,
-
-	// Associated secret key (immutable)
-	pub secret_key: String,
-
-	// Name
-	pub name: String,
-	pub name_timestamp: u64,
-
-	// Deletion
-	pub deleted: bool,
-
-	// Authorized keys
-	authorized_buckets: Vec<AllowedBucket>,
-}
-
-impl Key {
-	pub fn new(name: String, buckets: Vec<AllowedBucket>) -> Self {
-		let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
-		let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
-		let mut ret = Self {
-			key_id,
-			secret_key,
-			name,
-			name_timestamp: now_msec(),
-			deleted: false,
-			authorized_buckets: vec![],
-		};
-		for b in buckets {
-			ret.add_bucket(b)
-				.expect("Duplicate AllowedBucket in Key constructor");
-		}
-		ret
-	}
-	pub fn delete(key_id: String) -> Self {
-		Self {
-			key_id,
-			secret_key: "".into(),
-			name: "".into(),
-			name_timestamp: now_msec(),
-			deleted: true,
-			authorized_buckets: vec![],
-		}
-	}
-	/// Add an authorized bucket, only if it wasn't there before
-	pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> {
-		match self
-			.authorized_buckets
-			.binary_search_by(|b| b.bucket.cmp(&new.bucket))
-		{
-			Err(i) => {
-				self.authorized_buckets.insert(i, new);
-				Ok(())
-			}
-			Ok(_) => Err(()),
-		}
-	}
-	pub fn authorized_buckets(&self) -> &[AllowedBucket] {
-		&self.authorized_buckets[..]
-	}
-	pub fn clear_buckets(&mut self) {
-		self.authorized_buckets.clear();
-	}
-	pub fn allow_read(&self, bucket: &str) -> bool {
-		self.authorized_buckets
-			.iter()
-			.find(|x| x.bucket.as_str() == bucket)
-			.map(|x| x.allow_read)
-			.unwrap_or(false)
-	}
-	pub fn allow_write(&self, bucket: &str) -> bool {
-		self.authorized_buckets
-			.iter()
-			.find(|x| x.bucket.as_str() == bucket)
-			.map(|x| x.allow_write)
-			.unwrap_or(false)
-	}
-}
-
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct AllowedBucket {
-	pub bucket: String,
-	pub timestamp: u64,
-	pub allow_read: bool,
-	pub allow_write: bool,
-}
-
-impl Entry<EmptyKey, String> for Key {
-	fn partition_key(&self) -> &EmptyKey {
-		&EmptyKey
-	}
-	fn sort_key(&self) -> &String {
-		&self.key_id
-	}
-
-	fn merge(&mut self, other: &Self) {
-		if other.deleted {
-			self.deleted = true;
-		}
-		if self.deleted {
-			self.authorized_buckets.clear();
-			return;
-		}
-		if other.name_timestamp > self.name_timestamp {
-			self.name_timestamp = other.name_timestamp;
-			self.name = other.name.clone();
-		}
-
-		for ab in other.authorized_buckets.iter() {
-			match self
-				.authorized_buckets
-				.binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket))
-			{
-				Ok(i) => {
-					let our_ab = &mut self.authorized_buckets[i];
-					if ab.timestamp > our_ab.timestamp {
-						*our_ab = ab.clone();
-					}
-				}
-				Err(i) => {
-					self.authorized_buckets.insert(i, ab.clone());
-				}
-			}
-		}
-	}
-}
-
-pub struct KeyTable;
-
-#[async_trait]
-impl TableSchema for KeyTable {
-	type P = EmptyKey;
-	type S = String;
-	type E = Key;
-	type Filter = ();
-
-	async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
-		Ok(())
-	}
-
-	fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
-		!entry.deleted
-	}
-}
diff --git a/src/core/lib.rs b/src/core/lib.rs
deleted file mode 100644
index b4a8ddb7..00000000
--- a/src/core/lib.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-#[macro_use]
-extern crate log;
-
-pub mod block;
-pub mod block_ref_table;
-pub mod bucket_table;
-pub mod garage;
-pub mod key_table;
-pub mod object_table;
-pub mod version_table;
diff --git a/src/core/object_table.rs b/src/core/object_table.rs
deleted file mode 100644
index 01df70e6..00000000
--- a/src/core/object_table.rs
+++ /dev/null
@@ -1,198 +0,0 @@
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-use std::sync::Arc;
-
-use garage_util::background::BackgroundRunner;
-use garage_util::data::*;
-use garage_util::error::Error;
-
-use garage_table::table_sharded::*;
-use garage_table::*;
-
-use crate::version_table::*;
-
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct Object {
-	// Primary key
-	pub bucket: String,
-
-	// Sort key
-	pub key: String,
-
-	// Data
-	versions: Vec<ObjectVersion>,
-}
-
-impl Object {
-	pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
-		let mut ret = Self {
-			bucket,
-			key,
-			versions: vec![],
-		};
-		for v in versions {
-			ret.add_version(v)
-				.expect("Twice the same ObjectVersion in Object constructor");
-		}
-		ret
-	}
-	/// Adds a version if it wasn't already present
-	pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
-		match self
-			.versions
-			.binary_search_by(|v| v.cmp_key().cmp(&new.cmp_key()))
-		{
-			Err(i) => {
-				self.versions.insert(i, new);
-				Ok(())
-			}
-			Ok(_) => Err(()),
-		}
-	}
-	pub fn versions(&self) -> &[ObjectVersion] {
-		&self.versions[..]
-	}
-}
-
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersion {
-	pub uuid: UUID,
-	pub timestamp: u64,
-
-	pub mime_type: String,
-	pub size: u64,
-	pub state: ObjectVersionState,
-
-	pub data: ObjectVersionData,
-}
-
-#[derive(PartialEq, Clone, Copy, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionState {
-	Uploading,
-	Complete,
-	Aborted,
-}
-
-impl ObjectVersionState {
-	fn max(self, other: Self) -> Self {
-		use ObjectVersionState::*;
-		if self == Aborted || other == Aborted {
-			Aborted
-		} else if self == Complete || other == Complete {
-			Complete
-		} else {
-			Uploading
-		}
-	}
-}
-
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionData {
-	Uploading,
-	DeleteMarker,
-	Inline(#[serde(with = "serde_bytes")] Vec<u8>),
-	FirstBlock(Hash),
-}
-
-impl ObjectVersion {
-	fn cmp_key(&self) -> (u64, UUID) {
-		(self.timestamp, self.uuid)
-	}
-	pub fn is_complete(&self) -> bool {
-		self.state == ObjectVersionState::Complete
-	}
-	pub fn is_data(&self) -> bool {
-		self.state == ObjectVersionState::Complete && self.data != ObjectVersionData::DeleteMarker
-	}
-}
-
-impl Entry<String, String> for Object {
-	fn partition_key(&self) -> &String {
-		&self.bucket
-	}
-	fn sort_key(&self) -> &String {
-		&self.key
-	}
-
-	fn merge(&mut self, other: &Self) {
-		for other_v in other.versions.iter() {
-			match self
-				.versions
-				.binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key()))
-			{
-				Ok(i) => {
-					let mut v = &mut self.versions[i];
-					if other_v.size > v.size {
-						v.size = other_v.size;
-					}
-					v.state = v.state.max(other_v.state);
-					if v.data == ObjectVersionData::Uploading {
-						v.data = other_v.data.clone();
-					}
-				}
-				Err(i) => {
-					self.versions.insert(i, other_v.clone());
-				}
-			}
-		}
-		let last_complete = self
-			.versions
-			.iter()
-			.enumerate()
-			.rev()
-			.filter(|(_, v)| v.is_complete())
-			.next()
-			.map(|(vi, _)| vi);
-
-		if let Some(last_vi) = last_complete {
-			self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
-		}
-	}
-}
-
-pub struct ObjectTable {
-	pub background: Arc<BackgroundRunner>,
-	pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
-}
-
-#[async_trait]
-impl TableSchema for ObjectTable {
-	type P = String;
-	type S = String;
-	type E = Object;
-	type Filter = ();
-
-	async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
-		let version_table = self.version_table.clone();
-		if let (Some(old_v), Some(new_v)) = (old, new) {
-			// Propagate deletion of old versions
-			for v in old_v.versions.iter() {
-				let newly_deleted = match new_v
-					.versions
-					.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
-				{
-					Err(_) => true,
-					Ok(i) => {
-						new_v.versions[i].state == ObjectVersionState::Aborted
-							&& v.state != ObjectVersionState::Aborted
-					}
-				};
-				if newly_deleted {
-					let deleted_version = Version::new(
-						v.uuid,
-						old_v.bucket.clone(),
-						old_v.key.clone(),
-						true,
-						vec![],
-					);
-					version_table.insert(&deleted_version).await?;
-				}
-			}
-		}
-		Ok(())
-	}
-
-	fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
-		entry.versions.iter().any(|v| v.is_data())
-	}
-}
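Object::merge above resolves concurrent writes by keeping versions sorted by (timestamp, uuid) and then discarding everything strictly older than the most recent complete version. A reduced standalone model of that pruning rule, with the types simplified for illustration:

#[derive(Clone, Copy, Debug, PartialEq)]
struct V {
	timestamp: u64,
	complete: bool,
}

// Keep the suffix starting at the last complete version, as in Object::merge.
fn prune(mut versions: Vec<V>) -> Vec<V> {
	versions.sort_by_key(|v| v.timestamp);
	if let Some(i) = versions.iter().rposition(|v| v.complete) {
		versions.drain(..i);
	}
	versions
}

fn main() {
	let merged = prune(vec![
		V { timestamp: 1, complete: true },
		V { timestamp: 2, complete: false },
		V { timestamp: 3, complete: true },
		V { timestamp: 4, complete: false },
	]);
	// Versions 1 and 2 are dropped; version 3 (the last complete one)
	// and the still-uploading version 4 survive.
	assert_eq!(merged, vec![
		V { timestamp: 3, complete: true },
		V { timestamp: 4, complete: false },
	]);
}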
diff --git a/src/core/version_table.rs b/src/core/version_table.rs
deleted file mode 100644
index 6054e389..00000000
--- a/src/core/version_table.rs
+++ /dev/null
@@ -1,145 +0,0 @@
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-use std::sync::Arc;
-
-use garage_util::background::BackgroundRunner;
-use garage_util::data::*;
-use garage_util::error::Error;
-
-use garage_table::table_sharded::*;
-use garage_table::*;
-
-use crate::block_ref_table::*;
-
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct Version {
-	// Primary key
-	pub uuid: UUID,
-
-	// Actual data: the blocks for this version
-	pub deleted: bool,
-	blocks: Vec<VersionBlock>,
-
-	// Back link to bucket+key so that we can figure if
-	// this was deleted later on
-	pub bucket: String,
-	pub key: String,
-}
-
-impl Version {
-	pub fn new(
-		uuid: UUID,
-		bucket: String,
-		key: String,
-		deleted: bool,
-		blocks: Vec<VersionBlock>,
-	) -> Self {
-		let mut ret = Self {
-			uuid,
-			deleted,
-			blocks: vec![],
-			bucket,
-			key,
-		};
-		for b in blocks {
-			ret.add_block(b)
-				.expect("Twice the same VersionBlock in Version constructor");
-		}
-		ret
-	}
-	/// Adds a block if it wasn't already present
-	pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> {
-		match self
-			.blocks
-			.binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key()))
-		{
-			Err(i) => {
-				self.blocks.insert(i, new);
-				Ok(())
-			}
-			Ok(_) => Err(()),
-		}
-	}
-	pub fn blocks(&self) -> &[VersionBlock] {
-		&self.blocks[..]
-	}
-}
-
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct VersionBlock {
-	pub part_number: u64,
-	pub offset: u64,
-	pub hash: Hash,
-	pub size: u64,
-}
-
-impl VersionBlock {
-	fn cmp_key(&self) -> (u64, u64) {
-		(self.part_number, self.offset)
-	}
-}
-
-impl Entry<Hash, EmptyKey> for Version {
-	fn partition_key(&self) -> &Hash {
-		&self.uuid
-	}
-	fn sort_key(&self) -> &EmptyKey {
-		&EmptyKey
-	}
-
-	fn merge(&mut self, other: &Self) {
-		if other.deleted {
-			self.deleted = true;
-			self.blocks.clear();
-		} else if !self.deleted {
-			for bi in other.blocks.iter() {
-				match self
-					.blocks
-					.binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key()))
-				{
-					Ok(_) => (),
-					Err(pos) => {
-						self.blocks.insert(pos, bi.clone());
-					}
-				}
-			}
-		}
-	}
-}
-
-pub struct VersionTable {
-	pub background: Arc<BackgroundRunner>,
-	pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
-}
-
-#[async_trait]
-impl TableSchema for VersionTable {
-	type P = Hash;
-	type S = EmptyKey;
-	type E = Version;
-	type Filter = ();
-
-	async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
-		let block_ref_table = self.block_ref_table.clone();
-		if let (Some(old_v), Some(new_v)) = (old, new) {
-			// Propagate deletion of version blocks
-			if new_v.deleted && !old_v.deleted {
-				let deleted_block_refs = old_v
-					.blocks
-					.iter()
-					.map(|vb| BlockRef {
-						block: vb.hash,
-						version: old_v.uuid,
-						deleted: true,
-					})
-					.collect::<Vec<_>>();
-				block_ref_table.insert_many(&deleted_block_refs[..]).await?;
-			}
-		}
-		Ok(())
-	}
-
-	fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
-		!entry.deleted
-	}
-}
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 08b55c32..2daa0f75 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -14,7 +14,7 @@ path = "main.rs"
 garage_util = { path = "../util" }
 garage_rpc = { path = "../rpc" }
 garage_table = { path = "../table" }
-garage_core = { path = "../core" }
+garage_model = { path = "../model" }
 garage_api = { path = "../api" }
 
 bytes = "0.4"
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index aeaf2682..1dd118ac 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -10,9 +10,9 @@ use garage_table::*;
 use garage_rpc::rpc_client::*;
 use garage_rpc::rpc_server::*;
 
-use garage_core::bucket_table::*;
-use garage_core::garage::Garage;
-use garage_core::key_table::*;
+use garage_model::bucket_table::*;
+use garage_model::garage::Garage;
+use garage_model::key_table::*;
 
 use crate::repair::Repair;
 use crate::*;
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index afea61de..297ae9cd 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -2,10 +2,10 @@ use std::sync::Arc;
 
 use tokio::sync::watch;
 
-use garage_core::block_ref_table::*;
-use garage_core::garage::Garage;
-use garage_core::object_table::*;
-use garage_core::version_table::*;
+use garage_model::block_ref_table::*;
+use garage_model::garage::Garage;
+use garage_model::object_table::*;
+use garage_model::version_table::*;
 
 use garage_table::*;
 use garage_util::error::Error;
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 2b618c1a..6caea5eb 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -9,7 +9,7 @@ use garage_util::config::*;
 use garage_util::error::Error;
 
 use garage_api::api_server;
-use garage_core::garage::Garage;
+use garage_model::garage::Garage;
 use garage_rpc::rpc_server::RpcServer;
 
 use crate::admin_rpc::*;
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
new file mode 100644
index 00000000..be3b1449
--- /dev/null
+++ b/src/model/Cargo.toml
@@ -0,0 +1,34 @@
+[package]
+name = "garage_model"
+version = "0.1.0"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+
+[lib]
+path = "lib.rs"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+garage_util = { path = "../util" }
+garage_rpc = { path = "../rpc" }
+garage_table = { path = "../table" }
+
+bytes = "0.4"
+rand = "0.7"
+hex = "0.3"
+sha2 = "0.8"
+arc-swap = "0.4"
+log = "0.4"
+
+sled = "0.31"
+
+rmp-serde = "0.14.3"
+serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
+serde_bytes = "0.11"
+
+async-trait = "0.1.30"
+futures = "0.3"
+futures-util = "0.3"
+tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+
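The serialization stack declared here (rmp-serde plus serde_bytes) is what the tables use to persist entries: values are MessagePack, and #[serde(with = "serde_bytes")] encodes byte fields as compact binary rather than as integer arrays. A round-trip sketch, assuming rmp-serde 0.14 as pinned above; the Blob type is purely illustrative:

use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Blob {
	#[serde(with = "serde_bytes")]
	data: Vec<u8>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
	let blob = Blob { data: vec![0x42; 16] };
	let bytes = rmp_serde::to_vec(&blob)?;
	// Same decode entry point that repair_data_store() uses on raw sled values.
	let back: Blob = rmp_serde::decode::from_read_ref(&bytes)?;
	assert_eq!(blob, back);
	Ok(())
}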
diff --git a/src/model/block.rs b/src/model/block.rs
new file mode 100644
index 00000000..af8b9efb
--- /dev/null
+++ b/src/model/block.rs
@@ -0,0 +1,506 @@
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
+
+use arc_swap::ArcSwapOption;
+use futures::future::*;
+use futures::select;
+use futures::stream::*;
+use serde::{Deserialize, Serialize};
+use tokio::fs;
+use tokio::prelude::*;
+use tokio::sync::{watch, Mutex, Notify};
+
+use garage_util::data;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_rpc::membership::System;
+use garage_rpc::rpc_client::*;
+use garage_rpc::rpc_server::*;
+
+use garage_table::table_sharded::TableShardedReplication;
+use garage_table::TableReplication;
+
+use crate::block_ref_table::*;
+
+use crate::garage::Garage;
+
+pub const INLINE_THRESHOLD: usize = 3072;
+
+const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
+const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
+const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum Message {
+	Ok,
+	GetBlock(Hash),
+	PutBlock(PutBlockMessage),
+	NeedBlockQuery(Hash),
+	NeedBlockReply(bool),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct PutBlockMessage {
+	pub hash: Hash,
+
+	#[serde(with = "serde_bytes")]
+	pub data: Vec<u8>,
+}
+
+impl RpcMessage for Message {}
+
+pub struct BlockManager {
+	pub replication: TableShardedReplication,
+	pub data_dir: PathBuf,
+	pub data_dir_lock: Mutex<()>,
+
+	pub rc: sled::Tree,
+
+	pub resync_queue: sled::Tree,
+	pub resync_notify: Notify,
+
+	pub system: Arc<System>,
+	rpc_client: Arc<RpcClient<Message>>,
+	pub garage: ArcSwapOption<Garage>,
+}
+
+impl BlockManager {
+	pub fn new(
+		db: &sled::Db,
+		data_dir: PathBuf,
+		replication: TableShardedReplication,
+		system: Arc<System>,
+		rpc_server: &mut RpcServer,
+	) -> Arc<Self> {
+		let rc = db
+			.open_tree("block_local_rc")
+			.expect("Unable to open block_local_rc tree");
+		rc.set_merge_operator(rc_merge);
+
+		let resync_queue = db
+			.open_tree("block_local_resync_queue")
+			.expect("Unable to open block_local_resync_queue tree");
+
+		let rpc_path = "block_manager";
+		let rpc_client = system.rpc_client::<Message>(rpc_path);
+
+		let block_manager = Arc::new(Self {
+			replication,
+			data_dir,
+			data_dir_lock: Mutex::new(()),
+			rc,
+			resync_queue,
+			resync_notify: Notify::new(),
+			system,
+			rpc_client,
+			garage: ArcSwapOption::from(None),
+		});
+		block_manager
+			.clone()
+			.register_handler(rpc_server, rpc_path.into());
+		block_manager
+	}
+
+	fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+		let self2 = self.clone();
+		rpc_server.add_handler::<Message, _, _>(path, move |msg, _addr| {
+			let self2 = self2.clone();
+			async move { self2.handle(&msg).await }
+		});
+
+		let self2 = self.clone();
+		self.rpc_client
+			.set_local_handler(self.system.id, move |msg| {
+				let self2 = self2.clone();
+				async move { self2.handle(&msg).await }
+			});
+	}
+
+	async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> {
+		match msg {
+			Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
+			Message::GetBlock(h) => self.read_block(h).await,
+			Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
+			_ => Err(Error::BadRequest(format!("Unexpected RPC message"))),
+		}
+	}
+
+	pub async fn spawn_background_worker(self: Arc<Self>) {
+		// Launch 2 simultaneous workers for background resync loop preprocessing
+		for i in 0..2usize {
+			let bm2 = self.clone();
+			let background = self.system.background.clone();
+			tokio::spawn(async move {
+				tokio::time::delay_for(Duration::from_secs(10)).await;
+				background
+					.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
+						bm2.resync_loop(must_exit)
+					})
+					.await;
+			});
+		}
+	}
+
+	pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
+		let _lock = self.data_dir_lock.lock().await;
+
+		let mut path = self.block_dir(hash);
+		fs::create_dir_all(&path).await?;
+
+		path.push(hex::encode(hash));
+		if fs::metadata(&path).await.is_ok() {
+			return Ok(Message::Ok);
+		}
+
+		let mut f = fs::File::create(path).await?;
+		f.write_all(data).await?;
+		drop(f);
+
+		Ok(Message::Ok)
+	}
+
+	pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
+		let path = self.block_path(hash);
+
+		let mut f = match fs::File::open(&path).await {
+			Ok(f) => f,
+			Err(e) => {
+				// Not found but maybe we should have had it ??
+				self.put_to_resync(hash, 0)?;
+				return Err(Into::into(e));
+			}
+		};
+		let mut data = vec![];
+		f.read_to_end(&mut data).await?;
+		drop(f);
+
+		if data::hash(&data[..]) != *hash {
+			let _lock = self.data_dir_lock.lock().await;
+			warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
+			fs::remove_file(path).await?;
+			self.put_to_resync(&hash, 0)?;
+			return Err(Error::CorruptData(*hash));
+		}
+
+		Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
+	}
+
+	pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
+		let needed = self
+			.rc
+			.get(hash.as_ref())?
+			.map(|x| u64_from_bytes(x.as_ref()) > 0)
+			.unwrap_or(false);
+		if needed {
+			let path = self.block_path(hash);
+			let exists = fs::metadata(&path).await.is_ok();
+			Ok(!exists)
+		} else {
+			Ok(false)
+		}
+	}
+
+	fn block_dir(&self, hash: &Hash) -> PathBuf {
+		let mut path = self.data_dir.clone();
+		path.push(hex::encode(&hash.as_slice()[0..1]));
+		path.push(hex::encode(&hash.as_slice()[1..2]));
+		path
+	}
+	fn block_path(&self, hash: &Hash) -> PathBuf {
+		let mut path = self.block_dir(hash);
+		path.push(hex::encode(hash.as_ref()));
+		path
+	}
+
+	pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
+		let old_rc = self.rc.get(&hash)?;
+		self.rc.merge(&hash, vec![1])?;
+		if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
+			self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?;
+		}
+		Ok(())
+	}
+
+	pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
+		let new_rc = self.rc.merge(&hash, vec![0])?;
+		if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
+			self.put_to_resync(&hash, 0)?;
+		}
+		Ok(())
+	}
+
+	fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> {
+		let when = now_msec() + delay_millis;
+		trace!("Put resync_queue: {} {:?}", when, hash);
+		let mut key = u64::to_be_bytes(when).to_vec();
+		key.extend(hash.as_ref());
+		self.resync_queue.insert(key, hash.as_ref())?;
+		self.resync_notify.notify();
+		Ok(())
+	}
+
+	async fn resync_loop(
+		self: Arc<Self>,
+		mut must_exit: watch::Receiver<bool>,
+	) -> Result<(), Error> {
+		let mut n_failures = 0usize;
+		while !*must_exit.borrow() {
+			if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
+				let time_msec = u64_from_bytes(&time_bytes[0..8]);
+				let now = now_msec();
+				if now >= time_msec {
+					let mut hash = [0u8; 32];
+					hash.copy_from_slice(hash_bytes.as_ref());
+					let hash = Hash::from(hash);
+
+					if let Err(e) = self.resync_iter(&hash).await {
+						warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
+						self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?;
+						n_failures += 1;
+						if n_failures >= 10 {
+							warn!("Too many resync failures, throttling.");
+							tokio::time::delay_for(Duration::from_secs(1)).await;
+						}
+					} else {
+						n_failures = 0;
+					}
+				} else {
+					self.resync_queue.insert(time_bytes, hash_bytes)?;
+					let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
+					select! {
+						_ = delay.fuse() => (),
+						_ = self.resync_notify.notified().fuse() => (),
+						_ = must_exit.recv().fuse() => (),
+					}
+				}
+			} else {
+				select! {
+					_ = self.resync_notify.notified().fuse() => (),
+					_ = must_exit.recv().fuse() => (),
+				}
+			}
+		}
+		Ok(())
+	}
+
+	async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> {
+		let path = self.block_path(hash);
+
+		let exists = fs::metadata(&path).await.is_ok();
+		let needed = self
+			.rc
+			.get(hash.as_ref())?
+			.map(|x| u64_from_bytes(x.as_ref()) > 0)
+			.unwrap_or(false);
+
+		if exists != needed {
+			info!(
+				"Resync block {:?}: exists {}, needed {}",
+				hash, exists, needed
+			);
+		}
+
+		if exists && !needed {
+			let garage = self.garage.load_full().unwrap();
+			let active_refs = garage
+				.block_ref_table
+				.get_range(&hash, None, Some(()), 1)
+				.await?;
+			let needed_by_others = !active_refs.is_empty();
+			if needed_by_others {
+				let ring = self.system.ring.borrow().clone();
+				let who = self.replication.replication_nodes(&hash, &ring);
+				let msg = Arc::new(Message::NeedBlockQuery(*hash));
+				let who_needs_fut = who.iter().map(|to| {
+					self.rpc_client
+						.call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
+				});
+				let who_needs = join_all(who_needs_fut).await;
+
+				let mut need_nodes = vec![];
+				for (node, needed) in who.into_iter().zip(who_needs.iter()) {
+					match needed {
+						Ok(Message::NeedBlockReply(needed)) => {
+							if *needed {
+								need_nodes.push(node);
+							}
+						}
+						Err(e) => {
+							return Err(Error::Message(format!(
+								"Should delete block, but unable to confirm that all other nodes that need it have it: {}",
+								e
+							)));
+						}
+						Ok(_) => {
+							return Err(Error::Message(format!(
+								"Unexpected response to NeedBlockQuery RPC"
+							)));
+						}
+					}
+				}
+
+				if need_nodes.len() > 0 {
+					let put_block_message = self.read_block(hash).await?;
+					self.rpc_client
+						.try_call_many(
+							&need_nodes[..],
+							put_block_message,
+							RequestStrategy::with_quorum(need_nodes.len())
+								.with_timeout(BLOCK_RW_TIMEOUT),
+						)
+						.await?;
+				}
+			}
+			fs::remove_file(path).await?;
+			self.resync_queue.remove(&hash)?;
+		}
+
+		if needed && !exists {
+			// TODO find a way to not do this if they are sending it to us
+			// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
+			// between the RC being incremented and this part being called.
+			let block_data = self.rpc_get_block(&hash).await?;
+			self.write_block(hash, &block_data[..]).await?;
+		}
+
+		Ok(())
+	}
+
+	pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
+		let who = self.replication.read_nodes(&hash, &self.system);
+		let resps = self
+			.rpc_client
+			.try_call_many(
+				&who[..],
+				Message::GetBlock(*hash),
+				RequestStrategy::with_quorum(1)
+					.with_timeout(BLOCK_RW_TIMEOUT)
+					.interrupt_after_quorum(true),
+			)
+			.await?;
+
+		for resp in resps {
+			if let Message::PutBlock(msg) = resp {
+				return Ok(msg.data);
+			}
+		}
+		Err(Error::Message(format!(
+			"Unable to read block {:?}: no valid blocks returned",
+			hash
+		)))
+	}
+
+	pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
+		let who = self.replication.write_nodes(&hash, &self.system);
+		self.rpc_client
+			.try_call_many(
+				&who[..],
+				Message::PutBlock(PutBlockMessage { hash, data }),
+				RequestStrategy::with_quorum(self.replication.write_quorum())
+					.with_timeout(BLOCK_RW_TIMEOUT),
+			)
+			.await?;
+		Ok(())
+	}
+
+	pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+		// 1. Repair blocks from RC table
+		let garage = self.garage.load_full().unwrap();
+		let mut last_hash = None;
+		let mut i = 0usize;
+		for entry in garage.block_ref_table.store.iter() {
+			let (_k, v_bytes) = entry?;
+			let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
+			if Some(&block_ref.block) == last_hash.as_ref() {
+				continue;
+			}
+			if !block_ref.deleted {
+				last_hash = Some(block_ref.block);
+				self.put_to_resync(&block_ref.block, 0)?;
+			}
+			i += 1;
+			if i & 0xFF == 0 && *must_exit.borrow() {
+				return Ok(());
+			}
+		}
+
+		// 2. Repair blocks actually on disk
+		let mut ls_data_dir = fs::read_dir(&self.data_dir).await?;
+		while let Some(data_dir_ent) = ls_data_dir.next().await {
+			let data_dir_ent = data_dir_ent?;
+			let dir_name = data_dir_ent.file_name();
+			let dir_name = match dir_name.into_string() {
+				Ok(x) => x,
+				Err(_) => continue,
+			};
+			if dir_name.len() != 2 || hex::decode(&dir_name).is_err() {
+				continue;
+			}
+
+			let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await {
+				Err(e) => {
+					warn!(
+						"Warning: could not list dir {:?}: {}",
+						data_dir_ent.path().to_str(),
+						e
+					);
+					continue;
+				}
+				Ok(x) => x,
+			};
+			while let Some(file) = ls_data_dir_2.next().await {
+				let file = file?;
+				let file_name = file.file_name();
+				let file_name = match file_name.into_string() {
+					Ok(x) => x,
+					Err(_) => continue,
+				};
+				if file_name.len() != 64 {
+					continue;
+				}
+				let hash_bytes = match hex::decode(&file_name) {
+					Ok(h) => h,
+					Err(_) => continue,
+				};
+				let mut hash = [0u8; 32];
+				hash.copy_from_slice(&hash_bytes[..]);
+				self.put_to_resync(&hash.into(), 0)?;
+
+				if *must_exit.borrow() {
+					return Ok(());
+				}
+			}
+		}
+		Ok(())
+	}
+}
+
+fn u64_from_bytes(bytes: &[u8]) -> u64 {
+	assert!(bytes.len() == 8);
+	let mut x8 = [0u8; 8];
+	x8.copy_from_slice(bytes);
+	u64::from_be_bytes(x8)
+}
+
+fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
+	let old = old.map(u64_from_bytes).unwrap_or(0);
+	assert!(new.len() == 1);
+	let new = match new[0] {
+		0 => {
+			if old > 0 {
+				old - 1
+			} else {
+				0
+			}
+		}
+		1 => old + 1,
+		_ => unreachable!(),
+	};
+	if new == 0 {
+		None
+	} else {
+		Some(u64::to_be_bytes(new).to_vec())
+	}
+}
diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs
new file mode 100644
index 00000000..a00438c0
--- /dev/null
+++ b/src/model/block_ref_table.rs
@@ -0,0 +1,68 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_table::*;
+
+use crate::block::*;
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct BlockRef {
+	// Primary key
+	pub block: Hash,
+
+	// Sort key
+	pub version: UUID,
+
+	// Keep track of deleted status
+	pub deleted: bool,
+}
+
+impl Entry<Hash, UUID> for BlockRef {
+	fn partition_key(&self) -> &Hash {
+		&self.block
+	}
+	fn sort_key(&self) -> &UUID {
+		&self.version
+	}
+
+	fn merge(&mut self, other: &Self) {
+		if other.deleted {
+			self.deleted = true;
+		}
+	}
+}
+
+pub struct BlockRefTable {
+	pub background: Arc<BackgroundRunner>,
+	pub block_manager: Arc<BlockManager>,
+}
+
+#[async_trait]
+impl TableSchema for BlockRefTable {
+	type P = Hash;
+	type S = UUID;
+	type E = BlockRef;
+	type Filter = ();
+
+	async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
+		let block = &old.as_ref().or(new.as_ref()).unwrap().block;
+		let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
+		let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
+		if is_after && !was_before {
+			self.block_manager.block_incref(block)?;
+		}
+		if was_before && !is_after {
+			self.block_manager.block_decref(block)?;
+		}
+		Ok(())
+	}
+
+	fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+		!entry.deleted
+	}
+}
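For reference, block_dir()/block_path() in block.rs above shard data files into two levels of directories named after the first two bytes of the hash. A standalone sketch of the resulting layout, using the hex crate as pinned above (the /var/lib/garage/data path is only an example):

use std::path::{Path, PathBuf};

fn block_path(data_dir: &Path, hash: &[u8; 32]) -> PathBuf {
	let mut path = data_dir.to_path_buf();
	path.push(hex::encode(&hash[0..1])); // first fan-out level, e.g. "a4"
	path.push(hex::encode(&hash[1..2])); // second fan-out level, e.g. "7f"
	path.push(hex::encode(&hash[..])); // 64-character hex file name
	path
}

fn main() {
	let mut hash = [0u8; 32];
	hash[0] = 0xa4;
	hash[1] = 0x7f;
	let p = block_path(Path::new("/var/lib/garage/data"), &hash);
	// Prints /var/lib/garage/data/a4/7f/a47f0000...00
	println!("{}", p.display());
}

Two one-byte levels give 65536 leaf directories, which keeps per-directory entry counts manageable; it also explains why the repair pass above only descends into two-character hex directory names and 64-character file names.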
+use garage_util::error::Error;
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Bucket {
+	// Primary key
+	pub name: String,
+
+	// Timestamp and deletion
+	// Upon version increment, all info is replaced
+	pub timestamp: u64,
+	pub deleted: bool,
+
+	// Authorized keys
+	authorized_keys: Vec<AllowedKey>,
+}
+
+impl Bucket {
+	pub fn new(
+		name: String,
+		timestamp: u64,
+		deleted: bool,
+		authorized_keys: Vec<AllowedKey>,
+	) -> Self {
+		let mut ret = Bucket {
+			name,
+			timestamp,
+			deleted,
+			authorized_keys: vec![],
+		};
+		for key in authorized_keys {
+			ret.add_key(key)
+				.expect("Duplicate AllowedKey in Bucket constructor");
+		}
+		ret
+	}
+	/// Add a key only if it is not already present
+	pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> {
+		match self
+			.authorized_keys
+			.binary_search_by(|k| k.key_id.cmp(&key.key_id))
+		{
+			Err(i) => {
+				self.authorized_keys.insert(i, key);
+				Ok(())
+			}
+			Ok(_) => Err(()),
+		}
+	}
+	pub fn authorized_keys(&self) -> &[AllowedKey] {
+		&self.authorized_keys[..]
+	}
+	pub fn clear_keys(&mut self) {
+		self.authorized_keys.clear();
+	}
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct AllowedKey {
+	pub key_id: String,
+	pub timestamp: u64,
+	pub allow_read: bool,
+	pub allow_write: bool,
+}
+
+impl Entry<EmptyKey, String> for Bucket {
+	fn partition_key(&self) -> &EmptyKey {
+		&EmptyKey
+	}
+	fn sort_key(&self) -> &String {
+		&self.name
+	}
+
+	fn merge(&mut self, other: &Self) {
+		if other.timestamp > self.timestamp {
+			*self = other.clone();
+			return;
+		}
+		if self.timestamp > other.timestamp || self.deleted {
+			return;
+		}
+
+		for ak in other.authorized_keys.iter() {
+			match self
+				.authorized_keys
+				.binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id))
+			{
+				Ok(i) => {
+					let our_ak = &mut self.authorized_keys[i];
+					if ak.timestamp > our_ak.timestamp {
+						*our_ak = ak.clone();
+					}
+				}
+				Err(i) => {
+					self.authorized_keys.insert(i, ak.clone());
+				}
+			}
+		}
+	}
+}
+
+pub struct BucketTable;
+
+#[async_trait]
+impl TableSchema for BucketTable {
+	type P = EmptyKey;
+	type S = String;
+	type E = Bucket;
+	type Filter = ();
+
+	async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
+		Ok(())
+	}
+
+	fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+		!entry.deleted
+	}
+}
diff --git a/src/model/garage.rs b/src/model/garage.rs
new file mode 100644
index 00000000..46e0d02f
--- /dev/null
+++ b/src/model/garage.rs
@@ -0,0 +1,166 @@
+use std::sync::Arc;
+
+use garage_util::background::*;
+use garage_util::config::*;
+
+use garage_rpc::membership::System;
+use garage_rpc::rpc_client::RpcHttpClient;
+use garage_rpc::rpc_server::RpcServer;
+
+use garage_table::table_fullcopy::*;
+use garage_table::table_sharded::*;
+use garage_table::*;
+
+use crate::block::*;
+use crate::block_ref_table::*;
+use crate::bucket_table::*;
+use crate::key_table::*;
+use crate::object_table::*;
+use crate::version_table::*;
+
+pub struct Garage {
+	pub config: Config,
+
+	pub db: sled::Db,
+	pub background: Arc<BackgroundRunner>,
+	pub system: Arc<System>,
+	pub block_manager: Arc<BlockManager>,
+
+	pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
+	pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
+
+	pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
+	pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+	pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+}
+
+impl Garage {
+	pub async fn new(
+		config: Config,
+		db: sled::Db,
+		background: Arc<BackgroundRunner>,
+		rpc_server: &mut RpcServer,
+	) -> Arc<Self> {
+		info!("Initialize membership management system...");
+		let rpc_http_client = Arc::new(
+			RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls)
+				.expect("Could not create RPC client"),
+		);
+		let system = System::new(
+			config.metadata_dir.clone(),
+			rpc_http_client,
+			background.clone(),
+			rpc_server,
+		);
+
+		let data_rep_param = TableShardedReplication {
+			replication_factor: config.data_replication_factor,
+			write_quorum: (config.data_replication_factor + 1) / 2,
+			read_quorum: 1,
+		};
+
+		let meta_rep_param = TableShardedReplication {
+			replication_factor: config.meta_replication_factor,
+			write_quorum: (config.meta_replication_factor + 1) / 2,
+			read_quorum: (config.meta_replication_factor + 1) / 2,
+		};
+
+		let control_rep_param = TableFullReplication::new(
+			config.meta_epidemic_fanout,
+			(config.meta_epidemic_fanout + 1) / 2,
+		);
+
+		info!("Initialize block manager...");
+		let block_manager = BlockManager::new(
+			&db,
+			config.data_dir.clone(),
+			data_rep_param.clone(),
+			system.clone(),
+			rpc_server,
+		);
+
+		info!("Initialize block_ref_table...");
+		let block_ref_table = Table::new(
+			BlockRefTable {
+				background: background.clone(),
+				block_manager: block_manager.clone(),
+			},
+			data_rep_param.clone(),
+			system.clone(),
+			&db,
+			"block_ref".to_string(),
+			rpc_server,
+		)
+		.await;
+
+		info!("Initialize version_table...");
+		let version_table = Table::new(
+			VersionTable {
+				background: background.clone(),
+				block_ref_table: block_ref_table.clone(),
+			},
+			meta_rep_param.clone(),
+			system.clone(),
+			&db,
+			"version".to_string(),
+			rpc_server,
+		)
+		.await;
+
+		info!("Initialize object_table...");
+		let object_table = Table::new(
+			ObjectTable {
+				background: background.clone(),
+				version_table: version_table.clone(),
+			},
+			meta_rep_param.clone(),
+			system.clone(),
+			&db,
+			"object".to_string(),
+			rpc_server,
+		)
+		.await;
+
+		info!("Initialize bucket_table...");
+		let bucket_table = Table::new(
+			BucketTable,
+			control_rep_param.clone(),
+			system.clone(),
+			&db,
+			"bucket".to_string(),
+			rpc_server,
+		)
+		.await;
+
+		info!("Initialize key_table...");
+		let key_table = Table::new(
+			KeyTable,
+			control_rep_param.clone(),
+			system.clone(),
+			&db,
+			"key".to_string(),
+			rpc_server,
+		)
+		.await;
+
+		info!("Initialize Garage...");
+		let garage = Arc::new(Self {
+			config,
+			db,
+			system: system.clone(),
+			block_manager,
+			background,
+			bucket_table,
+			key_table,
+			object_table,
+			version_table,
+			block_ref_table,
+		});
+
+		info!("Start block manager background thread...");
+		garage.block_manager.garage.swap(Some(garage.clone()));
+		garage.block_manager.clone().spawn_background_worker().await;
+
+		garage
+	}
+}
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
new file mode 100644
index 00000000..76d163b5
--- /dev/null
+++ b/src/model/key_table.rs
@@ -0,0 +1,154 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+
+use garage_table::*;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Key {
+	// Primary key
+	pub key_id: String,
+
+	// Associated secret key (immutable)
+	pub secret_key: String,
+
+	// Name
+	pub name: String,
+	pub name_timestamp: u64,
+
+	// Deletion
+	pub deleted: bool,
+
+	// Authorized buckets
+	authorized_buckets: Vec<AllowedBucket>,
+}
+
+impl Key {
+	pub fn new(name: String, buckets: Vec<AllowedBucket>) -> Self {
+		let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
+		let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
+		let mut ret = Self {
+			key_id,
+			secret_key,
+			name,
+			name_timestamp: now_msec(),
+			deleted: false,
+			authorized_buckets: vec![],
+		};
+		for b in buckets {
+			ret.add_bucket(b)
+				.expect("Duplicate AllowedBucket in Key constructor");
+		}
+		ret
+	}
+	pub fn delete(key_id: String) -> Self {
+		Self {
+			key_id,
+			secret_key: "".into(),
+			name: "".into(),
+			name_timestamp: now_msec(),
+			deleted: true,
+			authorized_buckets: vec![],
+		}
+	}
+	/// Add an authorized bucket, only if it wasn't there before
+	pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> {
+		match self
+			.authorized_buckets
+			.binary_search_by(|b| b.bucket.cmp(&new.bucket))
+		{
+			Err(i) => {
+				self.authorized_buckets.insert(i, new);
+				Ok(())
+			}
+			Ok(_) => Err(()),
+		}
+	}
+	pub fn authorized_buckets(&self) -> &[AllowedBucket] {
+		&self.authorized_buckets[..]
+	}
+	pub fn clear_buckets(&mut self) {
+		self.authorized_buckets.clear();
+	}
+	pub fn allow_read(&self, bucket: &str) -> bool {
+		self.authorized_buckets
+			.iter()
+			.find(|x| x.bucket.as_str() == bucket)
+			.map(|x| x.allow_read)
+			.unwrap_or(false)
+	}
+	pub fn allow_write(&self, bucket: &str) -> bool {
+		self.authorized_buckets
+			.iter()
+			.find(|x| x.bucket.as_str() == bucket)
+			.map(|x| x.allow_write)
+			.unwrap_or(false)
+	}
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct AllowedBucket {
+	pub bucket: String,
+	pub timestamp: u64,
+	pub allow_read: bool,
+	pub allow_write: bool,
+}
+
+impl Entry<EmptyKey, String> for Key {
+	fn partition_key(&self) -> &EmptyKey {
+		&EmptyKey
+	}
+	fn sort_key(&self) -> &String {
+		&self.key_id
+	}
+
+	fn merge(&mut self, other: &Self) {
+		if other.deleted {
+			self.deleted = true;
+		}
+		if self.deleted {
+			self.authorized_buckets.clear();
+			return;
+		}
+		if other.name_timestamp > self.name_timestamp {
+			self.name_timestamp = other.name_timestamp;
+			self.name = other.name.clone();
+		}
+
+		for ab in other.authorized_buckets.iter() {
+			match self
+				.authorized_buckets
+				.binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket))
+			{
+				Ok(i) => {
+					let our_ab = &mut self.authorized_buckets[i];
+					if ab.timestamp > our_ab.timestamp {
+						*our_ab = ab.clone();
+					}
+				}
+				Err(i) => {
+					self.authorized_buckets.insert(i, ab.clone());
+				}
+			}
+		}
+	}
+}
+
+pub struct KeyTable;
+
+#[async_trait]
+impl TableSchema for KeyTable {
+	type P = EmptyKey;
+	type S = String;
+	type E = Key;
+	type Filter = ();
+
+	async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
+		Ok(())
+	}
+
+	fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+		!entry.deleted
+	}
+}
diff --git a/src/model/lib.rs b/src/model/lib.rs
new file mode 100644
index 00000000..b4a8ddb7
--- /dev/null
+++ b/src/model/lib.rs
@@ -0,0 +1,10 @@
+#[macro_use]
+extern crate log;
+
+pub mod block;
+pub mod block_ref_table;
+pub mod bucket_table;
+pub mod garage;
+pub mod key_table;
+pub mod object_table;
+pub mod version_table;
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
new file mode 100644
index 00000000..01df70e6
--- /dev/null
+++ b/src/model/object_table.rs
@@ -0,0 +1,198 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+
+use garage_util::background::BackgroundRunner;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_table::table_sharded::*;
+use garage_table::*;
+
+use crate::version_table::*;
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Object {
+	// Primary key
+	pub bucket: String,
+
+	// Sort key
+	pub key: String,
+
+	// Data
+	versions: Vec<ObjectVersion>,
+}
+
+impl Object {
+	pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
+		let mut ret = Self {
+			bucket,
+			key,
+			versions: vec![],
+		};
+		for v in versions {
+			ret.add_version(v)
+				.expect("Twice the same ObjectVersion in Object constructor");
+		}
+		ret
+	}
+	/// Adds a version if it wasn't already present
+	pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
+		match self
+			.versions
+			.binary_search_by(|v| v.cmp_key().cmp(&new.cmp_key()))
+		{
+			Err(i) => {
+				self.versions.insert(i, new);
+				Ok(())
+			}
+			Ok(_) => Err(()),
+		}
+	}
+	pub fn versions(&self) -> &[ObjectVersion] {
+		&self.versions[..]
+	}
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct ObjectVersion {
+	pub uuid: UUID,
+	pub timestamp: u64,
+
+	pub mime_type: String,
+	pub size: u64,
+	pub state: ObjectVersionState,
+
+	pub data: ObjectVersionData,
+}
+
+#[derive(PartialEq, Clone, Copy, Debug, Serialize, Deserialize)]
+pub enum ObjectVersionState {
+	Uploading,
+	Complete,
+	Aborted,
+}
+
+impl ObjectVersionState {
+	fn max(self, other: Self) -> Self {
+		use ObjectVersionState::*;
+		if self == Aborted || other == Aborted {
+			Aborted
+		} else if self == Complete || other == Complete {
+			Complete
+		} else {
+			Uploading
+		}
+	}
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub enum ObjectVersionData {
+	Uploading,
+	DeleteMarker,
+	Inline(#[serde(with = "serde_bytes")] Vec<u8>),
+	FirstBlock(Hash),
+}
+
+impl ObjectVersion {
+	fn cmp_key(&self) -> (u64, UUID) {
+		(self.timestamp, self.uuid)
+	}
+	pub fn is_complete(&self) -> bool {
+		self.state == ObjectVersionState::Complete
+	}
+	pub fn is_data(&self) -> bool {
+		self.state == ObjectVersionState::Complete && self.data != ObjectVersionData::DeleteMarker
+	}
+}
+
+impl Entry<String, String> for Object {
+	fn partition_key(&self) -> &String {
+		&self.bucket
+	}
+	fn sort_key(&self) -> &String {
+		&self.key
+	}
+
+	fn merge(&mut self, other: &Self) {
+		for other_v in other.versions.iter() {
+			match self
+				.versions
+				.binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key()))
+			{
+				Ok(i) => {
+					let mut v = &mut self.versions[i];
+					if other_v.size > v.size {
+						v.size = other_v.size;
+					}
+					v.state = v.state.max(other_v.state);
+					if v.data == ObjectVersionData::Uploading {
+						v.data = other_v.data.clone();
+					}
+				}
+				Err(i) => {
+					self.versions.insert(i, other_v.clone());
+				}
+			}
+		}
+		let last_complete = self
+			.versions
+			.iter()
+			.enumerate()
+			.rev()
+			.filter(|(_, v)| v.is_complete())
+			.next()
+			.map(|(vi, _)| vi);
+
+		if let Some(last_vi) = last_complete {
+			self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
+		}
+	}
+}
+
+pub struct ObjectTable {
+	pub background: Arc<BackgroundRunner>,
+	pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+}
+
+#[async_trait]
+impl TableSchema for ObjectTable {
+	type P = String;
+	type S = String;
+	type E = Object;
+	type Filter = ();
+
+	async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
+		let version_table = self.version_table.clone();
+		if let (Some(old_v), Some(new_v)) = (old, new) {
+			// Propagate deletion of old versions
+			for v in old_v.versions.iter() {
+				let newly_deleted = match new_v
+					.versions
+					.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
+				{
+					Err(_) => true,
+					Ok(i) => {
+						new_v.versions[i].state == ObjectVersionState::Aborted
+							&& v.state != ObjectVersionState::Aborted
+					}
+				};
+				if newly_deleted {
+					let deleted_version = Version::new(
+						v.uuid,
+						old_v.bucket.clone(),
+						old_v.key.clone(),
+						true,
+						vec![],
+					);
+					version_table.insert(&deleted_version).await?;
+				}
+			}
+		}
+		Ok(())
+	}
+
+	fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+		entry.versions.iter().any(|v| v.is_data())
+	}
+}
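Note on the object_table.rs logic above: Object::merge is a CRDT-style union of the two version lists, after which every version strictly older than the most recent complete one is pruned (the drain(last_vi..) keeps the suffix starting at that version). A simplified standalone sketch of just the pruning rule, using a plain u64 timestamp in place of the (timestamp, UUID) compound key; the names V and prune are illustrative and not part of the patch:

	// Illustrative model of the version-pruning rule in Object::merge:
	// after merging, keep only the last complete version and anything newer.
	#[derive(Clone, Debug, PartialEq)]
	struct V {
		timestamp: u64, // stands in for ObjectVersion::cmp_key()
		complete: bool, // stands in for state == ObjectVersionState::Complete
	}

	fn prune(mut versions: Vec<V>) -> Vec<V> {
		// versions are assumed sorted by timestamp, as in Object's sorted vec
		let last_complete = versions
			.iter()
			.enumerate()
			.rev()
			.find(|(_, v)| v.complete)
			.map(|(i, _)| i);
		if let Some(i) = last_complete {
			versions.drain(..i); // same effect as keeping drain(i..)
		}
		versions
	}

	fn main() {
		let vs = vec![
			V { timestamp: 1, complete: true },
			V { timestamp: 2, complete: false },
			V { timestamp: 3, complete: true },
			V { timestamp: 4, complete: false },
		];
		let kept = prune(vs);
		// The complete version at t=3 survives, as does the still-uploading
		// version at t=4; everything before the last complete version is dropped.
		assert_eq!(kept.iter().map(|v| v.timestamp).collect::<Vec<_>>(), vec![3, 4]);
		println!("{:?}", kept);
	}

This is why an in-progress upload never hides the last fully written version: pruning only discards versions older than a complete one.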
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
new file mode 100644
index 00000000..6054e389
--- /dev/null
+++ b/src/model/version_table.rs
@@ -0,0 +1,145 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+
+use garage_util::background::BackgroundRunner;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_table::table_sharded::*;
+use garage_table::*;
+
+use crate::block_ref_table::*;
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Version {
+	// Primary key
+	pub uuid: UUID,
+
+	// Actual data: the blocks for this version
+	pub deleted: bool,
+	blocks: Vec<VersionBlock>,
+
+	// Back link to bucket+key so that we can figure if
+	// this was deleted later on
+	pub bucket: String,
+	pub key: String,
+}
+
+impl Version {
+	pub fn new(
+		uuid: UUID,
+		bucket: String,
+		key: String,
+		deleted: bool,
+		blocks: Vec<VersionBlock>,
+	) -> Self {
+		let mut ret = Self {
+			uuid,
+			deleted,
+			blocks: vec![],
+			bucket,
+			key,
+		};
+		for b in blocks {
+			ret.add_block(b)
+				.expect("Twice the same VersionBlock in Version constructor");
+		}
+		ret
+	}
+	/// Adds a block if it wasn't already present
+	pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> {
+		match self
+			.blocks
+			.binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key()))
+		{
+			Err(i) => {
+				self.blocks.insert(i, new);
+				Ok(())
+			}
+			Ok(_) => Err(()),
+		}
+	}
+	pub fn blocks(&self) -> &[VersionBlock] {
+		&self.blocks[..]
+	}
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct VersionBlock {
+	pub part_number: u64,
+	pub offset: u64,
+	pub hash: Hash,
+	pub size: u64,
+}
+
+impl VersionBlock {
+	fn cmp_key(&self) -> (u64, u64) {
+		(self.part_number, self.offset)
+	}
+}
+
+impl Entry<Hash, EmptyKey> for Version {
+	fn partition_key(&self) -> &Hash {
+		&self.uuid
+	}
+	fn sort_key(&self) -> &EmptyKey {
+		&EmptyKey
+	}
+
+	fn merge(&mut self, other: &Self) {
+		if other.deleted {
+			self.deleted = true;
+			self.blocks.clear();
+		} else if !self.deleted {
+			for bi in other.blocks.iter() {
+				match self
+					.blocks
+					.binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key()))
+				{
+					Ok(_) => (),
+					Err(pos) => {
+						self.blocks.insert(pos, bi.clone());
+					}
+				}
+			}
+		}
+	}
+}
+
+pub struct VersionTable {
+	pub background: Arc<BackgroundRunner>,
+	pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+}
+
+#[async_trait]
+impl TableSchema for VersionTable {
+	type P = Hash;
+	type S = EmptyKey;
+	type E = Version;
+	type Filter = ();
+
+	async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
+		let block_ref_table = self.block_ref_table.clone();
+		if let (Some(old_v), Some(new_v)) = (old, new) {
+			// Propagate deletion of version blocks
+			if new_v.deleted && !old_v.deleted {
+				let deleted_block_refs = old_v
+					.blocks
+					.iter()
+					.map(|vb| BlockRef {
+						block: vb.hash,
+						version: old_v.uuid,
+						deleted: true,
+					})
+					.collect::<Vec<_>>();
+				block_ref_table.insert_many(&deleted_block_refs[..]).await?;
+			}
+		}
+		Ok(())
+	}
+
+	fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+		!entry.deleted
+	}
+}
-- 
cgit v1.2.3
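For context on the reference-counting scheme in block.rs above: rc_merge is used as a merge operator on the block RC tree. Stored values are big-endian u64 counters; each merged operand is a single byte, 0 meaning decrement and 1 meaning increment, and returning None removes the key, which is what later lets resync treat the block as unneeded. A minimal standalone sketch of those semantics; the test harness below is illustrative and not part of the patch:

	// Illustrative sketch of the refcount merge semantics of rc_merge above.
	// Stored values are big-endian u64 counters; each operand is one byte:
	// 0 = decrement, 1 = increment. Returning None deletes the key.
	fn u64_from_bytes(bytes: &[u8]) -> u64 {
		assert!(bytes.len() == 8);
		let mut x8 = [0u8; 8];
		x8.copy_from_slice(bytes);
		u64::from_be_bytes(x8)
	}

	fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
		let old = old.map(u64_from_bytes).unwrap_or(0);
		assert!(new.len() == 1);
		let new = match new[0] {
			0 => old.saturating_sub(1), // decrement, floored at zero
			1 => old + 1,               // increment
			_ => unreachable!(),
		};
		if new == 0 {
			None
		} else {
			Some(u64::to_be_bytes(new).to_vec())
		}
	}

	fn main() {
		// Two increments followed by one decrement leave a count of 1.
		let v1 = rc_merge(b"k", None, &[1]).unwrap();
		let v2 = rc_merge(b"k", Some(&v1[..]), &[1]).unwrap();
		let v3 = rc_merge(b"k", Some(&v2[..]), &[0]).unwrap();
		assert_eq!(u64_from_bytes(&v3), 1);
		// The final decrement returns None: the key is removed entirely.
		assert!(rc_merge(b"k", Some(&v3[..]), &[0]).is_none());
	}

Running this confirms that interleaved increments and decrements never drive the counter below zero, and that the key disappears exactly when the last reference is dropped.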