From d8f5e643bcee95969b59c309809710a38b0661e3 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Fri, 24 Apr 2020 10:10:01 +0000
Subject: Split code for modular compilation

---
 src/admin_rpc.rs             | 358 ----------------------------
 src/api/Cargo.toml           |  28 +++
 src/api/api_server.rs        |  18 +-
 src/api/http_util.rs         |   2 +-
 src/api/lib.rs               |   5 +
 src/api/mod.rs               |   2 -
 src/background.rs            | 124 ----------
 src/config.rs                |  66 ------
 src/core/Cargo.toml          |  34 +++
 src/core/block.rs            | 506 ++++++++++++++++++++++++++++++++++++++++
 src/core/block_ref_table.rs  |  68 ++++++
 src/core/bucket_table.rs     | 121 ++++++++++
 src/core/garage.rs           | 166 +++++++++++++
 src/core/key_table.rs        | 154 ++++++++++++
 src/core/lib.rs              |  10 +
 src/core/object_table.rs     | 165 +++++++++++++
 src/core/version_table.rs    | 131 +++++++++++
 src/data.rs                  | 124 ----------
 src/error.rs                 |  95 --------
 src/garage/Cargo.toml        |  36 +++
 src/garage/admin_rpc.rs      | 358 ++++++++++++++++++++++++++++
 src/garage/main.rs           | 531 ++++++++++++++++++++++++++++++++++++++++++
 src/garage/repair.rs         | 183 +++++++++++++++
 src/garage/server.rs         |  87 +++++++
 src/main.rs                  | 540 -------------------------------------------
 src/rpc/Cargo.toml           |  37 +++
 src/rpc/lib.rs               |   7 +
 src/rpc/membership.rs        |  10 +-
 src/rpc/mod.rs               |   4 -
 src/rpc/rpc_client.rs        |  34 +--
 src/rpc/rpc_server.rs        |   8 +-
 src/rpc/tls_util.rs          |   2 +-
 src/server.rs                | 247 --------------------
 src/store/block.rs           | 506 ----------------------------------------
 src/store/block_ref_table.rs |  68 ------
 src/store/bucket_table.rs    | 121 ----------
 src/store/key_table.rs       | 154 ------------
 src/store/mod.rs             |   7 -
 src/store/object_table.rs    | 165 -------------
 src/store/repair.rs          | 184 ---------------
 src/store/version_table.rs   | 131 -----------
 src/table/Cargo.toml         |  32 +++
 src/table/lib.rs             |  11 +
 src/table/mod.rs             |   6 -
 src/table/table.rs           |  20 +-
 src/table/table_fullcopy.rs  |   7 +-
 src/table/table_sharded.rs   |   7 +-
 src/table/table_sync.rs      |   9 +-
 src/util/Cargo.toml          |  35 +++
 src/util/background.rs       | 124 ++++++++++
 src/util/config.rs           |  66 ++++++
 src/util/data.rs             | 124 ++++++++++
 src/util/error.rs            | 112 +++++++++
 src/util/lib.rs              |   7 +
 54 files changed, 3188 insertions(+), 2969 deletions(-)
 delete mode 100644 src/admin_rpc.rs
 create mode 100644 src/api/Cargo.toml
 create mode 100644 src/api/lib.rs
 delete mode 100644 src/api/mod.rs
 delete mode 100644 src/background.rs
 delete mode 100644 src/config.rs
 create mode 100644 src/core/Cargo.toml
 create mode 100644 src/core/block.rs
 create mode 100644 src/core/block_ref_table.rs
 create mode 100644 src/core/bucket_table.rs
 create mode 100644 src/core/garage.rs
 create mode 100644 src/core/key_table.rs
 create mode 100644 src/core/lib.rs
 create mode 100644 src/core/object_table.rs
 create mode 100644 src/core/version_table.rs
 delete mode 100644 src/data.rs
 delete mode 100644 src/error.rs
 create mode 100644 src/garage/Cargo.toml
 create mode 100644 src/garage/admin_rpc.rs
 create mode 100644 src/garage/main.rs
 create mode 100644 src/garage/repair.rs
 create mode 100644 src/garage/server.rs
 delete mode 100644 src/main.rs
 create mode 100644 src/rpc/Cargo.toml
 create mode 100644 src/rpc/lib.rs
 delete mode 100644 src/rpc/mod.rs
 delete mode 100644 src/server.rs
 delete mode 100644 src/store/block.rs
 delete mode 100644 src/store/block_ref_table.rs
 delete mode 100644 src/store/bucket_table.rs
 delete mode 100644 src/store/key_table.rs
 delete mode 100644 src/store/mod.rs
 delete mode 100644 src/store/object_table.rs
 delete mode 100644 src/store/repair.rs
 delete mode 100644 src/store/version_table.rs
 create mode 100644
src/table/Cargo.toml create mode 100644 src/table/lib.rs delete mode 100644 src/table/mod.rs create mode 100644 src/util/Cargo.toml create mode 100644 src/util/background.rs create mode 100644 src/util/config.rs create mode 100644 src/util/data.rs create mode 100644 src/util/error.rs create mode 100644 src/util/lib.rs (limited to 'src') diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs deleted file mode 100644 index 4e5c56df..00000000 --- a/src/admin_rpc.rs +++ /dev/null @@ -1,358 +0,0 @@ -use std::sync::Arc; - -use serde::{Deserialize, Serialize}; - -use crate::data::*; -use crate::error::Error; -use crate::server::Garage; - -use crate::table::*; - -use crate::rpc::rpc_client::*; -use crate::rpc::rpc_server::*; - -use crate::store::bucket_table::*; -use crate::store::key_table::*; -use crate::store::repair::Repair; - -use crate::*; - -pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); -pub const ADMIN_RPC_PATH: &str = "_admin"; - -#[derive(Debug, Serialize, Deserialize)] -pub enum AdminRPC { - BucketOperation(BucketOperation), - KeyOperation(KeyOperation), - LaunchRepair(RepairOpt), - - // Replies - Ok(String), - BucketList(Vec), - BucketInfo(Bucket), - KeyList(Vec<(String, String)>), - KeyInfo(Key), -} - -impl RpcMessage for AdminRPC {} - -pub struct AdminRpcHandler { - garage: Arc, - rpc_client: Arc>, -} - -impl AdminRpcHandler { - pub fn new(garage: Arc) -> Arc { - let rpc_client = garage.system.clone().rpc_client::(ADMIN_RPC_PATH); - Arc::new(Self { garage, rpc_client }) - } - - pub fn register_handler(self: Arc, rpc_server: &mut RpcServer) { - rpc_server.add_handler::(ADMIN_RPC_PATH.to_string(), move |msg, _addr| { - let self2 = self.clone(); - async move { - match msg { - AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, - AdminRPC::KeyOperation(ko) => self2.handle_key_cmd(ko).await, - AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, - _ => Err(Error::BadRequest(format!("Invalid RPC"))), - } - } - }); - } - - async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result { - match cmd { - BucketOperation::List => { - let bucket_names = self - .garage - .bucket_table - .get_range(&EmptyKey, None, Some(()), 10000) - .await? 
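// (Reading the call sites in this patch, get_range takes a partition key, an
// optional first sort key, a filter, and a result limit; the hard-coded
// 10000 is what caps how many buckets a single listing can return.)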
- .iter() - .map(|b| b.name.to_string()) - .collect::>(); - Ok(AdminRPC::BucketList(bucket_names)) - } - BucketOperation::Info(query) => { - let bucket = self.get_existing_bucket(&query.name).await?; - Ok(AdminRPC::BucketInfo(bucket)) - } - BucketOperation::Create(query) => { - let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; - if bucket.as_ref().filter(|b| !b.deleted).is_some() { - return Err(Error::BadRequest(format!( - "Bucket {} already exists", - query.name - ))); - } - let new_time = match bucket { - Some(b) => std::cmp::max(b.timestamp + 1, now_msec()), - None => now_msec(), - }; - self.garage - .bucket_table - .insert(&Bucket::new(query.name.clone(), new_time, false, vec![])) - .await?; - Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) - } - BucketOperation::Delete(query) => { - let bucket = self.get_existing_bucket(&query.name).await?; - let objects = self - .garage - .object_table - .get_range(&query.name, None, Some(()), 10) - .await?; - if !objects.is_empty() { - return Err(Error::BadRequest(format!( - "Bucket {} is not empty", - query.name - ))); - } - if !query.yes { - return Err(Error::BadRequest(format!( - "Add --yes flag to really perform this operation" - ))); - } - // --- done checking, now commit --- - for ak in bucket.authorized_keys() { - if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { - if !key.deleted { - self.update_key_bucket(key, &bucket.name, false, false) - .await?; - } - } else { - return Err(Error::Message(format!("Key not found: {}", ak.key_id))); - } - } - self.garage - .bucket_table - .insert(&Bucket::new( - query.name.clone(), - std::cmp::max(bucket.timestamp + 1, now_msec()), - true, - vec![], - )) - .await?; - Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) - } - BucketOperation::Allow(query) => { - let key = self.get_existing_key(&query.key_id).await?; - let bucket = self.get_existing_bucket(&query.bucket).await?; - let allow_read = query.read || key.allow_read(&query.bucket); - let allow_write = query.write || key.allow_write(&query.bucket); - self.update_key_bucket(key, &query.bucket, allow_read, allow_write) - .await?; - self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) - .await?; - Ok(AdminRPC::Ok(format!( - "New permissions for {} on {}: read {}, write {}.", - &query.key_id, &query.bucket, allow_read, allow_write - ))) - } - BucketOperation::Deny(query) => { - let key = self.get_existing_key(&query.key_id).await?; - let bucket = self.get_existing_bucket(&query.bucket).await?; - let allow_read = !query.read && key.allow_read(&query.bucket); - let allow_write = !query.write && key.allow_write(&query.bucket); - self.update_key_bucket(key, &query.bucket, allow_read, allow_write) - .await?; - self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) - .await?; - Ok(AdminRPC::Ok(format!( - "New permissions for {} on {}: read {}, write {}.", - &query.key_id, &query.bucket, allow_read, allow_write - ))) - } - } - } - - async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result { - match cmd { - KeyOperation::List => { - let key_ids = self - .garage - .key_table - .get_range(&EmptyKey, None, Some(()), 10000) - .await? 
- .iter() - .map(|k| (k.key_id.to_string(), k.name.to_string())) - .collect::>(); - Ok(AdminRPC::KeyList(key_ids)) - } - KeyOperation::Info(query) => { - let key = self.get_existing_key(&query.key_id).await?; - Ok(AdminRPC::KeyInfo(key)) - } - KeyOperation::New(query) => { - let key = Key::new(query.name, vec![]); - self.garage.key_table.insert(&key).await?; - Ok(AdminRPC::KeyInfo(key)) - } - KeyOperation::Rename(query) => { - let mut key = self.get_existing_key(&query.key_id).await?; - key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec()); - key.name = query.new_name; - self.garage.key_table.insert(&key).await?; - Ok(AdminRPC::KeyInfo(key)) - } - KeyOperation::Delete(query) => { - let key = self.get_existing_key(&query.key_id).await?; - if !query.yes { - return Err(Error::BadRequest(format!( - "Add --yes flag to really perform this operation" - ))); - } - // --- done checking, now commit --- - for ab in key.authorized_buckets().iter() { - if let Some(bucket) = - self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await? - { - if !bucket.deleted { - self.update_bucket_key(bucket, &key.key_id, false, false) - .await?; - } - } else { - return Err(Error::Message(format!("Bucket not found: {}", ab.bucket))); - } - } - let del_key = Key::delete(key.key_id); - self.garage.key_table.insert(&del_key).await?; - Ok(AdminRPC::Ok(format!( - "Key {} was deleted successfully.", - query.key_id - ))) - } - } - } - - async fn get_existing_bucket(&self, bucket: &String) -> Result { - self.garage - .bucket_table - .get(&EmptyKey, bucket) - .await? - .filter(|b| !b.deleted) - .map(Ok) - .unwrap_or(Err(Error::BadRequest(format!( - "Bucket {} does not exist", - bucket - )))) - } - - async fn get_existing_key(&self, id: &String) -> Result { - self.garage - .key_table - .get(&EmptyKey, id) - .await? - .filter(|k| !k.deleted) - .map(Ok) - .unwrap_or(Err(Error::BadRequest(format!("Key {} does not exist", id)))) - } - - async fn update_bucket_key( - &self, - mut bucket: Bucket, - key_id: &String, - allow_read: bool, - allow_write: bool, - ) -> Result<(), Error> { - let timestamp = match bucket - .authorized_keys() - .iter() - .find(|x| x.key_id == *key_id) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - bucket.clear_keys(); - bucket - .add_key(AllowedKey { - key_id: key_id.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); - self.garage.bucket_table.insert(&bucket).await?; - Ok(()) - } - - async fn update_key_bucket( - &self, - mut key: Key, - bucket: &String, - allow_read: bool, - allow_write: bool, - ) -> Result<(), Error> { - let timestamp = match key - .authorized_buckets() - .iter() - .find(|x| x.bucket == *bucket) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - key.clear_buckets(); - key.add_bucket(AllowedBucket { - bucket: bucket.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); - self.garage.key_table.insert(&key).await?; - Ok(()) - } - - async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { - if !opt.yes { - return Err(Error::BadRequest(format!( - "Please provide the --yes flag to initiate repair operations." 
- ))); - } - if opt.all_nodes { - let mut opt_to_send = opt.clone(); - opt_to_send.all_nodes = false; - - let mut failures = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.config.members.keys() { - if self - .rpc_client - .call( - *node, - AdminRPC::LaunchRepair(opt_to_send.clone()), - ADMIN_RPC_TIMEOUT, - ) - .await - .is_err() - { - failures.push(node.clone()); - } - } - if failures.is_empty() { - Ok(AdminRPC::Ok(format!("Repair launched on all nodes"))) - } else { - Err(Error::Message(format!( - "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", - failures - ))) - } - } else { - let repair = Repair { - garage: self.garage.clone(), - }; - self.garage - .system - .background - .spawn_worker("Repair worker".into(), move |must_exit| async move { - repair.repair_worker(opt, must_exit).await - }) - .await; - Ok(AdminRPC::Ok(format!( - "Repair launched on {:?}", - self.garage.system.id - ))) - } - } -} diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml new file mode 100644 index 00000000..3606c1df --- /dev/null +++ b/src/api/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "garage_api" +version = "0.1.0" +authors = ["Alex Auvolat "] +edition = "2018" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_util = { path = "../util" } +garage_table = { path = "../table" } +garage_core = { path = "../core" } + +bytes = "0.4" +hex = "0.3" +log = "0.4" + +futures = "0.3" +futures-util = "0.3" +tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } + +http = "0.2" +hyper = "0.13" + + diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 905ba0dd..de79ffc2 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -9,18 +9,18 @@ use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; -use crate::data::*; -use crate::error::Error; -use crate::server::Garage; +use garage_util::data::*; +use garage_util::error::Error; -use crate::table::EmptyKey; +use garage_table::EmptyKey; -use crate::store::block::INLINE_THRESHOLD; -use crate::store::block_ref_table::*; -use crate::store::object_table::*; -use crate::store::version_table::*; +use garage_core::block::INLINE_THRESHOLD; +use garage_core::block_ref_table::*; +use garage_core::garage::Garage; +use garage_core::object_table::*; +use garage_core::version_table::*; -use crate::api::http_util::*; +use crate::http_util::*; type BodyType = Box + Send + Unpin>; diff --git a/src/api/http_util.rs b/src/api/http_util.rs index 228448f0..e7e74409 100644 --- a/src/api/http_util.rs +++ b/src/api/http_util.rs @@ -5,7 +5,7 @@ use futures::ready; use futures::stream::*; use hyper::body::{Bytes, HttpBody}; -use crate::error::Error; +use garage_util::error::Error; type StreamType = Pin> + Send>>; diff --git a/src/api/lib.rs b/src/api/lib.rs new file mode 100644 index 00000000..b313d45d --- /dev/null +++ b/src/api/lib.rs @@ -0,0 +1,5 @@ +#[macro_use] +extern crate log; + +pub mod api_server; +pub mod http_util; diff --git a/src/api/mod.rs b/src/api/mod.rs deleted file mode 100644 index 8e62d1e7..00000000 --- a/src/api/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod api_server; -pub mod http_util; diff --git a/src/background.rs b/src/background.rs deleted file mode 100644 index 937062dd..00000000 --- 
a/src/background.rs +++ /dev/null @@ -1,124 +0,0 @@ -use core::future::Future; -use std::pin::Pin; - -use futures::future::join_all; -use futures::select; -use futures_util::future::*; -use std::sync::Arc; -use tokio::sync::Mutex; -use tokio::sync::{mpsc, watch, Notify}; - -use crate::error::Error; - -type JobOutput = Result<(), Error>; -type Job = Pin + Send>>; - -pub struct BackgroundRunner { - n_runners: usize, - pub stop_signal: watch::Receiver, - - queue_in: mpsc::UnboundedSender<(Job, bool)>, - queue_out: Mutex>, - job_notify: Notify, - - workers: Mutex>>, -} - -impl BackgroundRunner { - pub fn new(n_runners: usize, stop_signal: watch::Receiver) -> Arc { - let (queue_in, queue_out) = mpsc::unbounded_channel(); - Arc::new(Self { - n_runners, - stop_signal, - queue_in, - queue_out: Mutex::new(queue_out), - job_notify: Notify::new(), - workers: Mutex::new(Vec::new()), - }) - } - - pub async fn run(self: Arc) { - let mut workers = self.workers.lock().await; - for i in 0..self.n_runners { - workers.push(tokio::spawn(self.clone().runner(i))); - } - drop(workers); - - let mut stop_signal = self.stop_signal.clone(); - while let Some(exit_now) = stop_signal.recv().await { - if exit_now { - let mut workers = self.workers.lock().await; - let workers_vec = workers.drain(..).collect::>(); - join_all(workers_vec).await; - return; - } - } - } - - pub fn spawn(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - let _: Result<_, _> = self.queue_in.clone().send((boxed, false)); - self.job_notify.notify(); - } - - pub fn spawn_cancellable(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - let _: Result<_, _> = self.queue_in.clone().send((boxed, true)); - self.job_notify.notify(); - } - - pub async fn spawn_worker(&self, name: String, worker: F) - where - F: FnOnce(watch::Receiver) -> T + Send + 'static, - T: Future + Send + 'static, - { - let mut workers = self.workers.lock().await; - let stop_signal = self.stop_signal.clone(); - workers.push(tokio::spawn(async move { - if let Err(e) = worker(stop_signal).await { - error!("Worker stopped with error: {}, error: {}", name, e); - } else { - info!("Worker exited successfully: {}", name); - } - })); - } - - async fn runner(self: Arc, i: usize) { - let mut stop_signal = self.stop_signal.clone(); - loop { - let must_exit: bool = *stop_signal.borrow(); - if let Some(job) = self.dequeue_job(must_exit).await { - if let Err(e) = job.await { - error!("Job failed: {}", e) - } - } else { - if must_exit { - info!("Background runner {} exiting", i); - return; - } - select! 
{ - _ = self.job_notify.notified().fuse() => (), - _ = stop_signal.recv().fuse() => (), - } - } - } - } - - async fn dequeue_job(&self, must_exit: bool) -> Option { - let mut queue = self.queue_out.lock().await; - while let Ok((job, cancellable)) = queue.try_recv() { - if cancellable && must_exit { - continue; - } else { - return Some(job); - } - } - None - } -} diff --git a/src/config.rs b/src/config.rs deleted file mode 100644 index 7a6ae3f2..00000000 --- a/src/config.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::io::Read; -use std::net::SocketAddr; -use std::path::PathBuf; - -use serde::Deserialize; - -use crate::error::Error; - -#[derive(Deserialize, Debug, Clone)] -pub struct Config { - pub metadata_dir: PathBuf, - pub data_dir: PathBuf, - - pub api_bind_addr: SocketAddr, - pub rpc_bind_addr: SocketAddr, - - pub bootstrap_peers: Vec, - - #[serde(default = "default_max_concurrent_rpc_requests")] - pub max_concurrent_rpc_requests: usize, - - #[serde(default = "default_block_size")] - pub block_size: usize, - - #[serde(default = "default_replication_factor")] - pub meta_replication_factor: usize, - - #[serde(default = "default_epidemic_factor")] - pub meta_epidemic_factor: usize, - - #[serde(default = "default_replication_factor")] - pub data_replication_factor: usize, - - pub rpc_tls: Option, -} - -fn default_max_concurrent_rpc_requests() -> usize { - 12 -} -fn default_block_size() -> usize { - 1048576 -} -fn default_replication_factor() -> usize { - 3 -} -fn default_epidemic_factor() -> usize { - 3 -} - -#[derive(Deserialize, Debug, Clone)] -pub struct TlsConfig { - pub ca_cert: String, - pub node_cert: String, - pub node_key: String, -} - -pub fn read_config(config_file: PathBuf) -> Result { - let mut file = std::fs::OpenOptions::new() - .read(true) - .open(config_file.as_path())?; - - let mut config = String::new(); - file.read_to_string(&mut config)?; - - Ok(toml::from_str(&config)?) 
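// For reference, a minimal sketch of a config file that read_config() above
// would accept. The field names come from the Config struct; the paths and
// addresses are illustrative placeholders, and the commented-out entries
// fall back to the serde defaults defined above:
//
//   metadata_dir = "/var/lib/garage/meta"
//   data_dir = "/var/lib/garage/data"
//   api_bind_addr = "127.0.0.1:3900"
//   rpc_bind_addr = "127.0.0.1:3901"
//   bootstrap_peers = ["10.0.0.2:3901", "10.0.0.3:3901"]
//   # max_concurrent_rpc_requests = 12
//   # block_size = 1048576
//   # meta_replication_factor = 3
//   # meta_epidemic_factor = 3
//   # data_replication_factor = 3
//   # rpc_tls is optional; when set it needs ca_cert, node_cert and node_key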
-} diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml new file mode 100644 index 00000000..1e482d87 --- /dev/null +++ b/src/core/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "garage_core" +version = "0.1.0" +authors = ["Alex Auvolat "] +edition = "2018" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_util = { path = "../util" } +garage_rpc = { path = "../rpc" } +garage_table = { path = "../table" } + +bytes = "0.4" +rand = "0.7" +hex = "0.3" +sha2 = "0.8" +arc-swap = "0.4" +log = "0.4" + +sled = "0.31" + +rmp-serde = "0.14.3" +serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } +serde_bytes = "0.11" + +async-trait = "0.1.30" +futures = "0.3" +futures-util = "0.3" +tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } + diff --git a/src/core/block.rs b/src/core/block.rs new file mode 100644 index 00000000..af8b9efb --- /dev/null +++ b/src/core/block.rs @@ -0,0 +1,506 @@ +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use arc_swap::ArcSwapOption; +use futures::future::*; +use futures::select; +use futures::stream::*; +use serde::{Deserialize, Serialize}; +use tokio::fs; +use tokio::prelude::*; +use tokio::sync::{watch, Mutex, Notify}; + +use garage_util::data; +use garage_util::data::*; +use garage_util::error::Error; + +use garage_rpc::membership::System; +use garage_rpc::rpc_client::*; +use garage_rpc::rpc_server::*; + +use garage_table::table_sharded::TableShardedReplication; +use garage_table::TableReplication; + +use crate::block_ref_table::*; + +use crate::garage::Garage; + +pub const INLINE_THRESHOLD: usize = 3072; + +const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); +const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); +const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); + +#[derive(Debug, Serialize, Deserialize)] +pub enum Message { + Ok, + GetBlock(Hash), + PutBlock(PutBlockMessage), + NeedBlockQuery(Hash), + NeedBlockReply(bool), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PutBlockMessage { + pub hash: Hash, + + #[serde(with = "serde_bytes")] + pub data: Vec, +} + +impl RpcMessage for Message {} + +pub struct BlockManager { + pub replication: TableShardedReplication, + pub data_dir: PathBuf, + pub data_dir_lock: Mutex<()>, + + pub rc: sled::Tree, + + pub resync_queue: sled::Tree, + pub resync_notify: Notify, + + pub system: Arc, + rpc_client: Arc>, + pub garage: ArcSwapOption, +} + +impl BlockManager { + pub fn new( + db: &sled::Db, + data_dir: PathBuf, + replication: TableShardedReplication, + system: Arc, + rpc_server: &mut RpcServer, + ) -> Arc { + let rc = db + .open_tree("block_local_rc") + .expect("Unable to open block_local_rc tree"); + rc.set_merge_operator(rc_merge); + + let resync_queue = db + .open_tree("block_local_resync_queue") + .expect("Unable to open block_local_resync_queue tree"); + + let rpc_path = "block_manager"; + let rpc_client = system.rpc_client::(rpc_path); + + let block_manager = Arc::new(Self { + replication, + data_dir, + data_dir_lock: Mutex::new(()), + rc, + resync_queue, + resync_notify: Notify::new(), + system, + rpc_client, + garage: ArcSwapOption::from(None), + }); + block_manager + .clone() + .register_handler(rpc_server, rpc_path.into()); + block_manager + } + + fn register_handler(self: Arc, rpc_server: &mut RpcServer, 
path: String) { + let self2 = self.clone(); + rpc_server.add_handler::(path, move |msg, _addr| { + let self2 = self2.clone(); + async move { self2.handle(&msg).await } + }); + + let self2 = self.clone(); + self.rpc_client + .set_local_handler(self.system.id, move |msg| { + let self2 = self2.clone(); + async move { self2.handle(&msg).await } + }); + } + + async fn handle(self: Arc, msg: &Message) -> Result { + match msg { + Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + Message::GetBlock(h) => self.read_block(h).await, + Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), + _ => Err(Error::BadRequest(format!("Unexpected RPC message"))), + } + } + + pub async fn spawn_background_worker(self: Arc) { + // Launch 2 simultaneous workers for background resync loop preprocessing + for i in 0..2usize { + let bm2 = self.clone(); + let background = self.system.background.clone(); + tokio::spawn(async move { + tokio::time::delay_for(Duration::from_secs(10)).await; + background + .spawn_worker(format!("block resync worker {}", i), move |must_exit| { + bm2.resync_loop(must_exit) + }) + .await; + }); + } + } + + pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { + let _lock = self.data_dir_lock.lock().await; + + let mut path = self.block_dir(hash); + fs::create_dir_all(&path).await?; + + path.push(hex::encode(hash)); + if fs::metadata(&path).await.is_ok() { + return Ok(Message::Ok); + } + + let mut f = fs::File::create(path).await?; + f.write_all(data).await?; + drop(f); + + Ok(Message::Ok) + } + + pub async fn read_block(&self, hash: &Hash) -> Result { + let path = self.block_path(hash); + + let mut f = match fs::File::open(&path).await { + Ok(f) => f, + Err(e) => { + // Not found but maybe we should have had it ?? + self.put_to_resync(hash, 0)?; + return Err(Into::into(e)); + } + }; + let mut data = vec![]; + f.read_to_end(&mut data).await?; + drop(f); + + if data::hash(&data[..]) != *hash { + let _lock = self.data_dir_lock.lock().await; + warn!("Block {:?} is corrupted. Deleting and resyncing.", hash); + fs::remove_file(path).await?; + self.put_to_resync(&hash, 0)?; + return Err(Error::CorruptData(*hash)); + } + + Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + } + + pub async fn need_block(&self, hash: &Hash) -> Result { + let needed = self + .rc + .get(hash.as_ref())? 
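// (The rc tree stores each reference count as a big-endian u64 maintained by
// the rc_merge operator at the bottom of this file: block_incref merges the
// byte [1] to increment, block_decref merges [0] to decrement, and a count
// that reaches zero removes the key entirely, which is why an absent key
// reads as "not needed" just below.)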
+ .map(|x| u64_from_bytes(x.as_ref()) > 0) + .unwrap_or(false); + if needed { + let path = self.block_path(hash); + let exists = fs::metadata(&path).await.is_ok(); + Ok(!exists) + } else { + Ok(false) + } + } + + fn block_dir(&self, hash: &Hash) -> PathBuf { + let mut path = self.data_dir.clone(); + path.push(hex::encode(&hash.as_slice()[0..1])); + path.push(hex::encode(&hash.as_slice()[1..2])); + path + } + fn block_path(&self, hash: &Hash) -> PathBuf { + let mut path = self.block_dir(hash); + path.push(hex::encode(hash.as_ref())); + path + } + + pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { + let old_rc = self.rc.get(&hash)?; + self.rc.merge(&hash, vec![1])?; + if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { + self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?; + } + Ok(()) + } + + pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { + let new_rc = self.rc.merge(&hash, vec![0])?; + if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { + self.put_to_resync(&hash, 0)?; + } + Ok(()) + } + + fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> { + let when = now_msec() + delay_millis; + trace!("Put resync_queue: {} {:?}", when, hash); + let mut key = u64::to_be_bytes(when).to_vec(); + key.extend(hash.as_ref()); + self.resync_queue.insert(key, hash.as_ref())?; + self.resync_notify.notify(); + Ok(()) + } + + async fn resync_loop( + self: Arc, + mut must_exit: watch::Receiver, + ) -> Result<(), Error> { + let mut n_failures = 0usize; + while !*must_exit.borrow() { + if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { + let time_msec = u64_from_bytes(&time_bytes[0..8]); + let now = now_msec(); + if now >= time_msec { + let mut hash = [0u8; 32]; + hash.copy_from_slice(hash_bytes.as_ref()); + let hash = Hash::from(hash); + + if let Err(e) = self.resync_iter(&hash).await { + warn!("Failed to resync block {:?}, retrying later: {}", hash, e); + self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?; + n_failures += 1; + if n_failures >= 10 { + warn!("Too many resync failures, throttling."); + tokio::time::delay_for(Duration::from_secs(1)).await; + } + } else { + n_failures = 0; + } + } else { + self.resync_queue.insert(time_bytes, hash_bytes)?; + let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now)); + select! { + _ = delay.fuse() => (), + _ = self.resync_notify.notified().fuse() => (), + _ = must_exit.recv().fuse() => (), + } + } + } else { + select! { + _ = self.resync_notify.notified().fuse() => (), + _ = must_exit.recv().fuse() => (), + } + } + } + Ok(()) + } + + async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> { + let path = self.block_path(hash); + + let exists = fs::metadata(&path).await.is_ok(); + let needed = self + .rc + .get(hash.as_ref())? 
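// resync_iter reconciles on-disk state with the reference counts: a block
// that exists but is no longer needed is first offered to peer nodes that
// still want it and only then deleted, while a block that is needed but
// missing is fetched back from the other replicas via rpc_get_block; both
// branches follow below.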
+ .map(|x| u64_from_bytes(x.as_ref()) > 0) + .unwrap_or(false); + + if exists != needed { + info!( + "Resync block {:?}: exists {}, needed {}", + hash, exists, needed + ); + } + + if exists && !needed { + let garage = self.garage.load_full().unwrap(); + let active_refs = garage + .block_ref_table + .get_range(&hash, None, Some(()), 1) + .await?; + let needed_by_others = !active_refs.is_empty(); + if needed_by_others { + let ring = self.system.ring.borrow().clone(); + let who = self.replication.replication_nodes(&hash, &ring); + let msg = Arc::new(Message::NeedBlockQuery(*hash)); + let who_needs_fut = who.iter().map(|to| { + self.rpc_client + .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT) + }); + let who_needs = join_all(who_needs_fut).await; + + let mut need_nodes = vec![]; + for (node, needed) in who.into_iter().zip(who_needs.iter()) { + match needed { + Ok(Message::NeedBlockReply(needed)) => { + if *needed { + need_nodes.push(node); + } + } + Err(e) => { + return Err(Error::Message(format!( + "Should delete block, but unable to confirm that all other nodes that need it have it: {}", + e + ))); + } + Ok(_) => { + return Err(Error::Message(format!( + "Unexpected response to NeedBlockQuery RPC" + ))); + } + } + } + + if need_nodes.len() > 0 { + let put_block_message = self.read_block(hash).await?; + self.rpc_client + .try_call_many( + &need_nodes[..], + put_block_message, + RequestStrategy::with_quorum(need_nodes.len()) + .with_timeout(BLOCK_RW_TIMEOUT), + ) + .await?; + } + } + fs::remove_file(path).await?; + self.resync_queue.remove(&hash)?; + } + + if needed && !exists { + // TODO find a way to not do this if they are sending it to us + // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay + // between the RC being incremented and this part being called. + let block_data = self.rpc_get_block(&hash).await?; + self.write_block(hash, &block_data[..]).await?; + } + + Ok(()) + } + + pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + let who = self.replication.read_nodes(&hash, &self.system); + let resps = self + .rpc_client + .try_call_many( + &who[..], + Message::GetBlock(*hash), + RequestStrategy::with_quorum(1) + .with_timeout(BLOCK_RW_TIMEOUT) + .interrupt_after_quorum(true), + ) + .await?; + + for resp in resps { + if let Message::PutBlock(msg) = resp { + return Ok(msg.data); + } + } + Err(Error::Message(format!( + "Unable to read block {:?}: no valid blocks returned", + hash + ))) + } + + pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { + let who = self.replication.write_nodes(&hash, &self.system); + self.rpc_client + .try_call_many( + &who[..], + Message::PutBlock(PutBlockMessage { hash, data }), + RequestStrategy::with_quorum(self.replication.write_quorum()) + .with_timeout(BLOCK_RW_TIMEOUT), + ) + .await?; + Ok(()) + } + + pub async fn repair_data_store(&self, must_exit: &watch::Receiver) -> Result<(), Error> { + // 1. Repair blocks from RC table + let garage = self.garage.load_full().unwrap(); + let mut last_hash = None; + let mut i = 0usize; + for entry in garage.block_ref_table.store.iter() { + let (_k, v_bytes) = entry?; + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?; + if Some(&block_ref.block) == last_hash.as_ref() { + continue; + } + if !block_ref.deleted { + last_hash = Some(block_ref.block); + self.put_to_resync(&block_ref.block, 0)?; + } + i += 1; + if i & 0xFF == 0 && *must_exit.borrow() { + return Ok(()); + } + } + + // 2. 
Repair blocks actually on disk + let mut ls_data_dir = fs::read_dir(&self.data_dir).await?; + while let Some(data_dir_ent) = ls_data_dir.next().await { + let data_dir_ent = data_dir_ent?; + let dir_name = data_dir_ent.file_name(); + let dir_name = match dir_name.into_string() { + Ok(x) => x, + Err(_) => continue, + }; + if dir_name.len() != 2 || hex::decode(&dir_name).is_err() { + continue; + } + + let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await { + Err(e) => { + warn!( + "Warning: could not list dir {:?}: {}", + data_dir_ent.path().to_str(), + e + ); + continue; + } + Ok(x) => x, + }; + while let Some(file) = ls_data_dir_2.next().await { + let file = file?; + let file_name = file.file_name(); + let file_name = match file_name.into_string() { + Ok(x) => x, + Err(_) => continue, + }; + if file_name.len() != 64 { + continue; + } + let hash_bytes = match hex::decode(&file_name) { + Ok(h) => h, + Err(_) => continue, + }; + let mut hash = [0u8; 32]; + hash.copy_from_slice(&hash_bytes[..]); + self.put_to_resync(&hash.into(), 0)?; + + if *must_exit.borrow() { + return Ok(()); + } + } + } + Ok(()) + } +} + +fn u64_from_bytes(bytes: &[u8]) -> u64 { + assert!(bytes.len() == 8); + let mut x8 = [0u8; 8]; + x8.copy_from_slice(bytes); + u64::from_be_bytes(x8) +} + +fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option> { + let old = old.map(u64_from_bytes).unwrap_or(0); + assert!(new.len() == 1); + let new = match new[0] { + 0 => { + if old > 0 { + old - 1 + } else { + 0 + } + } + 1 => old + 1, + _ => unreachable!(), + }; + if new == 0 { + None + } else { + Some(u64::to_be_bytes(new).to_vec()) + } +} diff --git a/src/core/block_ref_table.rs b/src/core/block_ref_table.rs new file mode 100644 index 00000000..a00438c0 --- /dev/null +++ b/src/core/block_ref_table.rs @@ -0,0 +1,68 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use garage_util::background::*; +use garage_util::data::*; +use garage_util::error::Error; + +use garage_table::*; + +use crate::block::*; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct BlockRef { + // Primary key + pub block: Hash, + + // Sort key + pub version: UUID, + + // Keep track of deleted status + pub deleted: bool, +} + +impl Entry for BlockRef { + fn partition_key(&self) -> &Hash { + &self.block + } + fn sort_key(&self) -> &UUID { + &self.version + } + + fn merge(&mut self, other: &Self) { + if other.deleted { + self.deleted = true; + } + } +} + +pub struct BlockRefTable { + pub background: Arc, + pub block_manager: Arc, +} + +#[async_trait] +impl TableSchema for BlockRefTable { + type P = Hash; + type S = UUID; + type E = BlockRef; + type Filter = (); + + async fn updated(&self, old: Option, new: Option) -> Result<(), Error> { + let block = &old.as_ref().or(new.as_ref()).unwrap().block; + let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false); + let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false); + if is_after && !was_before { + self.block_manager.block_incref(block)?; + } + if was_before && !is_after { + self.block_manager.block_decref(block)?; + } + Ok(()) + } + + fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { + !entry.deleted + } +} diff --git a/src/core/bucket_table.rs b/src/core/bucket_table.rs new file mode 100644 index 00000000..28234d82 --- /dev/null +++ b/src/core/bucket_table.rs @@ -0,0 +1,121 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +use garage_table::*; +use 
garage_util::error::Error; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Bucket { + // Primary key + pub name: String, + + // Timestamp and deletion + // Upon version increment, all info is replaced + pub timestamp: u64, + pub deleted: bool, + + // Authorized keys + authorized_keys: Vec, +} + +impl Bucket { + pub fn new( + name: String, + timestamp: u64, + deleted: bool, + authorized_keys: Vec, + ) -> Self { + let mut ret = Bucket { + name, + timestamp, + deleted, + authorized_keys: vec![], + }; + for key in authorized_keys { + ret.add_key(key) + .expect("Duplicate AllowedKey in Bucket constructor"); + } + ret + } + /// Add a key only if it is not already present + pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> { + match self + .authorized_keys + .binary_search_by(|k| k.key_id.cmp(&key.key_id)) + { + Err(i) => { + self.authorized_keys.insert(i, key); + Ok(()) + } + Ok(_) => Err(()), + } + } + pub fn authorized_keys(&self) -> &[AllowedKey] { + &self.authorized_keys[..] + } + pub fn clear_keys(&mut self) { + self.authorized_keys.clear(); + } +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct AllowedKey { + pub key_id: String, + pub timestamp: u64, + pub allow_read: bool, + pub allow_write: bool, +} + +impl Entry for Bucket { + fn partition_key(&self) -> &EmptyKey { + &EmptyKey + } + fn sort_key(&self) -> &String { + &self.name + } + + fn merge(&mut self, other: &Self) { + if other.timestamp < self.timestamp { + *self = other.clone(); + return; + } + if self.timestamp > other.timestamp || self.deleted { + return; + } + + for ak in other.authorized_keys.iter() { + match self + .authorized_keys + .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id)) + { + Ok(i) => { + let our_ak = &mut self.authorized_keys[i]; + if ak.timestamp > our_ak.timestamp { + *our_ak = ak.clone(); + } + } + Err(i) => { + self.authorized_keys.insert(i, ak.clone()); + } + } + } + } +} + +pub struct BucketTable; + +#[async_trait] +impl TableSchema for BucketTable { + type P = EmptyKey; + type S = String; + type E = Bucket; + type Filter = (); + + async fn updated(&self, _old: Option, _new: Option) -> Result<(), Error> { + Ok(()) + } + + fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { + !entry.deleted + } +} diff --git a/src/core/garage.rs b/src/core/garage.rs new file mode 100644 index 00000000..d77b0acd --- /dev/null +++ b/src/core/garage.rs @@ -0,0 +1,166 @@ +use std::sync::Arc; + +use garage_util::background::*; +use garage_util::config::*; + +use garage_rpc::membership::System; +use garage_rpc::rpc_client::RpcHttpClient; +use garage_rpc::rpc_server::RpcServer; + +use garage_table::table_fullcopy::*; +use garage_table::table_sharded::*; +use garage_table::*; + +use crate::block::*; +use crate::block_ref_table::*; +use crate::bucket_table::*; +use crate::key_table::*; +use crate::object_table::*; +use crate::version_table::*; + +pub struct Garage { + pub config: Config, + + pub db: sled::Db, + pub background: Arc, + pub system: Arc, + pub block_manager: Arc, + + pub bucket_table: Arc>, + pub key_table: Arc>, + + pub object_table: Arc>, + pub version_table: Arc>, + pub block_ref_table: Arc>, +} + +impl Garage { + pub async fn new( + config: Config, + db: sled::Db, + background: Arc, + rpc_server: &mut RpcServer, + ) -> Arc { + info!("Initialize membership management system..."); + let rpc_http_client = Arc::new( + RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls) + .expect("Could not create RPC client"), + ); 
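// A quick worked example of the quorum arithmetic used just below, taking
// the default replication factors of 3: data writes wait for a majority of
// (3 + 1) / 2 = 2 nodes while data reads are served by a single replica,
// and metadata uses the majority quorum of 2 on both paths so that every
// read intersects every acknowledged write. As a sketch:
//
//   fn majority(replication_factor: usize) -> usize {
//       (replication_factor + 1) / 2
//   }
//   assert_eq!(majority(3), 2);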
+ let system = System::new( + config.metadata_dir.clone(), + rpc_http_client, + background.clone(), + rpc_server, + ); + + let data_rep_param = TableShardedReplication { + replication_factor: config.data_replication_factor, + write_quorum: (config.data_replication_factor + 1) / 2, + read_quorum: 1, + }; + + let meta_rep_param = TableShardedReplication { + replication_factor: config.meta_replication_factor, + write_quorum: (config.meta_replication_factor + 1) / 2, + read_quorum: (config.meta_replication_factor + 1) / 2, + }; + + let control_rep_param = TableFullReplication::new( + config.meta_epidemic_factor, + (config.meta_epidemic_factor + 1) / 2, + ); + + info!("Initialize block manager..."); + let block_manager = BlockManager::new( + &db, + config.data_dir.clone(), + data_rep_param.clone(), + system.clone(), + rpc_server, + ); + + info!("Initialize block_ref_table..."); + let block_ref_table = Table::new( + BlockRefTable { + background: background.clone(), + block_manager: block_manager.clone(), + }, + data_rep_param.clone(), + system.clone(), + &db, + "block_ref".to_string(), + rpc_server, + ) + .await; + + info!("Initialize version_table..."); + let version_table = Table::new( + VersionTable { + background: background.clone(), + block_ref_table: block_ref_table.clone(), + }, + meta_rep_param.clone(), + system.clone(), + &db, + "version".to_string(), + rpc_server, + ) + .await; + + info!("Initialize object_table..."); + let object_table = Table::new( + ObjectTable { + background: background.clone(), + version_table: version_table.clone(), + }, + meta_rep_param.clone(), + system.clone(), + &db, + "object".to_string(), + rpc_server, + ) + .await; + + info!("Initialize bucket_table..."); + let bucket_table = Table::new( + BucketTable, + control_rep_param.clone(), + system.clone(), + &db, + "bucket".to_string(), + rpc_server, + ) + .await; + + info!("Initialize key_table_table..."); + let key_table = Table::new( + KeyTable, + control_rep_param.clone(), + system.clone(), + &db, + "key".to_string(), + rpc_server, + ) + .await; + + info!("Initialize Garage..."); + let garage = Arc::new(Self { + config, + db, + system: system.clone(), + block_manager, + background, + bucket_table, + key_table, + object_table, + version_table, + block_ref_table, + }); + + info!("Start block manager background thread..."); + garage.block_manager.garage.swap(Some(garage.clone())); + garage.block_manager.clone().spawn_background_worker().await; + + garage + } +} diff --git a/src/core/key_table.rs b/src/core/key_table.rs new file mode 100644 index 00000000..76d163b5 --- /dev/null +++ b/src/core/key_table.rs @@ -0,0 +1,154 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +use garage_table::*; +use garage_util::data::*; +use garage_util::error::Error; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Key { + // Primary key + pub key_id: String, + + // Associated secret key (immutable) + pub secret_key: String, + + // Name + pub name: String, + pub name_timestamp: u64, + + // Deletion + pub deleted: bool, + + // Authorized keys + authorized_buckets: Vec, +} + +impl Key { + pub fn new(name: String, buckets: Vec) -> Self { + let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); + let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); + let mut ret = Self { + key_id, + secret_key, + name, + name_timestamp: now_msec(), + deleted: false, + authorized_buckets: vec![], + }; + for b in buckets { + ret.add_bucket(b) + .expect("Duplicate 
AllowedBucket in Key constructor"); + } + ret + } + pub fn delete(key_id: String) -> Self { + Self { + key_id, + secret_key: "".into(), + name: "".into(), + name_timestamp: now_msec(), + deleted: true, + authorized_buckets: vec![], + } + } + /// Add an authorized bucket, only if it wasn't there before + pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> { + match self + .authorized_buckets + .binary_search_by(|b| b.bucket.cmp(&new.bucket)) + { + Err(i) => { + self.authorized_buckets.insert(i, new); + Ok(()) + } + Ok(_) => Err(()), + } + } + pub fn authorized_buckets(&self) -> &[AllowedBucket] { + &self.authorized_buckets[..] + } + pub fn clear_buckets(&mut self) { + self.authorized_buckets.clear(); + } + pub fn allow_read(&self, bucket: &str) -> bool { + self.authorized_buckets + .iter() + .find(|x| x.bucket.as_str() == bucket) + .map(|x| x.allow_read) + .unwrap_or(false) + } + pub fn allow_write(&self, bucket: &str) -> bool { + self.authorized_buckets + .iter() + .find(|x| x.bucket.as_str() == bucket) + .map(|x| x.allow_write) + .unwrap_or(false) + } +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct AllowedBucket { + pub bucket: String, + pub timestamp: u64, + pub allow_read: bool, + pub allow_write: bool, +} + +impl Entry for Key { + fn partition_key(&self) -> &EmptyKey { + &EmptyKey + } + fn sort_key(&self) -> &String { + &self.key_id + } + + fn merge(&mut self, other: &Self) { + if other.deleted { + self.deleted = true; + } + if self.deleted { + self.authorized_buckets.clear(); + return; + } + if other.name_timestamp > self.name_timestamp { + self.name_timestamp = other.name_timestamp; + self.name = other.name.clone(); + } + + for ab in other.authorized_buckets.iter() { + match self + .authorized_buckets + .binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket)) + { + Ok(i) => { + let our_ab = &mut self.authorized_buckets[i]; + if ab.timestamp > our_ab.timestamp { + *our_ab = ab.clone(); + } + } + Err(i) => { + self.authorized_buckets.insert(i, ab.clone()); + } + } + } + } +} + +pub struct KeyTable; + +#[async_trait] +impl TableSchema for KeyTable { + type P = EmptyKey; + type S = String; + type E = Key; + type Filter = (); + + async fn updated(&self, _old: Option, _new: Option) -> Result<(), Error> { + Ok(()) + } + + fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { + !entry.deleted + } +} diff --git a/src/core/lib.rs b/src/core/lib.rs new file mode 100644 index 00000000..b4a8ddb7 --- /dev/null +++ b/src/core/lib.rs @@ -0,0 +1,10 @@ +#[macro_use] +extern crate log; + +pub mod block; +pub mod block_ref_table; +pub mod bucket_table; +pub mod garage; +pub mod key_table; +pub mod object_table; +pub mod version_table; diff --git a/src/core/object_table.rs b/src/core/object_table.rs new file mode 100644 index 00000000..1fe2b3d4 --- /dev/null +++ b/src/core/object_table.rs @@ -0,0 +1,165 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use garage_util::background::BackgroundRunner; +use garage_util::data::*; +use garage_util::error::Error; + +use garage_table::table_sharded::*; +use garage_table::*; + +use crate::version_table::*; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Object { + // Primary key + pub bucket: String, + + // Sort key + pub key: String, + + // Data + versions: Vec, +} + +impl Object { + pub fn new(bucket: String, key: String, versions: Vec) -> Self { + let mut ret = Self { + bucket, + key, + versions: vec![], + }; + for v in versions { 
+ ret.add_version(v) + .expect("Twice the same ObjectVersion in Object constructor"); + } + ret + } + /// Adds a version if it wasn't already present + pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&new.cmp_key())) + { + Err(i) => { + self.versions.insert(i, new); + Ok(()) + } + Ok(_) => Err(()), + } + } + pub fn versions(&self) -> &[ObjectVersion] { + &self.versions[..] + } +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersion { + pub uuid: UUID, + pub timestamp: u64, + + pub mime_type: String, + pub size: u64, + pub is_complete: bool, + + pub data: ObjectVersionData, +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionData { + DeleteMarker, + Inline(#[serde(with = "serde_bytes")] Vec), + FirstBlock(Hash), +} + +impl ObjectVersion { + fn cmp_key(&self) -> (u64, &UUID) { + (self.timestamp, &self.uuid) + } +} + +impl Entry for Object { + fn partition_key(&self) -> &String { + &self.bucket + } + fn sort_key(&self) -> &String { + &self.key + } + + fn merge(&mut self, other: &Self) { + for other_v in other.versions.iter() { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) + { + Ok(i) => { + let mut v = &mut self.versions[i]; + if other_v.size > v.size { + v.size = other_v.size; + } + if other_v.is_complete && !v.is_complete { + v.is_complete = true; + } + } + Err(i) => { + self.versions.insert(i, other_v.clone()); + } + } + } + let last_complete = self + .versions + .iter() + .enumerate() + .rev() + .filter(|(_, v)| v.is_complete) + .next() + .map(|(vi, _)| vi); + + if let Some(last_vi) = last_complete { + self.versions = self.versions.drain(last_vi..).collect::>(); + } + } +} + +pub struct ObjectTable { + pub background: Arc, + pub version_table: Arc>, +} + +#[async_trait] +impl TableSchema for ObjectTable { + type P = String; + type S = String; + type E = Object; + type Filter = (); + + async fn updated(&self, old: Option, new: Option) -> Result<(), Error> { + let version_table = self.version_table.clone(); + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of old versions + for v in old_v.versions.iter() { + if new_v + .versions + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) + .is_err() + { + let deleted_version = Version::new( + v.uuid, + old_v.bucket.clone(), + old_v.key.clone(), + true, + vec![], + ); + version_table.insert(&deleted_version).await?; + } + } + } + Ok(()) + } + + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { + // TODO + true + } +} diff --git a/src/core/version_table.rs b/src/core/version_table.rs new file mode 100644 index 00000000..ae32e5cb --- /dev/null +++ b/src/core/version_table.rs @@ -0,0 +1,131 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use garage_util::background::BackgroundRunner; +use garage_util::data::*; +use garage_util::error::Error; + +use garage_table::table_sharded::*; +use garage_table::*; + +use crate::block_ref_table::*; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Version { + // Primary key + pub uuid: UUID, + + // Actual data: the blocks for this version + pub deleted: bool, + blocks: Vec, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + pub bucket: String, + pub key: String, +} + +impl Version { + pub fn new( + uuid: UUID, + bucket: String, + key: String, + deleted: bool, + 
blocks: Vec, + ) -> Self { + let mut ret = Self { + uuid, + deleted, + blocks: vec![], + bucket, + key, + }; + for b in blocks { + ret.add_block(b) + .expect("Twice the same VersionBlock in Version constructor"); + } + ret + } + /// Adds a block if it wasn't already present + pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> { + match self.blocks.binary_search_by(|b| b.offset.cmp(&new.offset)) { + Err(i) => { + self.blocks.insert(i, new); + Ok(()) + } + Ok(_) => Err(()), + } + } + pub fn blocks(&self) -> &[VersionBlock] { + &self.blocks[..] + } +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct VersionBlock { + pub offset: u64, + pub hash: Hash, +} + +impl Entry for Version { + fn partition_key(&self) -> &Hash { + &self.uuid + } + fn sort_key(&self) -> &EmptyKey { + &EmptyKey + } + + fn merge(&mut self, other: &Self) { + if other.deleted { + self.deleted = true; + self.blocks.clear(); + } else if !self.deleted { + for bi in other.blocks.iter() { + match self.blocks.binary_search_by(|x| x.offset.cmp(&bi.offset)) { + Ok(_) => (), + Err(pos) => { + self.blocks.insert(pos, bi.clone()); + } + } + } + } + } +} + +pub struct VersionTable { + pub background: Arc, + pub block_ref_table: Arc>, +} + +#[async_trait] +impl TableSchema for VersionTable { + type P = Hash; + type S = EmptyKey; + type E = Version; + type Filter = (); + + async fn updated(&self, old: Option, new: Option) -> Result<(), Error> { + let block_ref_table = self.block_ref_table.clone(); + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of version blocks + if new_v.deleted && !old_v.deleted { + let deleted_block_refs = old_v + .blocks + .iter() + .map(|vb| BlockRef { + block: vb.hash, + version: old_v.uuid, + deleted: true, + }) + .collect::>(); + block_ref_table.insert_many(&deleted_block_refs[..]).await?; + } + } + Ok(()) + } + + fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { + !entry.deleted + } +} diff --git a/src/data.rs b/src/data.rs deleted file mode 100644 index 8f976f71..00000000 --- a/src/data.rs +++ /dev/null @@ -1,124 +0,0 @@ -use rand::Rng; -use serde::de::{self, Visitor}; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use sha2::{Digest, Sha256}; -use std::fmt; -use std::time::{SystemTime, UNIX_EPOCH}; - -#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)] -pub struct FixedBytes32([u8; 32]); - -impl From<[u8; 32]> for FixedBytes32 { - fn from(x: [u8; 32]) -> FixedBytes32 { - FixedBytes32(x) - } -} - -impl std::convert::AsRef<[u8]> for FixedBytes32 { - fn as_ref(&self) -> &[u8] { - &self.0[..] 
- } -} - -impl Eq for FixedBytes32 {} - -impl fmt::Debug for FixedBytes32 { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}…", hex::encode(&self.0[..8])) - } -} - -struct FixedBytes32Visitor; -impl<'de> Visitor<'de> for FixedBytes32Visitor { - type Value = FixedBytes32; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("a byte slice of size 32") - } - - fn visit_bytes(self, value: &[u8]) -> Result { - if value.len() == 32 { - let mut res = [0u8; 32]; - res.copy_from_slice(value); - Ok(res.into()) - } else { - Err(E::custom(format!( - "Invalid byte string length {}, expected 32", - value.len() - ))) - } - } -} - -impl<'de> Deserialize<'de> for FixedBytes32 { - fn deserialize>(deserializer: D) -> Result { - deserializer.deserialize_bytes(FixedBytes32Visitor) - } -} - -impl Serialize for FixedBytes32 { - fn serialize(&self, serializer: S) -> Result { - serializer.serialize_bytes(&self.0[..]) - } -} - -impl FixedBytes32 { - pub fn as_slice(&self) -> &[u8] { - &self.0[..] - } - pub fn as_slice_mut(&mut self) -> &mut [u8] { - &mut self.0[..] - } - pub fn to_vec(&self) -> Vec { - self.0.to_vec() - } -} - -pub type UUID = FixedBytes32; -pub type Hash = FixedBytes32; - -pub fn hash(data: &[u8]) -> Hash { - let mut hasher = Sha256::new(); - hasher.input(data); - let mut hash = [0u8; 32]; - hash.copy_from_slice(&hasher.result()[..]); - hash.into() -} - -pub fn gen_uuid() -> UUID { - rand::thread_rng().gen::<[u8; 32]>().into() -} - -pub fn now_msec() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Fix your clock :o") - .as_millis() as u64 -} - -// RMP serialization with names of fields and variants - -pub fn rmp_to_vec_all_named(val: &T) -> Result, rmp_serde::encode::Error> -where - T: Serialize + ?Sized, -{ - let mut wr = Vec::with_capacity(128); - let mut se = rmp_serde::Serializer::new(&mut wr) - .with_struct_map() - .with_string_variants(); - val.serialize(&mut se)?; - Ok(wr) -} - -pub fn debug_serialize(x: T) -> String { - match serde_json::to_string(&x) { - Ok(ss) => { - if ss.len() > 100 { - ss[..100].to_string() - } else { - ss - } - } - Err(e) => format!("", e), - } -} diff --git a/src/error.rs b/src/error.rs deleted file mode 100644 index 6290dc24..00000000 --- a/src/error.rs +++ /dev/null @@ -1,95 +0,0 @@ -use err_derive::Error; -use hyper::StatusCode; -use std::io; - -use crate::data::Hash; -use crate::rpc::rpc_client::RPCError; - -#[derive(Debug, Error)] -pub enum Error { - #[error(display = "IO error: {}", _0)] - Io(#[error(source)] io::Error), - - #[error(display = "Hyper error: {}", _0)] - Hyper(#[error(source)] hyper::Error), - - #[error(display = "HTTP error: {}", _0)] - HTTP(#[error(source)] http::Error), - - #[error(display = "Invalid HTTP header value: {}", _0)] - HTTPHeader(#[error(source)] http::header::ToStrError), - - #[error(display = "TLS error: {}", _0)] - TLS(#[error(source)] rustls::TLSError), - - #[error(display = "PKI error: {}", _0)] - PKI(#[error(source)] webpki::Error), - - #[error(display = "Sled error: {}", _0)] - Sled(#[error(source)] sled::Error), - - #[error(display = "Messagepack encode error: {}", _0)] - RMPEncode(#[error(source)] rmp_serde::encode::Error), - #[error(display = "Messagepack decode error: {}", _0)] - RMPDecode(#[error(source)] rmp_serde::decode::Error), - #[error(display = "JSON error: {}", _0)] - JSON(#[error(source)] serde_json::error::Error), - #[error(display = "TOML decode error: {}", _0)] - TomlDecode(#[error(source)] toml::de::Error), - - 
#[error(display = "Timeout: {}", _0)] - RPCTimeout(#[error(source)] tokio::time::Elapsed), - - #[error(display = "Tokio join error: {}", _0)] - TokioJoin(#[error(source)] tokio::task::JoinError), - - #[error(display = "RPC call error: {}", _0)] - RPC(#[error(source)] RPCError), - - #[error(display = "Remote error: {} (status code {})", _0, _1)] - RemoteError(String, StatusCode), - - #[error(display = "Bad request: {}", _0)] - BadRequest(String), - - #[error(display = "Not found")] - NotFound, - - #[error(display = "Corrupt data: does not match hash {:?}", _0)] - CorruptData(Hash), - - #[error(display = "{}", _0)] - Message(String), -} - -impl Error { - pub fn http_status_code(&self) -> StatusCode { - match self { - Error::BadRequest(_) => StatusCode::BAD_REQUEST, - Error::NotFound => StatusCode::NOT_FOUND, - Error::RPC(_) => StatusCode::SERVICE_UNAVAILABLE, - _ => StatusCode::INTERNAL_SERVER_ERROR, - } - } -} - -impl From> for Error { - fn from(e: sled::TransactionError) -> Error { - match e { - sled::TransactionError::Abort(x) => x, - sled::TransactionError::Storage(x) => Error::Sled(x), - } - } -} - -impl From> for Error { - fn from(_e: tokio::sync::watch::error::SendError) -> Error { - Error::Message(format!("Watch send error")) - } -} - -impl From> for Error { - fn from(_e: tokio::sync::mpsc::error::SendError) -> Error { - Error::Message(format!("MPSC send error")) - } -} diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml new file mode 100644 index 00000000..08b55c32 --- /dev/null +++ b/src/garage/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "garage" +version = "0.1.0" +authors = ["Alex Auvolat "] +edition = "2018" + +[[bin]] +name = "garage" +path = "main.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_util = { path = "../util" } +garage_rpc = { path = "../rpc" } +garage_table = { path = "../table" } +garage_core = { path = "../core" } +garage_api = { path = "../api" } + +bytes = "0.4" +rand = "0.7" +hex = "0.3" +sha2 = "0.8" +log = "0.4" +pretty_env_logger = "0.4" + +sled = "0.31" + +structopt = { version = "0.3", default-features = false } +toml = "0.5" +rmp-serde = "0.14.3" +serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } + +futures = "0.3" +futures-util = "0.3" +tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs new file mode 100644 index 00000000..aeaf2682 --- /dev/null +++ b/src/garage/admin_rpc.rs @@ -0,0 +1,358 @@ +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; +use garage_util::error::Error; + +use garage_table::*; + +use garage_rpc::rpc_client::*; +use garage_rpc::rpc_server::*; + +use garage_core::bucket_table::*; +use garage_core::garage::Garage; +use garage_core::key_table::*; + +use crate::repair::Repair; +use crate::*; + +pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); +pub const ADMIN_RPC_PATH: &str = "_admin"; + +#[derive(Debug, Serialize, Deserialize)] +pub enum AdminRPC { + BucketOperation(BucketOperation), + KeyOperation(KeyOperation), + LaunchRepair(RepairOpt), + + // Replies + Ok(String), + BucketList(Vec), + BucketInfo(Bucket), + KeyList(Vec<(String, String)>), + KeyInfo(Key), +} + +impl RpcMessage for AdminRPC {} + +pub struct AdminRpcHandler { + garage: Arc, + rpc_client: Arc>, +} + +impl AdminRpcHandler { 
+ pub fn new(garage: Arc) -> Arc { + let rpc_client = garage.system.clone().rpc_client::(ADMIN_RPC_PATH); + Arc::new(Self { garage, rpc_client }) + } + + pub fn register_handler(self: Arc, rpc_server: &mut RpcServer) { + rpc_server.add_handler::(ADMIN_RPC_PATH.to_string(), move |msg, _addr| { + let self2 = self.clone(); + async move { + match msg { + AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, + AdminRPC::KeyOperation(ko) => self2.handle_key_cmd(ko).await, + AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, + _ => Err(Error::BadRequest(format!("Invalid RPC"))), + } + } + }); + } + + async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result { + match cmd { + BucketOperation::List => { + let bucket_names = self + .garage + .bucket_table + .get_range(&EmptyKey, None, Some(()), 10000) + .await? + .iter() + .map(|b| b.name.to_string()) + .collect::>(); + Ok(AdminRPC::BucketList(bucket_names)) + } + BucketOperation::Info(query) => { + let bucket = self.get_existing_bucket(&query.name).await?; + Ok(AdminRPC::BucketInfo(bucket)) + } + BucketOperation::Create(query) => { + let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; + if bucket.as_ref().filter(|b| !b.deleted).is_some() { + return Err(Error::BadRequest(format!( + "Bucket {} already exists", + query.name + ))); + } + let new_time = match bucket { + Some(b) => std::cmp::max(b.timestamp + 1, now_msec()), + None => now_msec(), + }; + self.garage + .bucket_table + .insert(&Bucket::new(query.name.clone(), new_time, false, vec![])) + .await?; + Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) + } + BucketOperation::Delete(query) => { + let bucket = self.get_existing_bucket(&query.name).await?; + let objects = self + .garage + .object_table + .get_range(&query.name, None, Some(()), 10) + .await?; + if !objects.is_empty() { + return Err(Error::BadRequest(format!( + "Bucket {} is not empty", + query.name + ))); + } + if !query.yes { + return Err(Error::BadRequest(format!( + "Add --yes flag to really perform this operation" + ))); + } + // --- done checking, now commit --- + for ak in bucket.authorized_keys() { + if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? 
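The bucket create and delete paths above both advance the record timestamp with std::cmp::max(old + 1, now_msec()), so a rewrite dominates the previous record under last-write-wins even when the local clock is behind. A self-contained sketch of just that rule (next_timestamp is a name introduced here for illustration):

// Last-write-wins timestamps must strictly increase even if the wall
// clock lags behind the timestamp of the record being replaced.
fn next_timestamp(prev: Option<u64>, now_msec: u64) -> u64 {
    match prev {
        Some(t) => std::cmp::max(t + 1, now_msec),
        None => now_msec,
    }
}

#[test]
fn timestamp_always_advances() {
    assert_eq!(next_timestamp(Some(100), 50), 101); // clock behind: bump past old record
    assert_eq!(next_timestamp(Some(100), 200), 200); // clock ahead: take wall clock
    assert_eq!(next_timestamp(None, 50), 50);
}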
{ + if !key.deleted { + self.update_key_bucket(key, &bucket.name, false, false) + .await?; + } + } else { + return Err(Error::Message(format!("Key not found: {}", ak.key_id))); + } + } + self.garage + .bucket_table + .insert(&Bucket::new( + query.name.clone(), + std::cmp::max(bucket.timestamp + 1, now_msec()), + true, + vec![], + )) + .await?; + Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) + } + BucketOperation::Allow(query) => { + let key = self.get_existing_key(&query.key_id).await?; + let bucket = self.get_existing_bucket(&query.bucket).await?; + let allow_read = query.read || key.allow_read(&query.bucket); + let allow_write = query.write || key.allow_write(&query.bucket); + self.update_key_bucket(key, &query.bucket, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) + .await?; + Ok(AdminRPC::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &query.key_id, &query.bucket, allow_read, allow_write + ))) + } + BucketOperation::Deny(query) => { + let key = self.get_existing_key(&query.key_id).await?; + let bucket = self.get_existing_bucket(&query.bucket).await?; + let allow_read = !query.read && key.allow_read(&query.bucket); + let allow_write = !query.write && key.allow_write(&query.bucket); + self.update_key_bucket(key, &query.bucket, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) + .await?; + Ok(AdminRPC::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &query.key_id, &query.bucket, allow_read, allow_write + ))) + } + } + } + + async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result { + match cmd { + KeyOperation::List => { + let key_ids = self + .garage + .key_table + .get_range(&EmptyKey, None, Some(()), 10000) + .await? + .iter() + .map(|k| (k.key_id.to_string(), k.name.to_string())) + .collect::>(); + Ok(AdminRPC::KeyList(key_ids)) + } + KeyOperation::Info(query) => { + let key = self.get_existing_key(&query.key_id).await?; + Ok(AdminRPC::KeyInfo(key)) + } + KeyOperation::New(query) => { + let key = Key::new(query.name, vec![]); + self.garage.key_table.insert(&key).await?; + Ok(AdminRPC::KeyInfo(key)) + } + KeyOperation::Rename(query) => { + let mut key = self.get_existing_key(&query.key_id).await?; + key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec()); + key.name = query.new_name; + self.garage.key_table.insert(&key).await?; + Ok(AdminRPC::KeyInfo(key)) + } + KeyOperation::Delete(query) => { + let key = self.get_existing_key(&query.key_id).await?; + if !query.yes { + return Err(Error::BadRequest(format!( + "Add --yes flag to really perform this operation" + ))); + } + // --- done checking, now commit --- + for ab in key.authorized_buckets().iter() { + if let Some(bucket) = + self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await? + { + if !bucket.deleted { + self.update_bucket_key(bucket, &key.key_id, false, false) + .await?; + } + } else { + return Err(Error::Message(format!("Bucket not found: {}", ab.bucket))); + } + } + let del_key = Key::delete(key.key_id); + self.garage.key_table.insert(&del_key).await?; + Ok(AdminRPC::Ok(format!( + "Key {} was deleted successfully.", + query.key_id + ))) + } + } + } + + async fn get_existing_bucket(&self, bucket: &String) -> Result { + self.garage + .bucket_table + .get(&EmptyKey, bucket) + .await? 
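The Allow and Deny arms above differ only in how the requested flag is folded into the existing right: Allow ORs it in, Deny masks it out while leaving rights the command did not name untouched. A tiny sketch of that boolean logic (helper names are illustrative, not from the patch):

// `allow` ORs the requested right into the current one; `deny` clears it
// but preserves rights the command did not mention.
fn apply_allow(current: bool, requested: bool) -> bool {
    requested || current
}
fn apply_deny(current: bool, requested: bool) -> bool {
    !requested && current
}

#[test]
fn allow_deny_semantics() {
    assert!(apply_allow(false, true)); // grant a new right
    assert!(apply_allow(true, false)); // existing right is kept
    assert!(!apply_deny(true, true)); // revoke an existing right
    assert!(apply_deny(true, false)); // unnamed right is kept
}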
+ .filter(|b| !b.deleted) + .map(Ok) + .unwrap_or(Err(Error::BadRequest(format!( + "Bucket {} does not exist", + bucket + )))) + } + + async fn get_existing_key(&self, id: &String) -> Result { + self.garage + .key_table + .get(&EmptyKey, id) + .await? + .filter(|k| !k.deleted) + .map(Ok) + .unwrap_or(Err(Error::BadRequest(format!("Key {} does not exist", id)))) + } + + async fn update_bucket_key( + &self, + mut bucket: Bucket, + key_id: &String, + allow_read: bool, + allow_write: bool, + ) -> Result<(), Error> { + let timestamp = match bucket + .authorized_keys() + .iter() + .find(|x| x.key_id == *key_id) + { + None => now_msec(), + Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), + }; + bucket.clear_keys(); + bucket + .add_key(AllowedKey { + key_id: key_id.clone(), + timestamp, + allow_read, + allow_write, + }) + .unwrap(); + self.garage.bucket_table.insert(&bucket).await?; + Ok(()) + } + + async fn update_key_bucket( + &self, + mut key: Key, + bucket: &String, + allow_read: bool, + allow_write: bool, + ) -> Result<(), Error> { + let timestamp = match key + .authorized_buckets() + .iter() + .find(|x| x.bucket == *bucket) + { + None => now_msec(), + Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), + }; + key.clear_buckets(); + key.add_bucket(AllowedBucket { + bucket: bucket.clone(), + timestamp, + allow_read, + allow_write, + }) + .unwrap(); + self.garage.key_table.insert(&key).await?; + Ok(()) + } + + async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { + if !opt.yes { + return Err(Error::BadRequest(format!( + "Please provide the --yes flag to initiate repair operations." + ))); + } + if opt.all_nodes { + let mut opt_to_send = opt.clone(); + opt_to_send.all_nodes = false; + + let mut failures = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.config.members.keys() { + if self + .rpc_client + .call( + *node, + AdminRPC::LaunchRepair(opt_to_send.clone()), + ADMIN_RPC_TIMEOUT, + ) + .await + .is_err() + { + failures.push(node.clone()); + } + } + if failures.is_empty() { + Ok(AdminRPC::Ok(format!("Repair launched on all nodes"))) + } else { + Err(Error::Message(format!( + "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", + failures + ))) + } + } else { + let repair = Repair { + garage: self.garage.clone(), + }; + self.garage + .system + .background + .spawn_worker("Repair worker".into(), move |must_exit| async move { + repair.repair_worker(opt, must_exit).await + }) + .await; + Ok(AdminRPC::Ok(format!( + "Repair launched on {:?}", + self.garage.system.id + ))) + } + } +} diff --git a/src/garage/main.rs b/src/garage/main.rs new file mode 100644 index 00000000..1185871f --- /dev/null +++ b/src/garage/main.rs @@ -0,0 +1,531 @@ +#![recursion_limit = "1024"] + +#[macro_use] +extern crate log; + +mod admin_rpc; +mod repair; +mod server; + +use std::collections::HashSet; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use structopt::StructOpt; + +use garage_util::config::TlsConfig; +use garage_util::data::*; +use garage_util::error::Error; + +use garage_rpc::membership::*; +use garage_rpc::rpc_client::*; + +use admin_rpc::*; + +#[derive(StructOpt, Debug)] +#[structopt(name = "garage")] +pub struct Opt { + /// RPC connect to this host to execute client operations + #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")] + rpc_host: SocketAddr, + + #[structopt(long = "ca-cert")] + ca_cert: 
Option,
+ #[structopt(long = "client-cert")]
+ client_cert: Option,
+ #[structopt(long = "client-key")]
+ client_key: Option,
+
+ #[structopt(subcommand)]
+ cmd: Command,
+}
+
+#[derive(StructOpt, Debug)]
+pub enum Command {
+ /// Run Garage server
+ #[structopt(name = "server")]
+ Server(ServerOpt),
+
+ /// Get network status
+ #[structopt(name = "status")]
+ Status,
+
+ /// Garage node operations
+ #[structopt(name = "node")]
+ Node(NodeOperation),
+
+ /// Bucket operations
+ #[structopt(name = "bucket")]
+ Bucket(BucketOperation),
+
+ /// Key operations
+ #[structopt(name = "key")]
+ Key(KeyOperation),
+
+ /// Start repair of node data
+ #[structopt(name = "repair")]
+ Repair(RepairOpt),
+}
+
+#[derive(StructOpt, Debug)]
+pub struct ServerOpt {
+ /// Configuration file
+ #[structopt(short = "c", long = "config", default_value = "./config.toml")]
+ config_file: PathBuf,
+}
+
+#[derive(StructOpt, Debug)]
+pub enum NodeOperation {
+ /// Configure Garage node
+ #[structopt(name = "configure")]
+ Configure(ConfigureNodeOpt),
+
+ /// Remove Garage node from cluster
+ #[structopt(name = "remove")]
+ Remove(RemoveNodeOpt),
+}
+
+#[derive(StructOpt, Debug)]
+pub struct ConfigureNodeOpt {
+ /// Node to configure (prefix of hexadecimal node id)
+ node_id: String,
+
+ /// Location (datacenter) of the node
+ #[structopt(short = "d", long = "datacenter")]
+ datacenter: Option,
+
+ /// Number of tokens
+ #[structopt(short = "n", long = "n-tokens")]
+ n_tokens: Option,
+
+ /// Optional node tag
+ #[structopt(short = "t", long = "tag")]
+ tag: Option,
+}
+
+#[derive(StructOpt, Debug)]
+pub struct RemoveNodeOpt {
+ /// Node to configure (prefix of hexadecimal node id)
+ node_id: String,
+
+ /// If this flag is not given, the node won't be removed
+ #[structopt(long = "yes")]
+ yes: bool,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub enum BucketOperation {
+ /// List buckets
+ #[structopt(name = "list")]
+ List,
+
+ /// Get bucket info
+ #[structopt(name = "info")]
+ Info(BucketOpt),
+
+ /// Create bucket
+ #[structopt(name = "create")]
+ Create(BucketOpt),
+
+ /// Delete bucket
+ #[structopt(name = "delete")]
+ Delete(DeleteBucketOpt),
+
+ /// Allow key to read or write to bucket
+ #[structopt(name = "allow")]
+ Allow(PermBucketOpt),
+
+ /// Deny key from reading or writing to bucket
+ #[structopt(name = "deny")]
+ Deny(PermBucketOpt),
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct BucketOpt {
+ /// Bucket name
+ pub name: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct DeleteBucketOpt {
+ /// Bucket name
+ pub name: String,
+
+ /// If this flag is not given, the bucket won't be deleted
+ #[structopt(long = "yes")]
+ pub yes: bool,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct PermBucketOpt {
+ /// Access key ID
+ #[structopt(long = "key")]
+ pub key_id: String,
+
+ /// Allow/deny read operations
+ #[structopt(long = "read")]
+ pub read: bool,
+
+ /// Allow/deny write operations
+ #[structopt(long = "write")]
+ pub write: bool,
+
+ /// Bucket name
+ pub bucket: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub enum KeyOperation {
+ /// List keys
+ #[structopt(name = "list")]
+ List,
+
+ /// Get key info
+ #[structopt(name = "info")]
+ Info(KeyOpt),
+
+ /// Create new key
+ #[structopt(name = "new")]
+ New(KeyNewOpt),
+
+ /// Rename key
+ #[structopt(name = "rename")]
+ Rename(KeyRenameOpt),
+
+ /// Delete key
+ #[structopt(name = "delete")]
+ Delete(KeyDeleteOpt),
+}
+
+#[derive(Serialize,
Deserialize, StructOpt, Debug)]
+pub struct KeyOpt {
+ /// ID of the key
+ key_id: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct KeyNewOpt {
+ /// Name of the key
+ #[structopt(long = "name", default_value = "Unnamed key")]
+ name: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct KeyRenameOpt {
+ /// ID of the key
+ key_id: String,
+
+ /// New name of the key
+ new_name: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct KeyDeleteOpt {
+ /// ID of the key
+ key_id: String,
+
+ /// Confirm deletion
+ #[structopt(long = "yes")]
+ yes: bool,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct RepairOpt {
+ /// Launch repair operation on all nodes
+ #[structopt(short = "a", long = "all-nodes")]
+ pub all_nodes: bool,
+
+ /// Confirm the launch of the repair operation
+ #[structopt(long = "yes")]
+ pub yes: bool,
+
+ #[structopt(subcommand)]
+ pub what: Option,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum RepairWhat {
+ /// Only do a full sync of metadata tables
+ #[structopt(name = "tables")]
+ Tables,
+ /// Only repair (resync/rebalance) the set of stored blocks
+ #[structopt(name = "blocks")]
+ Blocks,
+ /// Only redo the propagation of object deletions to the version table (slow)
+ #[structopt(name = "versions")]
+ Versions,
+ /// Only redo the propagation of version deletions to the block ref table (extremely slow)
+ #[structopt(name = "block_refs")]
+ BlockRefs,
+}
+
+#[tokio::main]
+async fn main() {
+ pretty_env_logger::init();
+
+ let opt = Opt::from_args();
+
+ let tls_config = match (opt.ca_cert, opt.client_cert, opt.client_key) {
+ (Some(ca_cert), Some(client_cert), Some(client_key)) => Some(TlsConfig {
+ ca_cert,
+ node_cert: client_cert,
+ node_key: client_key,
+ }),
+ (None, None, None) => None,
+ _ => {
+ warn!("Missing one of: --ca-cert, --client-cert, --client-key. Not using TLS.");
+ None
+ }
+ };
+
+ let rpc_http_cli =
+ Arc::new(RpcHttpClient::new(8, &tls_config).expect("Could not create RPC client"));
+ let membership_rpc_cli =
+ RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string());
+ let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string());
+
+ let resp = match opt.cmd {
+ Command::Server(server_opt) => {
+ // Abort on panic (same behavior as in Go)
+ std::panic::set_hook(Box::new(|panic_info| {
+ error!("{}", panic_info.to_string());
+ std::process::abort();
+ }));
+
+ server::run_server(server_opt.config_file).await
+ }
+ Command::Status => cmd_status(membership_rpc_cli, opt.rpc_host).await,
+ Command::Node(NodeOperation::Configure(configure_opt)) => {
+ cmd_configure(membership_rpc_cli, opt.rpc_host, configure_opt).await
+ }
+ Command::Node(NodeOperation::Remove(remove_opt)) => {
+ cmd_remove(membership_rpc_cli, opt.rpc_host, remove_opt).await
+ }
+ Command::Bucket(bo) => {
+ cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await
+ }
+ Command::Key(bo) => {
+ cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::KeyOperation(bo)).await
+ }
+ Command::Repair(ro) => {
+ cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::LaunchRepair(ro)).await
+ }
+ };
+
+ if let Err(e) = resp {
+ error!("Error: {}", e);
+ }
+}
+
+async fn cmd_status(rpc_cli: RpcAddrClient, rpc_host: SocketAddr) -> Result<(), Error> {
+ let status = match rpc_cli
+ .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
+ .await??
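main() above enables TLS only when all three certificate paths are present and falls back to plaintext (with a warning) otherwise. The same all-or-nothing match, sketched stand-alone with a stand-in struct (the real type is garage_util's TlsConfig):

// Combine three optional paths into one optional TLS configuration:
// either all are given, or none; anything in between is a user error.
struct Tls {
    ca_cert: String,
    node_cert: String,
    node_key: String,
}

fn tls_from_opts(ca: Option<String>, cert: Option<String>, key: Option<String>) -> Option<Tls> {
    match (ca, cert, key) {
        (Some(ca_cert), Some(node_cert), Some(node_key)) => Some(Tls {
            ca_cert,
            node_cert,
            node_key,
        }),
        (None, None, None) => None,
        _ => {
            eprintln!("warning: incomplete TLS configuration, not using TLS");
            None
        }
    }
}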
+ {
+ Message::AdvertiseNodesUp(nodes) => nodes,
+ resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ };
+ let config = match rpc_cli
+ .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
+ .await??
+ {
+ Message::AdvertiseConfig(cfg) => cfg,
+ resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ };
+
+ println!("Healthy nodes:");
+ for adv in status.iter().filter(|x| x.is_up) {
+ if let Some(cfg) = config.members.get(&adv.id) {
+ println!(
+ "{:?}\t{}\t{}\t[{}]\t{}\t{}",
+ adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens
+ );
+ } else {
+ println!(
+ "{:?}\t{}\t{}\tUNCONFIGURED/REMOVED",
+ adv.id, adv.state_info.hostname, adv.addr
+ );
+ }
+ }
+
+ let status_keys = status.iter().map(|x| x.id).collect::>();
+ let failure_case_1 = status.iter().any(|x| !x.is_up);
+ let failure_case_2 = config
+ .members
+ .iter()
+ .any(|(id, _)| !status_keys.contains(id));
+ if failure_case_1 || failure_case_2 {
+ println!("\nFailed nodes:");
+ for adv in status.iter().filter(|x| !x.is_up) {
+ if let Some(cfg) = config.members.get(&adv.id) {
+ println!(
+ "{:?}\t{}\t{}\t[{}]\t{}\t{}\tlast seen: {}s ago",
+ adv.id,
+ adv.state_info.hostname,
+ adv.addr,
+ cfg.tag,
+ cfg.datacenter,
+ cfg.n_tokens,
+ (now_msec() - adv.last_seen) / 1000,
+ );
+ }
+ }
+ for (id, cfg) in config.members.iter() {
+ if !status.iter().any(|x| x.id == *id) {
+ println!(
+ "{:?}\t{}\t{}\t{}\tnever seen",
+ id, cfg.tag, cfg.datacenter, cfg.n_tokens
+ );
+ }
+ }
+ }
+
+ Ok(())
+}
+
+async fn cmd_configure(
+ rpc_cli: RpcAddrClient,
+ rpc_host: SocketAddr,
+ args: ConfigureNodeOpt,
+) -> Result<(), Error> {
+ let status = match rpc_cli
+ .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
+ .await??
+ {
+ Message::AdvertiseNodesUp(nodes) => nodes,
+ resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ };
+
+ let mut candidates = vec![];
+ for adv in status.iter() {
+ if hex::encode(&adv.id).starts_with(&args.node_id) {
+ candidates.push(adv.id);
+ }
+ }
+ if candidates.len() != 1 {
+ return Err(Error::Message(format!(
+ "{} matching nodes",
+ candidates.len()
+ )));
+ }
+
+ let mut config = match rpc_cli
+ .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
+ .await??
+ {
+ Message::AdvertiseConfig(cfg) => cfg,
+ resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ };
+
+ let new_entry = match config.members.get(&candidates[0]) {
+ None => NetworkConfigEntry {
+ datacenter: args
+ .datacenter
+ .expect("Please specify a datacenter with the -d flag"),
+ n_tokens: args
+ .n_tokens
+ .expect("Please specify a number of tokens with the -n flag"),
+ tag: args.tag.unwrap_or("".to_string()),
+ },
+ Some(old) => NetworkConfigEntry {
+ datacenter: args.datacenter.unwrap_or(old.datacenter.to_string()),
+ n_tokens: args.n_tokens.unwrap_or(old.n_tokens),
+ tag: args.tag.unwrap_or(old.tag.to_string()),
+ },
+ };
+
+ config.members.insert(candidates[0].clone(), new_entry);
+ config.version += 1;
+
+ rpc_cli
+ .call(
+ &rpc_host,
+ &Message::AdvertiseConfig(config),
+ ADMIN_RPC_TIMEOUT,
+ )
+ .await??;
+ Ok(())
+}
+
+async fn cmd_remove(
+ rpc_cli: RpcAddrClient,
+ rpc_host: SocketAddr,
+ args: RemoveNodeOpt,
+) -> Result<(), Error> {
+ let mut config = match rpc_cli
+ .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
+ .await??
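cmd_configure and cmd_remove above both resolve the node_id argument as a hexadecimal prefix of a node id and refuse to act unless exactly one node matches. That lookup, sketched over plain byte-array ids (resolve_prefix is an illustrative name; the real code works on the UUID type from garage_util):

// Resolve an unambiguous hexadecimal prefix to a node id, as the
// configure/remove subcommands do; any match count other than one is an error.
fn resolve_prefix<'a>(ids: &'a [[u8; 4]], prefix: &str) -> Result<&'a [u8; 4], String> {
    let matches: Vec<&'a [u8; 4]> = ids
        .iter()
        .filter(|id| hex::encode(id).starts_with(prefix))
        .collect();
    if matches.len() == 1 {
        Ok(matches[0])
    } else {
        Err(format!("{} matching nodes", matches.len()))
    }
}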
+ { + Message::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + let mut candidates = vec![]; + for (key, _) in config.members.iter() { + if hex::encode(key).starts_with(&args.node_id) { + candidates.push(*key); + } + } + if candidates.len() != 1 { + return Err(Error::Message(format!( + "{} matching nodes", + candidates.len() + ))); + } + + if !args.yes { + return Err(Error::Message(format!( + "Add the flag --yes to really remove {:?} from the cluster", + candidates[0] + ))); + } + + config.members.remove(&candidates[0]); + config.version += 1; + + rpc_cli + .call( + &rpc_host, + &Message::AdvertiseConfig(config), + ADMIN_RPC_TIMEOUT, + ) + .await??; + Ok(()) +} + +async fn cmd_admin( + rpc_cli: RpcAddrClient, + rpc_host: SocketAddr, + args: AdminRPC, +) -> Result<(), Error> { + match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? { + AdminRPC::Ok(msg) => { + println!("{}", msg); + } + AdminRPC::BucketList(bl) => { + println!("List of buckets:"); + for bucket in bl { + println!("{}", bucket); + } + } + AdminRPC::BucketInfo(bucket) => { + println!("{:?}", bucket); + } + AdminRPC::KeyList(kl) => { + println!("List of keys:"); + for key in kl { + println!("{}\t{}", key.0, key.1); + } + } + AdminRPC::KeyInfo(key) => { + println!("{:?}", key); + } + r => { + error!("Unexpected response: {:?}", r); + } + } + Ok(()) +} diff --git a/src/garage/repair.rs b/src/garage/repair.rs new file mode 100644 index 00000000..4efb9e84 --- /dev/null +++ b/src/garage/repair.rs @@ -0,0 +1,183 @@ +use std::sync::Arc; + +use tokio::sync::watch; + +use garage_core::block_ref_table::*; +use garage_core::garage::Garage; +use garage_core::version_table::*; +use garage_table::*; +use garage_util::error::Error; + +use crate::*; + +pub struct Repair { + pub garage: Arc, +} + +impl Repair { + pub async fn repair_worker( + &self, + opt: RepairOpt, + must_exit: watch::Receiver, + ) -> Result<(), Error> { + let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); + + if todo(RepairWhat::Tables) { + info!("Launching a full sync of tables"); + self.garage + .bucket_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .object_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .version_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .block_ref_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + } + + // TODO: wait for full sync to finish before proceeding to the rest? + + if todo(RepairWhat::Versions) { + info!("Repairing the versions table"); + self.repair_versions(&must_exit).await?; + } + + if todo(RepairWhat::BlockRefs) { + info!("Repairing the block refs table"); + self.repair_block_ref(&must_exit).await?; + } + + if opt.what.is_none() { + info!("Repairing the RC"); + self.repair_rc(&must_exit).await?; + } + + if todo(RepairWhat::Blocks) { + info!("Repairing the stored blocks"); + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + } + + Ok(()) + } + + async fn repair_versions(&self, must_exit: &watch::Receiver) -> Result<(), Error> { + let mut pos = vec![]; + + while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? 
{
+ pos = item_key.to_vec();
+
+ let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
+ if version.deleted {
+ continue;
+ }
+ let object = self
+ .garage
+ .object_table
+ .get(&version.bucket, &version.key)
+ .await?;
+ let version_exists = match object {
+ Some(o) => o.versions().iter().any(|x| x.uuid == version.uuid),
+ None => {
+ warn!(
+ "Repair versions: object for version {:?} not found",
+ version
+ );
+ false
+ }
+ };
+ if !version_exists {
+ info!("Repair versions: marking version as deleted: {:?}", version);
+ self.garage
+ .version_table
+ .insert(&Version::new(
+ version.uuid,
+ version.bucket,
+ version.key,
+ true,
+ vec![],
+ ))
+ .await?;
+ }
+
+ if *must_exit.borrow() {
+ break;
+ }
+ }
+ Ok(())
+ }
+
+ async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> {
+ let mut pos = vec![];
+
+ while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? {
+ pos = item_key.to_vec();
+
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
+ if block_ref.deleted {
+ continue;
+ }
+ let version = self
+ .garage
+ .version_table
+ .get(&block_ref.version, &EmptyKey)
+ .await?;
+ let ref_exists = match version {
+ Some(v) => !v.deleted,
+ None => {
+ warn!(
+ "Block ref repair: version for block ref {:?} not found",
+ block_ref
+ );
+ false
+ }
+ };
+ if !ref_exists {
+ info!(
+ "Repair block ref: marking block_ref as deleted: {:?}",
+ block_ref
+ );
+ self.garage
+ .block_ref_table
+ .insert(&BlockRef {
+ block: block_ref.block,
+ version: block_ref.version,
+ deleted: true,
+ })
+ .await?;
+ }
+
+ if *must_exit.borrow() {
+ break;
+ }
+ }
+ Ok(())
+ }
+
+ async fn repair_rc(&self, _must_exit: &watch::Receiver) -> Result<(), Error> {
+ // TODO
+ warn!("repair_rc: not implemented");
+ Ok(())
+ }
+}
diff --git a/src/garage/server.rs b/src/garage/server.rs
new file mode 100644
index 00000000..52d03464
--- /dev/null
+++ b/src/garage/server.rs
@@ -0,0 +1,87 @@
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use futures_util::future::*;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::config::*;
+use garage_util::error::Error;
+
+use garage_api::api_server;
+use garage_core::garage::Garage;
+use garage_rpc::rpc_server::RpcServer;
+
+use crate::admin_rpc::*;
+
+async fn shutdown_signal(send_cancel: watch::Sender) -> Result<(), Error> {
+ // Wait for the CTRL+C signal
+ tokio::signal::ctrl_c()
+ .await
+ .expect("failed to install CTRL+C signal handler");
+ info!("Received CTRL+C, shutting down.");
+ send_cancel.broadcast(true)?;
+ Ok(())
+}
+
+async fn wait_from(mut chan: watch::Receiver) -> () {
+ while let Some(exit_now) = chan.recv().await {
+ if exit_now {
+ return;
+ }
+ }
+}
+
+pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
+ info!("Loading configuration...");
+ let config = read_config(config_file).expect("Unable to read config file");
+
+ info!("Opening database...");
+ let mut db_path = config.metadata_dir.clone();
+ db_path.push("db");
+ let db = sled::open(db_path).expect("Unable to open DB");
+
+ info!("Initialize RPC server...");
+ let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone());
+
+ info!("Initializing background runner...");
+ let (send_cancel, watch_cancel) = watch::channel(false);
+ let background = BackgroundRunner::new(16, watch_cancel.clone());
+
+ let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await;
+
+ info!("Create admin RPC
handler..."); + AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server); + + info!("Initializing RPC and API servers..."); + let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); + let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); + + futures::try_join!( + garage + .system + .clone() + .bootstrap(&garage.config.bootstrap_peers[..]) + .map(|rv| { + info!("Bootstrap done"); + Ok(rv) + }), + run_rpc_server.map(|rv| { + info!("RPC server exited"); + rv + }), + api_server.map(|rv| { + info!("API server exited"); + rv + }), + background.run().map(|rv| { + info!("Background runner exited"); + Ok(rv) + }), + shutdown_signal(send_cancel), + )?; + + info!("Cleaning up..."); + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 2c25aadb..00000000 --- a/src/main.rs +++ /dev/null @@ -1,540 +0,0 @@ -#![recursion_limit = "1024"] - -#[macro_use] -extern crate log; - -mod background; -mod config; -mod data; -mod error; - -mod api; -mod rpc; -mod store; -mod table; - -mod admin_rpc; -mod server; - -use std::collections::HashSet; -use std::net::SocketAddr; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Duration; - -use serde::{Deserialize, Serialize}; -use structopt::StructOpt; - -use config::TlsConfig; -use data::*; -use error::Error; - -use rpc::membership::*; -use rpc::rpc_client::*; - -use admin_rpc::*; - -#[derive(StructOpt, Debug)] -#[structopt(name = "garage")] -pub struct Opt { - /// RPC connect to this host to execute client operations - #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")] - rpc_host: SocketAddr, - - #[structopt(long = "ca-cert")] - ca_cert: Option, - #[structopt(long = "client-cert")] - client_cert: Option, - #[structopt(long = "client-key")] - client_key: Option, - - #[structopt(subcommand)] - cmd: Command, -} - -#[derive(StructOpt, Debug)] -pub enum Command { - /// Run Garage server - #[structopt(name = "server")] - Server(ServerOpt), - - /// Get network status - #[structopt(name = "status")] - Status, - - /// Garage node operations - #[structopt(name = "node")] - Node(NodeOperation), - - /// Bucket operations - #[structopt(name = "bucket")] - Bucket(BucketOperation), - - /// Key operations - #[structopt(name = "key")] - Key(KeyOperation), - - /// Start repair of node data - #[structopt(name = "repair")] - Repair(RepairOpt), -} - -#[derive(StructOpt, Debug)] -pub struct ServerOpt { - /// Configuration file - #[structopt(short = "c", long = "config", default_value = "./config.toml")] - config_file: PathBuf, -} - -#[derive(StructOpt, Debug)] -pub enum NodeOperation { - /// Configure Garage node - #[structopt(name = "configure")] - Configure(ConfigureNodeOpt), - - /// Remove Garage node from cluster - #[structopt(name = "remove")] - Remove(RemoveNodeOpt), -} - -#[derive(StructOpt, Debug)] -pub struct ConfigureNodeOpt { - /// Node to configure (prefix of hexadecimal node id) - node_id: String, - - /// Location (datacenter) of the node - #[structopt(short = "d", long = "datacenter")] - datacenter: Option, - - /// Number of tokens - #[structopt(short = "n", long = "n-tokens")] - n_tokens: Option, - - /// Optionnal node tag - #[structopt(short = "t", long = "tag")] - tag: Option, -} - -#[derive(StructOpt, Debug)] -pub struct RemoveNodeOpt { - /// Node to configure (prefix of hexadecimal node id) - node_id: String, - - /// If this flag is not given, the node won't be removed - #[structopt(long = "yes")] - yes: bool, -} - 
-#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub enum BucketOperation { - /// List buckets - #[structopt(name = "list")] - List, - - /// Get bucket info - #[structopt(name = "info")] - Info(BucketOpt), - - /// Create bucket - #[structopt(name = "create")] - Create(BucketOpt), - - /// Delete bucket - #[structopt(name = "delete")] - Delete(DeleteBucketOpt), - - /// Allow key to read or write to bucket - #[structopt(name = "allow")] - Allow(PermBucketOpt), - - /// Allow key to read or write to bucket - #[structopt(name = "deny")] - Deny(PermBucketOpt), -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct BucketOpt { - /// Bucket name - pub name: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct DeleteBucketOpt { - /// Bucket name - pub name: String, - - /// If this flag is not given, the bucket won't be deleted - #[structopt(long = "yes")] - pub yes: bool, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct PermBucketOpt { - /// Access key ID - #[structopt(long = "key")] - pub key_id: String, - - /// Allow/deny read operations - #[structopt(long = "read")] - pub read: bool, - - /// Allow/deny write operations - #[structopt(long = "write")] - pub write: bool, - - /// Bucket name - pub bucket: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub enum KeyOperation { - /// List keys - #[structopt(name = "list")] - List, - - /// Get key info - #[structopt(name = "info")] - Info(KeyOpt), - - /// Create new key - #[structopt(name = "new")] - New(KeyNewOpt), - - /// Rename key - #[structopt(name = "rename")] - Rename(KeyRenameOpt), - - /// Delete key - #[structopt(name = "delete")] - Delete(KeyDeleteOpt), -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyOpt { - /// ID of the key - key_id: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyNewOpt { - /// Name of the key - #[structopt(long = "name", default_value = "Unnamed key")] - name: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyRenameOpt { - /// ID of the key - key_id: String, - - /// New name of the key - new_name: String, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug)] -pub struct KeyDeleteOpt { - /// ID of the key - key_id: String, - - /// Confirm deletion - #[structopt(long = "yes")] - yes: bool, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] -pub struct RepairOpt { - /// Launch repair operation on all nodes - #[structopt(short = "a", long = "all-nodes")] - pub all_nodes: bool, - - /// Confirm the launch of the repair operation - #[structopt(long = "yes")] - pub yes: bool, - - #[structopt(subcommand)] - pub what: Option, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] -pub enum RepairWhat { - /// Only do a full sync of metadata tables - #[structopt(name = "tables")] - Tables, - /// Only repair (resync/rebalance) the set of stored blocks - #[structopt(name = "blocks")] - Blocks, - /// Only redo the propagation of object deletions to the version table (slow) - #[structopt(name = "versions")] - Versions, - /// Only redo the propagation of version deletions to the block ref table (extremely slow) - #[structopt(name = "block_refs")] - BlockRefs, -} - -#[tokio::main] -async fn main() { - pretty_env_logger::init(); - - let opt = Opt::from_args(); - - let tls_config = match (opt.ca_cert, opt.client_cert, opt.client_key) { - (Some(ca_cert), Some(client_cert), Some(client_key)) => Some(TlsConfig { - ca_cert, - 
node_cert: client_cert, - node_key: client_key, - }), - (None, None, None) => None, - _ => { - warn!("Missing one of: --ca-cert, --node-cert, --node-key. Not using TLS."); - None - } - }; - - let rpc_http_cli = - Arc::new(RpcHttpClient::new(8, &tls_config).expect("Could not create RPC client")); - let membership_rpc_cli = - RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string()); - let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string()); - - let resp = match opt.cmd { - Command::Server(server_opt) => { - // Abort on panic (same behavior as in Go) - std::panic::set_hook(Box::new(|panic_info| { - error!("{}", panic_info.to_string()); - std::process::abort(); - })); - - server::run_server(server_opt.config_file).await - } - Command::Status => cmd_status(membership_rpc_cli, opt.rpc_host).await, - Command::Node(NodeOperation::Configure(configure_opt)) => { - cmd_configure(membership_rpc_cli, opt.rpc_host, configure_opt).await - } - Command::Node(NodeOperation::Remove(remove_opt)) => { - cmd_remove(membership_rpc_cli, opt.rpc_host, remove_opt).await - } - Command::Bucket(bo) => { - cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await - } - Command::Key(bo) => { - cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::KeyOperation(bo)).await - } - Command::Repair(ro) => { - cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::LaunchRepair(ro)).await - } - }; - - if let Err(e) = resp { - error!("Error: {}", e); - } -} - -async fn cmd_status(rpc_cli: RpcAddrClient, rpc_host: SocketAddr) -> Result<(), Error> { - let status = match rpc_cli - .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) - .await?? - { - Message::AdvertiseNodesUp(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - let config = match rpc_cli - .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await?? - { - Message::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - println!("Healthy nodes:"); - for adv in status.iter().filter(|x| x.is_up) { - if let Some(cfg) = config.members.get(&adv.id) { - println!( - "{:?}\t{}\t{}\t[{}]\t{}\t{}", - adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens - ); - } else { - println!( - "{:?}\t{}\t{}\tUNCONFIGURED/REMOVED", - adv.id, adv.state_info.hostname, adv.addr - ); - } - } - - let status_keys = status.iter().map(|x| x.id).collect::>(); - let failure_case_1 = status.iter().any(|x| !x.is_up); - let failure_case_2 = config - .members - .iter() - .any(|(id, _)| !status_keys.contains(id)); - if failure_case_1 || failure_case_2 { - println!("\nFailed nodes:"); - for adv in status.iter().filter(|x| !x.is_up) { - if let Some(cfg) = config.members.get(&adv.id) { - println!( - "{:?}\t{}\t{}\t[{}]\t{}\t{}\tlast seen: {}s ago", - adv.id, - adv.state_info.hostname, - adv.addr, - cfg.tag, - cfg.datacenter, - cfg.n_tokens, - (now_msec() - adv.last_seen) / 1000, - ); - } - } - for (id, cfg) in config.members.iter() { - if !status.iter().any(|x| x.id == *id) { - println!( - "{:?}\t{}\t{}\t{}\tnever seen", - id, cfg.tag, cfg.datacenter, cfg.n_tokens - ); - } - } - } - - Ok(()) -} - -async fn cmd_configure( - rpc_cli: RpcAddrClient, - rpc_host: SocketAddr, - args: ConfigureNodeOpt, -) -> Result<(), Error> { - let status = match rpc_cli - .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) - .await?? 
- { - Message::AdvertiseNodesUp(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - let mut candidates = vec![]; - for adv in status.iter() { - if hex::encode(&adv.id).starts_with(&args.node_id) { - candidates.push(adv.id); - } - } - if candidates.len() != 1 { - return Err(Error::Message(format!( - "{} matching nodes", - candidates.len() - ))); - } - - let mut config = match rpc_cli - .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await?? - { - Message::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - let new_entry = match config.members.get(&candidates[0]) { - None => NetworkConfigEntry { - datacenter: args - .datacenter - .expect("Please specifiy a datacenter with the -d flag"), - n_tokens: args - .n_tokens - .expect("Please specifiy a number of tokens with the -n flag"), - tag: args.tag.unwrap_or("".to_string()), - }, - Some(old) => NetworkConfigEntry { - datacenter: args.datacenter.unwrap_or(old.datacenter.to_string()), - n_tokens: args.n_tokens.unwrap_or(old.n_tokens), - tag: args.tag.unwrap_or(old.tag.to_string()), - }, - }; - - config.members.insert(candidates[0].clone(), new_entry); - config.version += 1; - - rpc_cli - .call( - &rpc_host, - &Message::AdvertiseConfig(config), - ADMIN_RPC_TIMEOUT, - ) - .await??; - Ok(()) -} - -async fn cmd_remove( - rpc_cli: RpcAddrClient, - rpc_host: SocketAddr, - args: RemoveNodeOpt, -) -> Result<(), Error> { - let mut config = match rpc_cli - .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) - .await?? - { - Message::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - let mut candidates = vec![]; - for (key, _) in config.members.iter() { - if hex::encode(key).starts_with(&args.node_id) { - candidates.push(*key); - } - } - if candidates.len() != 1 { - return Err(Error::Message(format!( - "{} matching nodes", - candidates.len() - ))); - } - - if !args.yes { - return Err(Error::Message(format!( - "Add the flag --yes to really remove {:?} from the cluster", - candidates[0] - ))); - } - - config.members.remove(&candidates[0]); - config.version += 1; - - rpc_cli - .call( - &rpc_host, - &Message::AdvertiseConfig(config), - ADMIN_RPC_TIMEOUT, - ) - .await??; - Ok(()) -} - -async fn cmd_admin( - rpc_cli: RpcAddrClient, - rpc_host: SocketAddr, - args: AdminRPC, -) -> Result<(), Error> { - match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? 
{ - AdminRPC::Ok(msg) => { - println!("{}", msg); - } - AdminRPC::BucketList(bl) => { - println!("List of buckets:"); - for bucket in bl { - println!("{}", bucket); - } - } - AdminRPC::BucketInfo(bucket) => { - println!("{:?}", bucket); - } - AdminRPC::KeyList(kl) => { - println!("List of keys:"); - for key in kl { - println!("{}\t{}", key.0, key.1); - } - } - AdminRPC::KeyInfo(key) => { - println!("{:?}", key); - } - r => { - error!("Unexpected response: {:?}", r); - } - } - Ok(()) -} diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml new file mode 100644 index 00000000..d7a09255 --- /dev/null +++ b/src/rpc/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "garage_rpc" +version = "0.1.0" +authors = ["Alex Auvolat "] +edition = "2018" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_util = { path = "../util" } + +bytes = "0.4" +rand = "0.7" +hex = "0.3" +sha2 = "0.8" +arc-swap = "0.4" +gethostname = "0.2" +log = "0.4" + +rmp-serde = "0.14.3" +serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } + +futures = "0.3" +futures-util = "0.3" +tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } + +http = "0.2" +hyper = "0.13" +rustls = "0.17" +tokio-rustls = "0.13" +hyper-rustls = { version = "0.20", default-features = false } +webpki = "0.21" + + diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs new file mode 100644 index 00000000..3fae6c3e --- /dev/null +++ b/src/rpc/lib.rs @@ -0,0 +1,7 @@ +#[macro_use] +extern crate log; + +pub mod membership; +pub mod rpc_client; +pub mod rpc_server; +pub mod tls_util; diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index e0509536..dcda2c40 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -17,12 +17,12 @@ use tokio::prelude::*; use tokio::sync::watch; use tokio::sync::Mutex; -use crate::background::BackgroundRunner; -use crate::data::*; -use crate::error::Error; +use garage_util::background::BackgroundRunner; +use garage_util::data::*; +use garage_util::error::Error; -use crate::rpc::rpc_client::*; -use crate::rpc::rpc_server::*; +use crate::rpc_client::*; +use crate::rpc_server::*; const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs deleted file mode 100644 index 83fd0aac..00000000 --- a/src/rpc/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod membership; -pub mod rpc_client; -pub mod rpc_server; -pub mod tls_util; diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index 027a3cde..3f943dcc 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -8,7 +8,6 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use bytes::IntoBuf; -use err_derive::Error; use futures::future::Future; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; @@ -17,36 +16,17 @@ use hyper::client::{Client, HttpConnector}; use hyper::{Body, Method, Request}; use tokio::sync::{watch, Semaphore}; -use crate::background::BackgroundRunner; -use crate::data::*; -use crate::error::Error; +use garage_util::background::BackgroundRunner; +use garage_util::config::TlsConfig; +use garage_util::data::*; +use garage_util::error::{Error, RPCError}; -use crate::rpc::membership::Status; -use crate::rpc::rpc_server::RpcMessage; -use crate::rpc::tls_util; - -use crate::config::TlsConfig; 
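The hunks above are the mechanical heart of the split: every path that used to resolve inside the single binary crate now goes through the new library crates. For downstream code the rewrite looks like this (a sketch; only paths that actually appear in this patch are used):

// Before the split (single binary crate):
// use crate::rpc::membership::System;
// use crate::error::Error;
//
// After the split (separate library crates):
use garage_rpc::membership::System;
use garage_util::error::Error;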
+use crate::membership::Status; +use crate::rpc_server::RpcMessage; +use crate::tls_util; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); -#[derive(Debug, Error)] -pub enum RPCError { - #[error(display = "Node is down: {:?}.", _0)] - NodeDown(UUID), - #[error(display = "Timeout: {}", _0)] - Timeout(#[error(source)] tokio::time::Elapsed), - #[error(display = "HTTP error: {}", _0)] - HTTP(#[error(source)] http::Error), - #[error(display = "Hyper error: {}", _0)] - Hyper(#[error(source)] hyper::Error), - #[error(display = "Messagepack encode error: {}", _0)] - RMPEncode(#[error(source)] rmp_serde::encode::Error), - #[error(display = "Messagepack decode error: {}", _0)] - RMPDecode(#[error(source)] rmp_serde::decode::Error), - #[error(display = "Too many errors: {:?}", _0)] - TooManyErrors(Vec), -} - #[derive(Copy, Clone)] pub struct RequestStrategy { pub rs_timeout: Duration, diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 4ee53909..4386d733 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -16,11 +16,11 @@ use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; -use crate::config::TlsConfig; -use crate::data::*; -use crate::error::Error; +use garage_util::config::TlsConfig; +use garage_util::data::*; +use garage_util::error::Error; -use crate::rpc::tls_util; +use crate::tls_util; pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {} diff --git a/src/rpc/tls_util.rs b/src/rpc/tls_util.rs index 52c52110..36ea7bf3 100644 --- a/src/rpc/tls_util.rs +++ b/src/rpc/tls_util.rs @@ -15,7 +15,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_rustls::TlsConnector; use webpki::DNSNameRef; -use crate::error::Error; +use garage_util::error::Error; pub fn load_certs(filename: &str) -> Result, Error> { let certfile = fs::File::open(&filename)?; diff --git a/src/server.rs b/src/server.rs deleted file mode 100644 index 0724630a..00000000 --- a/src/server.rs +++ /dev/null @@ -1,247 +0,0 @@ -use std::path::PathBuf; -use std::sync::Arc; - -use futures_util::future::*; -use tokio::sync::watch; - -use crate::background::*; -use crate::config::*; -use crate::error::Error; - -use crate::rpc::membership::System; -use crate::rpc::rpc_client::RpcHttpClient; -use crate::rpc::rpc_server::RpcServer; - -use crate::table::table_fullcopy::*; -use crate::table::table_sharded::*; -use crate::table::*; - -use crate::store::block::*; -use crate::store::block_ref_table::*; -use crate::store::bucket_table::*; -use crate::store::key_table::*; -use crate::store::object_table::*; -use crate::store::version_table::*; - -use crate::api::api_server; - -use crate::admin_rpc::*; - -pub struct Garage { - pub config: Config, - - pub db: sled::Db, - pub background: Arc, - pub system: Arc, - pub block_manager: Arc, - - pub bucket_table: Arc>, - pub key_table: Arc>, - - pub object_table: Arc>, - pub version_table: Arc>, - pub block_ref_table: Arc>, -} - -impl Garage { - pub async fn new( - config: Config, - db: sled::Db, - background: Arc, - rpc_server: &mut RpcServer, - ) -> Arc { - info!("Initialize membership management system..."); - let rpc_http_client = Arc::new( - RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls) - .expect("Could not create RPC client"), - ); - let system = System::new( - config.metadata_dir.clone(), - rpc_http_client, - background.clone(), - rpc_server, - ); - - let data_rep_param = TableShardedReplication { - replication_factor: config.data_replication_factor, - 
write_quorum: (config.data_replication_factor + 1) / 2, - read_quorum: 1, - }; - - let meta_rep_param = TableShardedReplication { - replication_factor: config.meta_replication_factor, - write_quorum: (config.meta_replication_factor + 1) / 2, - read_quorum: (config.meta_replication_factor + 1) / 2, - }; - - let control_rep_param = TableFullReplication::new( - config.meta_epidemic_factor, - (config.meta_epidemic_factor + 1) / 2, - ); - - info!("Initialize block manager..."); - let block_manager = BlockManager::new( - &db, - config.data_dir.clone(), - data_rep_param.clone(), - system.clone(), - rpc_server, - ); - - info!("Initialize block_ref_table..."); - let block_ref_table = Table::new( - BlockRefTable { - background: background.clone(), - block_manager: block_manager.clone(), - }, - data_rep_param.clone(), - system.clone(), - &db, - "block_ref".to_string(), - rpc_server, - ) - .await; - - info!("Initialize version_table..."); - let version_table = Table::new( - VersionTable { - background: background.clone(), - block_ref_table: block_ref_table.clone(), - }, - meta_rep_param.clone(), - system.clone(), - &db, - "version".to_string(), - rpc_server, - ) - .await; - - info!("Initialize object_table..."); - let object_table = Table::new( - ObjectTable { - background: background.clone(), - version_table: version_table.clone(), - }, - meta_rep_param.clone(), - system.clone(), - &db, - "object".to_string(), - rpc_server, - ) - .await; - - info!("Initialize bucket_table..."); - let bucket_table = Table::new( - BucketTable, - control_rep_param.clone(), - system.clone(), - &db, - "bucket".to_string(), - rpc_server, - ) - .await; - - info!("Initialize key_table_table..."); - let key_table = Table::new( - KeyTable, - control_rep_param.clone(), - system.clone(), - &db, - "key".to_string(), - rpc_server, - ) - .await; - - info!("Initialize Garage..."); - let garage = Arc::new(Self { - config, - db, - system: system.clone(), - block_manager, - background, - bucket_table, - key_table, - object_table, - version_table, - block_ref_table, - }); - - info!("Crate admin RPC handler..."); - AdminRpcHandler::new(garage.clone()).register_handler(rpc_server); - - info!("Start block manager background thread..."); - garage.block_manager.garage.swap(Some(garage.clone())); - garage.block_manager.clone().spawn_background_worker().await; - - garage - } -} - -async fn shutdown_signal(send_cancel: watch::Sender) -> Result<(), Error> { - // Wait for the CTRL+C signal - tokio::signal::ctrl_c() - .await - .expect("failed to install CTRL+C signal handler"); - info!("Received CTRL+C, shutting down."); - send_cancel.broadcast(true)?; - Ok(()) -} - -async fn wait_from(mut chan: watch::Receiver) -> () { - while let Some(exit_now) = chan.recv().await { - if exit_now { - return; - } - } -} - -pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { - info!("Loading configuration..."); - let config = read_config(config_file).expect("Unable to read config file"); - - info!("Opening database..."); - let mut db_path = config.metadata_dir.clone(); - db_path.push("db"); - let db = sled::open(db_path).expect("Unable to open DB"); - - info!("Initialize RPC server..."); - let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone()); - - info!("Initializing background runner..."); - let (send_cancel, watch_cancel) = watch::channel(false); - let background = BackgroundRunner::new(16, watch_cancel.clone()); - - let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await; - - 
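The metadata replication parameters above pair read and write quorums of (n + 1) / 2. For an odd replication factor (e.g. 3) that is a strict majority, so any two quorums intersect and a quorum read observes the latest quorum-acknowledged write; a one-line helper plus a check, hedged to the odd factors this guarantee needs:

// For odd n, a quorum of (n + 1) / 2 is a strict majority, so any two
// quorums overlap in at least one node.
fn quorum(n: usize) -> usize {
    (n + 1) / 2
}

#[test]
fn quorums_overlap_for_odd_factors() {
    for n in [1usize, 3, 5, 7].iter() {
        assert!(quorum(*n) + quorum(*n) > *n);
    }
}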
info!("Initializing RPC and API servers..."); - let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); - let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); - - futures::try_join!( - garage - .system - .clone() - .bootstrap(&garage.config.bootstrap_peers[..]) - .map(|rv| { - info!("Bootstrap done"); - Ok(rv) - }), - run_rpc_server.map(|rv| { - info!("RPC server exited"); - rv - }), - api_server.map(|rv| { - info!("API server exited"); - rv - }), - background.run().map(|rv| { - info!("Background runner exited"); - Ok(rv) - }), - shutdown_signal(send_cancel), - )?; - - info!("Cleaning up..."); - - Ok(()) -} diff --git a/src/store/block.rs b/src/store/block.rs deleted file mode 100644 index e2ef32e0..00000000 --- a/src/store/block.rs +++ /dev/null @@ -1,506 +0,0 @@ -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Duration; - -use arc_swap::ArcSwapOption; -use futures::future::*; -use futures::select; -use futures::stream::*; -use serde::{Deserialize, Serialize}; -use tokio::fs; -use tokio::prelude::*; -use tokio::sync::{watch, Mutex, Notify}; - -use crate::data; -use crate::data::*; -use crate::error::Error; - -use crate::rpc::membership::System; -use crate::rpc::rpc_client::*; -use crate::rpc::rpc_server::*; - -use crate::table::table_sharded::TableShardedReplication; -use crate::table::TableReplication; - -use crate::store::block_ref_table::*; - -use crate::server::Garage; - -pub const INLINE_THRESHOLD: usize = 3072; - -const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); -const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); -const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); - -#[derive(Debug, Serialize, Deserialize)] -pub enum Message { - Ok, - GetBlock(Hash), - PutBlock(PutBlockMessage), - NeedBlockQuery(Hash), - NeedBlockReply(bool), -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct PutBlockMessage { - pub hash: Hash, - - #[serde(with = "serde_bytes")] - pub data: Vec, -} - -impl RpcMessage for Message {} - -pub struct BlockManager { - pub replication: TableShardedReplication, - pub data_dir: PathBuf, - pub data_dir_lock: Mutex<()>, - - pub rc: sled::Tree, - - pub resync_queue: sled::Tree, - pub resync_notify: Notify, - - pub system: Arc, - rpc_client: Arc>, - pub garage: ArcSwapOption, -} - -impl BlockManager { - pub fn new( - db: &sled::Db, - data_dir: PathBuf, - replication: TableShardedReplication, - system: Arc, - rpc_server: &mut RpcServer, - ) -> Arc { - let rc = db - .open_tree("block_local_rc") - .expect("Unable to open block_local_rc tree"); - rc.set_merge_operator(rc_merge); - - let resync_queue = db - .open_tree("block_local_resync_queue") - .expect("Unable to open block_local_resync_queue tree"); - - let rpc_path = "block_manager"; - let rpc_client = system.rpc_client::(rpc_path); - - let block_manager = Arc::new(Self { - replication, - data_dir, - data_dir_lock: Mutex::new(()), - rc, - resync_queue, - resync_notify: Notify::new(), - system, - rpc_client, - garage: ArcSwapOption::from(None), - }); - block_manager - .clone() - .register_handler(rpc_server, rpc_path.into()); - block_manager - } - - fn register_handler(self: Arc, rpc_server: &mut RpcServer, path: String) { - let self2 = self.clone(); - rpc_server.add_handler::(path, move |msg, _addr| { - let self2 = self2.clone(); - async move { self2.handle(&msg).await } - }); - - let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { - let self2 = self2.clone(); - 
async move { self2.handle(&msg).await } - }); - } - - async fn handle(self: Arc, msg: &Message) -> Result { - match msg { - Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, - Message::GetBlock(h) => self.read_block(h).await, - Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), - _ => Err(Error::BadRequest(format!("Unexpected RPC message"))), - } - } - - pub async fn spawn_background_worker(self: Arc) { - // Launch 2 simultaneous workers for background resync loop preprocessing - for i in 0..2usize { - let bm2 = self.clone(); - let background = self.system.background.clone(); - tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(10)).await; - background - .spawn_worker(format!("block resync worker {}", i), move |must_exit| { - bm2.resync_loop(must_exit) - }) - .await; - }); - } - } - - pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { - let _lock = self.data_dir_lock.lock().await; - - let mut path = self.block_dir(hash); - fs::create_dir_all(&path).await?; - - path.push(hex::encode(hash)); - if fs::metadata(&path).await.is_ok() { - return Ok(Message::Ok); - } - - let mut f = fs::File::create(path).await?; - f.write_all(data).await?; - drop(f); - - Ok(Message::Ok) - } - - pub async fn read_block(&self, hash: &Hash) -> Result { - let path = self.block_path(hash); - - let mut f = match fs::File::open(&path).await { - Ok(f) => f, - Err(e) => { - // Not found but maybe we should have had it ?? - self.put_to_resync(hash, 0)?; - return Err(Into::into(e)); - } - }; - let mut data = vec![]; - f.read_to_end(&mut data).await?; - drop(f); - - if data::hash(&data[..]) != *hash { - let _lock = self.data_dir_lock.lock().await; - warn!("Block {:?} is corrupted. Deleting and resyncing.", hash); - fs::remove_file(path).await?; - self.put_to_resync(&hash, 0)?; - return Err(Error::CorruptData(*hash)); - } - - Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) - } - - pub async fn need_block(&self, hash: &Hash) -> Result { - let needed = self - .rc - .get(hash.as_ref())? 
- .map(|x| u64_from_bytes(x.as_ref()) > 0) - .unwrap_or(false); - if needed { - let path = self.block_path(hash); - let exists = fs::metadata(&path).await.is_ok(); - Ok(!exists) - } else { - Ok(false) - } - } - - fn block_dir(&self, hash: &Hash) -> PathBuf { - let mut path = self.data_dir.clone(); - path.push(hex::encode(&hash.as_slice()[0..1])); - path.push(hex::encode(&hash.as_slice()[1..2])); - path - } - fn block_path(&self, hash: &Hash) -> PathBuf { - let mut path = self.block_dir(hash); - path.push(hex::encode(hash.as_ref())); - path - } - - pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { - let old_rc = self.rc.get(&hash)?; - self.rc.merge(&hash, vec![1])?; - if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?; - } - Ok(()) - } - - pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { - let new_rc = self.rc.merge(&hash, vec![0])?; - if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, 0)?; - } - Ok(()) - } - - fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> { - let when = now_msec() + delay_millis; - trace!("Put resync_queue: {} {:?}", when, hash); - let mut key = u64::to_be_bytes(when).to_vec(); - key.extend(hash.as_ref()); - self.resync_queue.insert(key, hash.as_ref())?; - self.resync_notify.notify(); - Ok(()) - } - - async fn resync_loop( - self: Arc, - mut must_exit: watch::Receiver, - ) -> Result<(), Error> { - let mut n_failures = 0usize; - while !*must_exit.borrow() { - if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { - let time_msec = u64_from_bytes(&time_bytes[0..8]); - let now = now_msec(); - if now >= time_msec { - let mut hash = [0u8; 32]; - hash.copy_from_slice(hash_bytes.as_ref()); - let hash = Hash::from(hash); - - if let Err(e) = self.resync_iter(&hash).await { - warn!("Failed to resync block {:?}, retrying later: {}", hash, e); - self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?; - n_failures += 1; - if n_failures >= 10 { - warn!("Too many resync failures, throttling."); - tokio::time::delay_for(Duration::from_secs(1)).await; - } - } else { - n_failures = 0; - } - } else { - self.resync_queue.insert(time_bytes, hash_bytes)?; - let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now)); - select! { - _ = delay.fuse() => (), - _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), - } - } - } else { - select! { - _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), - } - } - } - Ok(()) - } - - async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> { - let path = self.block_path(hash); - - let exists = fs::metadata(&path).await.is_ok(); - let needed = self - .rc - .get(hash.as_ref())? 
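put_to_resync above builds the queue key as a big-endian timestamp followed by the block hash. Because sled orders keys bytewise, big-endian encoding makes that order coincide with numeric deadline order, so pop_min() in the resync loop always yields the earliest-due entry. A stand-alone sketch of the key layout:

// Big-endian timestamps sort bytewise in numeric order, so the resync
// queue's pop_min() returns the entry whose deadline comes first.
fn resync_key(when_msec: u64, hash: &[u8; 32]) -> Vec<u8> {
    let mut key = u64::to_be_bytes(when_msec).to_vec();
    key.extend_from_slice(hash);
    key
}

#[test]
fn big_endian_keys_sort_by_deadline() {
    let h = [0u8; 32];
    assert!(resync_key(1, &h) < resync_key(256, &h)); // little-endian would invert this pair
}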
- .map(|x| u64_from_bytes(x.as_ref()) > 0) - .unwrap_or(false); - - if exists != needed { - info!( - "Resync block {:?}: exists {}, needed {}", - hash, exists, needed - ); - } - - if exists && !needed { - let garage = self.garage.load_full().unwrap(); - let active_refs = garage - .block_ref_table - .get_range(&hash, None, Some(()), 1) - .await?; - let needed_by_others = !active_refs.is_empty(); - if needed_by_others { - let ring = self.system.ring.borrow().clone(); - let who = self.replication.replication_nodes(&hash, &ring); - let msg = Arc::new(Message::NeedBlockQuery(*hash)); - let who_needs_fut = who.iter().map(|to| { - self.rpc_client - .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT) - }); - let who_needs = join_all(who_needs_fut).await; - - let mut need_nodes = vec![]; - for (node, needed) in who.into_iter().zip(who_needs.iter()) { - match needed { - Ok(Message::NeedBlockReply(needed)) => { - if *needed { - need_nodes.push(node); - } - } - Err(e) => { - return Err(Error::Message(format!( - "Should delete block, but unable to confirm that all other nodes that need it have it: {}", - e - ))); - } - Ok(_) => { - return Err(Error::Message(format!( - "Unexpected response to NeedBlockQuery RPC" - ))); - } - } - } - - if need_nodes.len() > 0 { - let put_block_message = self.read_block(hash).await?; - self.rpc_client - .try_call_many( - &need_nodes[..], - put_block_message, - RequestStrategy::with_quorum(need_nodes.len()) - .with_timeout(BLOCK_RW_TIMEOUT), - ) - .await?; - } - } - fs::remove_file(path).await?; - self.resync_queue.remove(&hash)?; - } - - if needed && !exists { - // TODO find a way to not do this if they are sending it to us - // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay - // between the RC being incremented and this part being called. - let block_data = self.rpc_get_block(&hash).await?; - self.write_block(hash, &block_data[..]).await?; - } - - Ok(()) - } - - pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { - let who = self.replication.read_nodes(&hash, &self.system); - let resps = self - .rpc_client - .try_call_many( - &who[..], - Message::GetBlock(*hash), - RequestStrategy::with_quorum(1) - .with_timeout(BLOCK_RW_TIMEOUT) - .interrupt_after_quorum(true), - ) - .await?; - - for resp in resps { - if let Message::PutBlock(msg) = resp { - return Ok(msg.data); - } - } - Err(Error::Message(format!( - "Unable to read block {:?}: no valid blocks returned", - hash - ))) - } - - pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { - let who = self.replication.write_nodes(&hash, &self.system); - self.rpc_client - .try_call_many( - &who[..], - Message::PutBlock(PutBlockMessage { hash, data }), - RequestStrategy::with_quorum(self.replication.write_quorum()) - .with_timeout(BLOCK_RW_TIMEOUT), - ) - .await?; - Ok(()) - } - - pub async fn repair_data_store(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - // 1. Repair blocks from RC table - let garage = self.garage.load_full().unwrap(); - let mut last_hash = None; - let mut i = 0usize; - for entry in garage.block_ref_table.store.iter() { - let (_k, v_bytes) = entry?; - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?; - if Some(&block_ref.block) == last_hash.as_ref() { - continue; - } - if !block_ref.deleted { - last_hash = Some(block_ref.block); - self.put_to_resync(&block_ref.block, 0)?; - } - i += 1; - if i & 0xFF == 0 && *must_exit.borrow() { - return Ok(()); - } - } - - // 2. 
Repair blocks actually on disk - let mut ls_data_dir = fs::read_dir(&self.data_dir).await?; - while let Some(data_dir_ent) = ls_data_dir.next().await { - let data_dir_ent = data_dir_ent?; - let dir_name = data_dir_ent.file_name(); - let dir_name = match dir_name.into_string() { - Ok(x) => x, - Err(_) => continue, - }; - if dir_name.len() != 2 || hex::decode(&dir_name).is_err() { - continue; - } - - let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await { - Err(e) => { - warn!( - "Warning: could not list dir {:?}: {}", - data_dir_ent.path().to_str(), - e - ); - continue; - } - Ok(x) => x, - }; - while let Some(file) = ls_data_dir_2.next().await { - let file = file?; - let file_name = file.file_name(); - let file_name = match file_name.into_string() { - Ok(x) => x, - Err(_) => continue, - }; - if file_name.len() != 64 { - continue; - } - let hash_bytes = match hex::decode(&file_name) { - Ok(h) => h, - Err(_) => continue, - }; - let mut hash = [0u8; 32]; - hash.copy_from_slice(&hash_bytes[..]); - self.put_to_resync(&hash.into(), 0)?; - - if *must_exit.borrow() { - return Ok(()); - } - } - } - Ok(()) - } -} - -fn u64_from_bytes(bytes: &[u8]) -> u64 { - assert!(bytes.len() == 8); - let mut x8 = [0u8; 8]; - x8.copy_from_slice(bytes); - u64::from_be_bytes(x8) -} - -fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option> { - let old = old.map(u64_from_bytes).unwrap_or(0); - assert!(new.len() == 1); - let new = match new[0] { - 0 => { - if old > 0 { - old - 1 - } else { - 0 - } - } - 1 => old + 1, - _ => unreachable!(), - }; - if new == 0 { - None - } else { - Some(u64::to_be_bytes(new).to_vec()) - } -} diff --git a/src/store/block_ref_table.rs b/src/store/block_ref_table.rs deleted file mode 100644 index c8a2a2a1..00000000 --- a/src/store/block_ref_table.rs +++ /dev/null @@ -1,68 +0,0 @@ -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -use crate::background::*; -use crate::data::*; -use crate::error::Error; - -use crate::table::*; - -use crate::store::block::*; - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct BlockRef { - // Primary key - pub block: Hash, - - // Sort key - pub version: UUID, - - // Keep track of deleted status - pub deleted: bool, -} - -impl Entry for BlockRef { - fn partition_key(&self) -> &Hash { - &self.block - } - fn sort_key(&self) -> &UUID { - &self.version - } - - fn merge(&mut self, other: &Self) { - if other.deleted { - self.deleted = true; - } - } -} - -pub struct BlockRefTable { - pub background: Arc, - pub block_manager: Arc, -} - -#[async_trait] -impl TableSchema for BlockRefTable { - type P = Hash; - type S = UUID; - type E = BlockRef; - type Filter = (); - - async fn updated(&self, old: Option, new: Option) -> Result<(), Error> { - let block = &old.as_ref().or(new.as_ref()).unwrap().block; - let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false); - let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false); - if is_after && !was_before { - self.block_manager.block_incref(block)?; - } - if was_before && !is_after { - self.block_manager.block_decref(block)?; - } - Ok(()) - } - - fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { - !entry.deleted - } -} diff --git a/src/store/bucket_table.rs b/src/store/bucket_table.rs deleted file mode 100644 index a9bdaa70..00000000 --- a/src/store/bucket_table.rs +++ /dev/null @@ -1,121 +0,0 @@ -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; - -use crate::error::Error; -use 
crate::table::*; - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct Bucket { - // Primary key - pub name: String, - - // Timestamp and deletion - // Upon version increment, all info is replaced - pub timestamp: u64, - pub deleted: bool, - - // Authorized keys - authorized_keys: Vec, -} - -impl Bucket { - pub fn new( - name: String, - timestamp: u64, - deleted: bool, - authorized_keys: Vec, - ) -> Self { - let mut ret = Bucket { - name, - timestamp, - deleted, - authorized_keys: vec![], - }; - for key in authorized_keys { - ret.add_key(key) - .expect("Duplicate AllowedKey in Bucket constructor"); - } - ret - } - /// Add a key only if it is not already present - pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> { - match self - .authorized_keys - .binary_search_by(|k| k.key_id.cmp(&key.key_id)) - { - Err(i) => { - self.authorized_keys.insert(i, key); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn authorized_keys(&self) -> &[AllowedKey] { - &self.authorized_keys[..] - } - pub fn clear_keys(&mut self) { - self.authorized_keys.clear(); - } -} - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedKey { - pub key_id: String, - pub timestamp: u64, - pub allow_read: bool, - pub allow_write: bool, -} - -impl Entry for Bucket { - fn partition_key(&self) -> &EmptyKey { - &EmptyKey - } - fn sort_key(&self) -> &String { - &self.name - } - - fn merge(&mut self, other: &Self) { - if other.timestamp < self.timestamp { - *self = other.clone(); - return; - } - if self.timestamp > other.timestamp || self.deleted { - return; - } - - for ak in other.authorized_keys.iter() { - match self - .authorized_keys - .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id)) - { - Ok(i) => { - let our_ak = &mut self.authorized_keys[i]; - if ak.timestamp > our_ak.timestamp { - *our_ak = ak.clone(); - } - } - Err(i) => { - self.authorized_keys.insert(i, ak.clone()); - } - } - } - } -} - -pub struct BucketTable; - -#[async_trait] -impl TableSchema for BucketTable { - type P = EmptyKey; - type S = String; - type E = Bucket; - type Filter = (); - - async fn updated(&self, _old: Option, _new: Option) -> Result<(), Error> { - Ok(()) - } - - fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { - !entry.deleted - } -} diff --git a/src/store/key_table.rs b/src/store/key_table.rs deleted file mode 100644 index add6ab02..00000000 --- a/src/store/key_table.rs +++ /dev/null @@ -1,154 +0,0 @@ -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; - -use crate::data::*; -use crate::error::Error; -use crate::table::*; - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct Key { - // Primary key - pub key_id: String, - - // Associated secret key (immutable) - pub secret_key: String, - - // Name - pub name: String, - pub name_timestamp: u64, - - // Deletion - pub deleted: bool, - - // Authorized keys - authorized_buckets: Vec, -} - -impl Key { - pub fn new(name: String, buckets: Vec) -> Self { - let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); - let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); - let mut ret = Self { - key_id, - secret_key, - name, - name_timestamp: now_msec(), - deleted: false, - authorized_buckets: vec![], - }; - for b in buckets { - ret.add_bucket(b) - .expect("Duplicate AllowedBucket in Key constructor"); - } - ret - } - pub fn delete(key_id: String) -> Self { - Self { - key_id, - secret_key: "".into(), - name: "".into(), - name_timestamp: now_msec(), - deleted: true, - 
authorized_buckets: vec![], - } - } - /// Add an authorized bucket, only if it wasn't there before - pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> { - match self - .authorized_buckets - .binary_search_by(|b| b.bucket.cmp(&new.bucket)) - { - Err(i) => { - self.authorized_buckets.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn authorized_buckets(&self) -> &[AllowedBucket] { - &self.authorized_buckets[..] - } - pub fn clear_buckets(&mut self) { - self.authorized_buckets.clear(); - } - pub fn allow_read(&self, bucket: &str) -> bool { - self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) - .map(|x| x.allow_read) - .unwrap_or(false) - } - pub fn allow_write(&self, bucket: &str) -> bool { - self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) - .map(|x| x.allow_write) - .unwrap_or(false) - } -} - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedBucket { - pub bucket: String, - pub timestamp: u64, - pub allow_read: bool, - pub allow_write: bool, -} - -impl Entry for Key { - fn partition_key(&self) -> &EmptyKey { - &EmptyKey - } - fn sort_key(&self) -> &String { - &self.key_id - } - - fn merge(&mut self, other: &Self) { - if other.deleted { - self.deleted = true; - } - if self.deleted { - self.authorized_buckets.clear(); - return; - } - if other.name_timestamp > self.name_timestamp { - self.name_timestamp = other.name_timestamp; - self.name = other.name.clone(); - } - - for ab in other.authorized_buckets.iter() { - match self - .authorized_buckets - .binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket)) - { - Ok(i) => { - let our_ab = &mut self.authorized_buckets[i]; - if ab.timestamp > our_ab.timestamp { - *our_ab = ab.clone(); - } - } - Err(i) => { - self.authorized_buckets.insert(i, ab.clone()); - } - } - } - } -} - -pub struct KeyTable; - -#[async_trait] -impl TableSchema for KeyTable { - type P = EmptyKey; - type S = String; - type E = Key; - type Filter = (); - - async fn updated(&self, _old: Option, _new: Option) -> Result<(), Error> { - Ok(()) - } - - fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { - !entry.deleted - } -} diff --git a/src/store/mod.rs b/src/store/mod.rs deleted file mode 100644 index 962264c4..00000000 --- a/src/store/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -pub mod block; -pub mod block_ref_table; -pub mod bucket_table; -pub mod key_table; -pub mod object_table; -pub mod repair; -pub mod version_table; diff --git a/src/store/object_table.rs b/src/store/object_table.rs deleted file mode 100644 index f329a7f4..00000000 --- a/src/store/object_table.rs +++ /dev/null @@ -1,165 +0,0 @@ -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -use crate::background::BackgroundRunner; -use crate::data::*; -use crate::error::Error; - -use crate::table::table_sharded::*; -use crate::table::*; - -use crate::store::version_table::*; - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct Object { - // Primary key - pub bucket: String, - - // Sort key - pub key: String, - - // Data - versions: Vec, -} - -impl Object { - pub fn new(bucket: String, key: String, versions: Vec) -> Self { - let mut ret = Self { - bucket, - key, - versions: vec![], - }; - for v in versions { - ret.add_version(v) - .expect("Twice the same ObjectVersion in Object constructor"); - } - ret - } - /// Adds a version if it wasn't already present - pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { - match self - 
.versions - .binary_search_by(|v| v.cmp_key().cmp(&new.cmp_key())) - { - Err(i) => { - self.versions.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn versions(&self) -> &[ObjectVersion] { - &self.versions[..] - } -} - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersion { - pub uuid: UUID, - pub timestamp: u64, - - pub mime_type: String, - pub size: u64, - pub is_complete: bool, - - pub data: ObjectVersionData, -} - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionData { - DeleteMarker, - Inline(#[serde(with = "serde_bytes")] Vec), - FirstBlock(Hash), -} - -impl ObjectVersion { - fn cmp_key(&self) -> (u64, &UUID) { - (self.timestamp, &self.uuid) - } -} - -impl Entry for Object { - fn partition_key(&self) -> &String { - &self.bucket - } - fn sort_key(&self) -> &String { - &self.key - } - - fn merge(&mut self, other: &Self) { - for other_v in other.versions.iter() { - match self - .versions - .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) - { - Ok(i) => { - let mut v = &mut self.versions[i]; - if other_v.size > v.size { - v.size = other_v.size; - } - if other_v.is_complete && !v.is_complete { - v.is_complete = true; - } - } - Err(i) => { - self.versions.insert(i, other_v.clone()); - } - } - } - let last_complete = self - .versions - .iter() - .enumerate() - .rev() - .filter(|(_, v)| v.is_complete) - .next() - .map(|(vi, _)| vi); - - if let Some(last_vi) = last_complete { - self.versions = self.versions.drain(last_vi..).collect::>(); - } - } -} - -pub struct ObjectTable { - pub background: Arc, - pub version_table: Arc>, -} - -#[async_trait] -impl TableSchema for ObjectTable { - type P = String; - type S = String; - type E = Object; - type Filter = (); - - async fn updated(&self, old: Option, new: Option) -> Result<(), Error> { - let version_table = self.version_table.clone(); - if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of old versions - for v in old_v.versions.iter() { - if new_v - .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - .is_err() - { - let deleted_version = Version::new( - v.uuid, - old_v.bucket.clone(), - old_v.key.clone(), - true, - vec![], - ); - version_table.insert(&deleted_version).await?; - } - } - } - Ok(()) - } - - fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { - // TODO - true - } -} diff --git a/src/store/repair.rs b/src/store/repair.rs deleted file mode 100644 index 39c57fc1..00000000 --- a/src/store/repair.rs +++ /dev/null @@ -1,184 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::watch; - -use crate::error::Error; -use crate::server::Garage; -use crate::table::*; - -use crate::store::block_ref_table::*; -use crate::store::version_table::*; - -use crate::*; - -pub struct Repair { - pub garage: Arc, -} - -impl Repair { - pub async fn repair_worker( - &self, - opt: RepairOpt, - must_exit: watch::Receiver, - ) -> Result<(), Error> { - let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); - - if todo(RepairWhat::Tables) { - info!("Launching a full sync of tables"); - self.garage - .bucket_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .object_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .version_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .block_ref_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - } - - // TODO: wait for full sync to finish 
before proceeding to the rest? - - if todo(RepairWhat::Versions) { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - - if todo(RepairWhat::BlockRefs) { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - - if opt.what.is_none() { - info!("Repairing the RC"); - self.repair_rc(&must_exit).await?; - } - - if todo(RepairWhat::Blocks) { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - - Ok(()) - } - - async fn repair_versions(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - let mut pos = vec![]; - - while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? { - pos = item_key.to_vec(); - - let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; - if version.deleted { - continue; - } - let object = self - .garage - .object_table - .get(&version.bucket, &version.key) - .await?; - let version_exists = match object { - Some(o) => o.versions().iter().any(|x| x.uuid == version.uuid), - None => { - warn!( - "Repair versions: object for version {:?} not found", - version - ); - false - } - }; - if !version_exists { - info!("Repair versions: marking version as deleted: {:?}", version); - self.garage - .version_table - .insert(&Version::new( - version.uuid, - version.bucket, - version.key, - true, - vec![], - )) - .await?; - } - - if *must_exit.borrow() { - break; - } - } - Ok(()) - } - - async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - let mut pos = vec![]; - - while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? { - pos = item_key.to_vec(); - - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; - if block_ref.deleted { - continue; - } - let version = self - .garage - .version_table - .get(&block_ref.version, &EmptyKey) - .await?; - let ref_exists = match version { - Some(v) => !v.deleted, - None => { - warn!( - "Block ref repair: version for block ref {:?} not found", - block_ref - ); - false - } - }; - if !ref_exists { - info!( - "Repair block ref: marking block_ref as deleted: {:?}", - block_ref - ); - self.garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true, - }) - .await?; - } - - if *must_exit.borrow() { - break; - } - } - Ok(()) - } - - async fn repair_rc(&self, _must_exit: &watch::Receiver) -> Result<(), Error> { - // TODO - warn!("repair_rc: not implemented"); - Ok(()) - } -} diff --git a/src/store/version_table.rs b/src/store/version_table.rs deleted file mode 100644 index 6d304cda..00000000 --- a/src/store/version_table.rs +++ /dev/null @@ -1,131 +0,0 @@ -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -use crate::background::BackgroundRunner; -use crate::data::*; -use crate::error::Error; - -use crate::table::table_sharded::*; -use crate::table::*; - -use crate::store::block_ref_table::*; - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct Version { - // Primary key - pub uuid: UUID, - - // Actual data: the blocks for this version - pub deleted: bool, - blocks: Vec, - - // Back link to bucket+key so that we can figure if - // this was deleted later on - pub bucket: String, - pub key: String, -} - -impl Version { - pub fn new( - uuid: UUID, - bucket: String, - key: String, - deleted: bool, - blocks: Vec, - ) -> Self { - let mut ret = 
Self { - uuid, - deleted, - blocks: vec![], - bucket, - key, - }; - for b in blocks { - ret.add_block(b) - .expect("Twice the same VersionBlock in Version constructor"); - } - ret - } - /// Adds a block if it wasn't already present - pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> { - match self.blocks.binary_search_by(|b| b.offset.cmp(&new.offset)) { - Err(i) => { - self.blocks.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn blocks(&self) -> &[VersionBlock] { - &self.blocks[..] - } -} - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct VersionBlock { - pub offset: u64, - pub hash: Hash, -} - -impl Entry for Version { - fn partition_key(&self) -> &Hash { - &self.uuid - } - fn sort_key(&self) -> &EmptyKey { - &EmptyKey - } - - fn merge(&mut self, other: &Self) { - if other.deleted { - self.deleted = true; - self.blocks.clear(); - } else if !self.deleted { - for bi in other.blocks.iter() { - match self.blocks.binary_search_by(|x| x.offset.cmp(&bi.offset)) { - Ok(_) => (), - Err(pos) => { - self.blocks.insert(pos, bi.clone()); - } - } - } - } - } -} - -pub struct VersionTable { - pub background: Arc, - pub block_ref_table: Arc>, -} - -#[async_trait] -impl TableSchema for VersionTable { - type P = Hash; - type S = EmptyKey; - type E = Version; - type Filter = (); - - async fn updated(&self, old: Option, new: Option) -> Result<(), Error> { - let block_ref_table = self.block_ref_table.clone(); - if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of version blocks - if new_v.deleted && !old_v.deleted { - let deleted_block_refs = old_v - .blocks - .iter() - .map(|vb| BlockRef { - block: vb.hash, - version: old_v.uuid, - deleted: true, - }) - .collect::>(); - block_ref_table.insert_many(&deleted_block_refs[..]).await?; - } - } - Ok(()) - } - - fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { - !entry.deleted - } -} diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml new file mode 100644 index 00000000..714d0a6e --- /dev/null +++ b/src/table/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "garage_table" +version = "0.1.0" +authors = ["Alex Auvolat "] +edition = "2018" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_util = { path = "../util" } +garage_rpc = { path = "../rpc" } + +bytes = "0.4" +rand = "0.7" +hex = "0.3" +arc-swap = "0.4" +log = "0.4" + +sled = "0.31" + +rmp-serde = "0.14.3" +serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } +serde_bytes = "0.11" + +async-trait = "0.1.30" +futures = "0.3" +futures-util = "0.3" +tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } + diff --git a/src/table/lib.rs b/src/table/lib.rs new file mode 100644 index 00000000..f490b491 --- /dev/null +++ b/src/table/lib.rs @@ -0,0 +1,11 @@ +#![recursion_limit = "1024"] + +#[macro_use] +extern crate log; + +pub mod table; +pub mod table_fullcopy; +pub mod table_sharded; +pub mod table_sync; + +pub use table::*; diff --git a/src/table/mod.rs b/src/table/mod.rs deleted file mode 100644 index e03b8d0b..00000000 --- a/src/table/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub mod table; -pub mod table_fullcopy; -pub mod table_sharded; -pub mod table_sync; - -pub use table::*; diff --git a/src/table/table.rs b/src/table/table.rs index 50e8739a..94bacc60 100644 --- 
a/src/table/table.rs +++ b/src/table/table.rs @@ -8,14 +8,14 @@ use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use crate::data::*; -use crate::error::Error; +use garage_util::data::*; +use garage_util::error::Error; -use crate::rpc::membership::{Ring, System}; -use crate::rpc::rpc_client::*; -use crate::rpc::rpc_server::*; +use garage_rpc::membership::{Ring, System}; +use garage_rpc::rpc_client::*; +use garage_rpc::rpc_server::*; -use crate::table::table_sync::*; +use crate::table_sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); @@ -78,14 +78,14 @@ impl PartitionKey for EmptyKey { } } -impl> PartitionKey for T { +impl PartitionKey for String { fn hash(&self) -> Hash { - hash(self.as_ref().as_bytes()) + hash(self.as_bytes()) } } -impl> SortKey for T { +impl SortKey for String { fn sort_key(&self) -> &[u8] { - self.as_ref().as_bytes() + self.as_bytes() } } diff --git a/src/table/table_fullcopy.rs b/src/table/table_fullcopy.rs index 2cd2e464..a6c78a63 100644 --- a/src/table/table_fullcopy.rs +++ b/src/table/table_fullcopy.rs @@ -1,9 +1,10 @@ use arc_swap::ArcSwapOption; use std::sync::Arc; -use crate::data::*; -use crate::rpc::membership::{Ring, System}; -use crate::table::*; +use garage_rpc::membership::{Ring, System}; +use garage_util::data::*; + +use crate::*; #[derive(Clone)] pub struct TableFullReplication { diff --git a/src/table/table_sharded.rs b/src/table/table_sharded.rs index 5190f5d4..88856542 100644 --- a/src/table/table_sharded.rs +++ b/src/table/table_sharded.rs @@ -1,6 +1,7 @@ -use crate::data::*; -use crate::rpc::membership::{Ring, System}; -use crate::table::*; +use garage_rpc::membership::{Ring, System}; +use garage_util::data::*; + +use crate::*; #[derive(Clone)] pub struct TableShardedReplication { diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 8f6582a7..145b3068 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -12,10 +12,11 @@ use serde_bytes::ByteBuf; use tokio::sync::Mutex; use tokio::sync::{mpsc, watch}; -use crate::data::*; -use crate::error::Error; -use crate::rpc::membership::Ring; -use crate::table::*; +use garage_rpc::membership::Ring; +use garage_util::data::*; +use garage_util::error::Error; + +use crate::*; const MAX_DEPTH: usize = 16; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml new file mode 100644 index 00000000..6f61a586 --- /dev/null +++ b/src/util/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "garage_util" +version = "0.1.0" +authors = ["Alex Auvolat "] +edition = "2018" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +rand = "0.7" +hex = "0.3" +sha2 = "0.8" +err-derive = "0.2.3" +log = "0.4" + +sled = "0.31" + +toml = "0.5" +rmp-serde = "0.14.3" +serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } +serde_json = "1.0" + +futures = "0.3" +futures-util = "0.3" +tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } + +http = "0.2" +hyper = "0.13" +rustls = "0.17" +webpki = "0.21" + + diff --git a/src/util/background.rs b/src/util/background.rs new file mode 100644 index 00000000..937062dd --- /dev/null +++ b/src/util/background.rs @@ -0,0 +1,124 @@ +use core::future::Future; +use std::pin::Pin; + +use futures::future::join_all; +use futures::select; +use 
futures_util::future::*; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::sync::{mpsc, watch, Notify}; + +use crate::error::Error; + +type JobOutput = Result<(), Error>; +type Job = Pin + Send>>; + +pub struct BackgroundRunner { + n_runners: usize, + pub stop_signal: watch::Receiver, + + queue_in: mpsc::UnboundedSender<(Job, bool)>, + queue_out: Mutex>, + job_notify: Notify, + + workers: Mutex>>, +} + +impl BackgroundRunner { + pub fn new(n_runners: usize, stop_signal: watch::Receiver) -> Arc { + let (queue_in, queue_out) = mpsc::unbounded_channel(); + Arc::new(Self { + n_runners, + stop_signal, + queue_in, + queue_out: Mutex::new(queue_out), + job_notify: Notify::new(), + workers: Mutex::new(Vec::new()), + }) + } + + pub async fn run(self: Arc) { + let mut workers = self.workers.lock().await; + for i in 0..self.n_runners { + workers.push(tokio::spawn(self.clone().runner(i))); + } + drop(workers); + + let mut stop_signal = self.stop_signal.clone(); + while let Some(exit_now) = stop_signal.recv().await { + if exit_now { + let mut workers = self.workers.lock().await; + let workers_vec = workers.drain(..).collect::>(); + join_all(workers_vec).await; + return; + } + } + } + + pub fn spawn(&self, job: T) + where + T: Future + Send + 'static, + { + let boxed: Job = Box::pin(job); + let _: Result<_, _> = self.queue_in.clone().send((boxed, false)); + self.job_notify.notify(); + } + + pub fn spawn_cancellable(&self, job: T) + where + T: Future + Send + 'static, + { + let boxed: Job = Box::pin(job); + let _: Result<_, _> = self.queue_in.clone().send((boxed, true)); + self.job_notify.notify(); + } + + pub async fn spawn_worker(&self, name: String, worker: F) + where + F: FnOnce(watch::Receiver) -> T + Send + 'static, + T: Future + Send + 'static, + { + let mut workers = self.workers.lock().await; + let stop_signal = self.stop_signal.clone(); + workers.push(tokio::spawn(async move { + if let Err(e) = worker(stop_signal).await { + error!("Worker stopped with error: {}, error: {}", name, e); + } else { + info!("Worker exited successfully: {}", name); + } + })); + } + + async fn runner(self: Arc, i: usize) { + let mut stop_signal = self.stop_signal.clone(); + loop { + let must_exit: bool = *stop_signal.borrow(); + if let Some(job) = self.dequeue_job(must_exit).await { + if let Err(e) = job.await { + error!("Job failed: {}", e) + } + } else { + if must_exit { + info!("Background runner {} exiting", i); + return; + } + select! 
{ + _ = self.job_notify.notified().fuse() => (), + _ = stop_signal.recv().fuse() => (), + } + } + } + } + + async fn dequeue_job(&self, must_exit: bool) -> Option { + let mut queue = self.queue_out.lock().await; + while let Ok((job, cancellable)) = queue.try_recv() { + if cancellable && must_exit { + continue; + } else { + return Some(job); + } + } + None + } +} diff --git a/src/util/config.rs b/src/util/config.rs new file mode 100644 index 00000000..cb871562 --- /dev/null +++ b/src/util/config.rs @@ -0,0 +1,66 @@ +use std::io::Read; +use std::net::SocketAddr; +use std::path::PathBuf; + +use serde::Deserialize; + +use crate::error::Error; + +#[derive(Deserialize, Debug, Clone)] +pub struct Config { + pub metadata_dir: PathBuf, + pub data_dir: PathBuf, + + pub api_bind_addr: SocketAddr, + pub rpc_bind_addr: SocketAddr, + + pub bootstrap_peers: Vec, + + #[serde(default = "default_max_concurrent_rpc_requests")] + pub max_concurrent_rpc_requests: usize, + + #[serde(default = "default_block_size")] + pub block_size: usize, + + #[serde(default = "default_replication_factor")] + pub meta_replication_factor: usize, + + #[serde(default = "default_epidemic_factor")] + pub meta_epidemic_factor: usize, + + #[serde(default = "default_replication_factor")] + pub data_replication_factor: usize, + + pub rpc_tls: Option, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct TlsConfig { + pub ca_cert: String, + pub node_cert: String, + pub node_key: String, +} + +fn default_max_concurrent_rpc_requests() -> usize { + 12 +} +fn default_block_size() -> usize { + 1048576 +} +fn default_replication_factor() -> usize { + 3 +} +fn default_epidemic_factor() -> usize { + 3 +} + +pub fn read_config(config_file: PathBuf) -> Result { + let mut file = std::fs::OpenOptions::new() + .read(true) + .open(config_file.as_path())?; + + let mut config = String::new(); + file.read_to_string(&mut config)?; + + Ok(toml::from_str(&config)?) +} diff --git a/src/util/data.rs b/src/util/data.rs new file mode 100644 index 00000000..8f976f71 --- /dev/null +++ b/src/util/data.rs @@ -0,0 +1,124 @@ +use rand::Rng; +use serde::de::{self, Visitor}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use sha2::{Digest, Sha256}; +use std::fmt; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)] +pub struct FixedBytes32([u8; 32]); + +impl From<[u8; 32]> for FixedBytes32 { + fn from(x: [u8; 32]) -> FixedBytes32 { + FixedBytes32(x) + } +} + +impl std::convert::AsRef<[u8]> for FixedBytes32 { + fn as_ref(&self) -> &[u8] { + &self.0[..] 
+ } +} + +impl Eq for FixedBytes32 {} + +impl fmt::Debug for FixedBytes32 { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}…", hex::encode(&self.0[..8])) + } +} + +struct FixedBytes32Visitor; +impl<'de> Visitor<'de> for FixedBytes32Visitor { + type Value = FixedBytes32; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a byte slice of size 32") + } + + fn visit_bytes(self, value: &[u8]) -> Result { + if value.len() == 32 { + let mut res = [0u8; 32]; + res.copy_from_slice(value); + Ok(res.into()) + } else { + Err(E::custom(format!( + "Invalid byte string length {}, expected 32", + value.len() + ))) + } + } +} + +impl<'de> Deserialize<'de> for FixedBytes32 { + fn deserialize>(deserializer: D) -> Result { + deserializer.deserialize_bytes(FixedBytes32Visitor) + } +} + +impl Serialize for FixedBytes32 { + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_bytes(&self.0[..]) + } +} + +impl FixedBytes32 { + pub fn as_slice(&self) -> &[u8] { + &self.0[..] + } + pub fn as_slice_mut(&mut self) -> &mut [u8] { + &mut self.0[..] + } + pub fn to_vec(&self) -> Vec { + self.0.to_vec() + } +} + +pub type UUID = FixedBytes32; +pub type Hash = FixedBytes32; + +pub fn hash(data: &[u8]) -> Hash { + let mut hasher = Sha256::new(); + hasher.input(data); + let mut hash = [0u8; 32]; + hash.copy_from_slice(&hasher.result()[..]); + hash.into() +} + +pub fn gen_uuid() -> UUID { + rand::thread_rng().gen::<[u8; 32]>().into() +} + +pub fn now_msec() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Fix your clock :o") + .as_millis() as u64 +} + +// RMP serialization with names of fields and variants + +pub fn rmp_to_vec_all_named(val: &T) -> Result, rmp_serde::encode::Error> +where + T: Serialize + ?Sized, +{ + let mut wr = Vec::with_capacity(128); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + val.serialize(&mut se)?; + Ok(wr) +} + +pub fn debug_serialize(x: T) -> String { + match serde_json::to_string(&x) { + Ok(ss) => { + if ss.len() > 100 { + ss[..100].to_string() + } else { + ss + } + } + Err(e) => format!("", e), + } +} diff --git a/src/util/error.rs b/src/util/error.rs new file mode 100644 index 00000000..f73d6915 --- /dev/null +++ b/src/util/error.rs @@ -0,0 +1,112 @@ +use err_derive::Error; +use hyper::StatusCode; +use std::io; + +use crate::data::*; + +#[derive(Debug, Error)] +pub enum RPCError { + #[error(display = "Node is down: {:?}.", _0)] + NodeDown(UUID), + #[error(display = "Timeout: {}", _0)] + Timeout(#[error(source)] tokio::time::Elapsed), + #[error(display = "HTTP error: {}", _0)] + HTTP(#[error(source)] http::Error), + #[error(display = "Hyper error: {}", _0)] + Hyper(#[error(source)] hyper::Error), + #[error(display = "Messagepack encode error: {}", _0)] + RMPEncode(#[error(source)] rmp_serde::encode::Error), + #[error(display = "Messagepack decode error: {}", _0)] + RMPDecode(#[error(source)] rmp_serde::decode::Error), + #[error(display = "Too many errors: {:?}", _0)] + TooManyErrors(Vec), +} + +#[derive(Debug, Error)] +pub enum Error { + #[error(display = "IO error: {}", _0)] + Io(#[error(source)] io::Error), + + #[error(display = "Hyper error: {}", _0)] + Hyper(#[error(source)] hyper::Error), + + #[error(display = "HTTP error: {}", _0)] + HTTP(#[error(source)] http::Error), + + #[error(display = "Invalid HTTP header value: {}", _0)] + HTTPHeader(#[error(source)] http::header::ToStrError), + + #[error(display = "TLS error: {}", _0)] + 
TLS(#[error(source)] rustls::TLSError), + + #[error(display = "PKI error: {}", _0)] + PKI(#[error(source)] webpki::Error), + + #[error(display = "Sled error: {}", _0)] + Sled(#[error(source)] sled::Error), + + #[error(display = "Messagepack encode error: {}", _0)] + RMPEncode(#[error(source)] rmp_serde::encode::Error), + #[error(display = "Messagepack decode error: {}", _0)] + RMPDecode(#[error(source)] rmp_serde::decode::Error), + #[error(display = "JSON error: {}", _0)] + JSON(#[error(source)] serde_json::error::Error), + #[error(display = "TOML decode error: {}", _0)] + TomlDecode(#[error(source)] toml::de::Error), + + #[error(display = "Timeout: {}", _0)] + RPCTimeout(#[error(source)] tokio::time::Elapsed), + + #[error(display = "Tokio join error: {}", _0)] + TokioJoin(#[error(source)] tokio::task::JoinError), + + #[error(display = "RPC call error: {}", _0)] + RPC(#[error(source)] RPCError), + + #[error(display = "Remote error: {} (status code {})", _0, _1)] + RemoteError(String, StatusCode), + + #[error(display = "Bad request: {}", _0)] + BadRequest(String), + + #[error(display = "Not found")] + NotFound, + + #[error(display = "Corrupt data: does not match hash {:?}", _0)] + CorruptData(Hash), + + #[error(display = "{}", _0)] + Message(String), +} + +impl Error { + pub fn http_status_code(&self) -> StatusCode { + match self { + Error::BadRequest(_) => StatusCode::BAD_REQUEST, + Error::NotFound => StatusCode::NOT_FOUND, + Error::RPC(_) => StatusCode::SERVICE_UNAVAILABLE, + _ => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} + +impl From> for Error { + fn from(e: sled::TransactionError) -> Error { + match e { + sled::TransactionError::Abort(x) => x, + sled::TransactionError::Storage(x) => Error::Sled(x), + } + } +} + +impl From> for Error { + fn from(_e: tokio::sync::watch::error::SendError) -> Error { + Error::Message(format!("Watch send error")) + } +} + +impl From> for Error { + fn from(_e: tokio::sync::mpsc::error::SendError) -> Error { + Error::Message(format!("MPSC send error")) + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs new file mode 100644 index 00000000..0bf09bf6 --- /dev/null +++ b/src/util/lib.rs @@ -0,0 +1,7 @@ +#[macro_use] +extern crate log; + +pub mod background; +pub mod config; +pub mod data; +pub mod error; -- cgit v1.2.3
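Note on the block reference-counting scheme in the deleted src/store/block.rs above: block_incref() and block_decref() never read-modify-write the counter themselves; they call Tree::merge() with a one-byte delta (1 = increment, 0 = decrement), and sled funnels every such update through the rc_merge operator. A minimal, self-contained sketch of that pattern, assuming the sled 0.31 API pinned in the Cargo.toml files above — the tree name, demo path, and main() wiring here are illustrative, not taken from this patch:

fn u64_from_bytes(bytes: &[u8]) -> u64 {
	assert!(bytes.len() == 8);
	let mut x8 = [0u8; 8];
	x8.copy_from_slice(bytes);
	u64::from_be_bytes(x8)
}

// Same merge operator as in the patch: a one-byte message is folded into a
// big-endian u64 counter; returning None makes sled delete the key entirely
// once the counter reaches zero.
fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
	let old = old.map(u64_from_bytes).unwrap_or(0);
	assert!(new.len() == 1);
	let new = match new[0] {
		0 => {
			if old > 0 {
				old - 1
			} else {
				0
			}
		}
		1 => old + 1,
		_ => unreachable!(),
	};
	if new == 0 {
		None
	} else {
		Some(u64::to_be_bytes(new).to_vec())
	}
}

fn main() -> sled::Result<()> {
	let db = sled::open("/tmp/rc_merge_demo")?; // demo path, illustrative
	let rc = db.open_tree("block_local_rc")?; // tree name, illustrative
	// After this call, Tree::merge() routes every update through rc_merge.
	rc.set_merge_operator(rc_merge);

	let hash = [0xabu8; 32]; // stand-in for a real block hash

	rc.merge(&hash, vec![1])?; // incref: 0 -> 1
	rc.merge(&hash, vec![1])?; // incref: 1 -> 2
	rc.merge(&hash, vec![0])?; // decref: 2 -> 1
	assert!(rc.get(&hash)?.is_some());

	rc.merge(&hash, vec![0])?; // decref: 1 -> 0, rc_merge returns None, key removed
	assert!(rc.get(&hash)?.is_none());
	Ok(())
}

Doing the arithmetic inside the merge operator is what lets block_incref()/block_decref() stay single synchronous calls, with no lock around a get-then-insert sequence even when several tasks touch the same block concurrently.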