Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/Cargo.toml | 1
-rw-r--r-- | src/garage/admin.rs | 106
-rw-r--r-- | src/garage/cli/cmd.rs | 14
-rw-r--r-- | src/garage/cli/structs.rs | 46
-rw-r--r-- | src/garage/cli/util.rs | 65
-rw-r--r-- | src/garage/main.rs | 18
-rw-r--r-- | src/garage/repair/mod.rs | 2
-rw-r--r-- | src/garage/repair/offline.rs | 57
-rw-r--r-- | src/garage/repair/online.rs (renamed from src/garage/repair.rs) | 4
-rw-r--r-- | src/garage/server.rs | 50 |
10 files changed, 287 insertions, 76 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index eb643160..640e6975 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -30,6 +30,7 @@ garage_util = { version = "0.7.0", path = "../util" }
 garage_web = { version = "0.7.0", path = "../web" }
 
 bytes = "1.0"
+bytesize = "1.1"
 hex = "0.4"
 tracing = { version = "0.1.30", features = ["log-always"] }
 pretty_env_logger = "0.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c662aa00..6630ae16 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -24,11 +24,12 @@ use garage_model::migrate::Migrate;
 use garage_model::permission::*;
 
 use crate::cli::*;
-use crate::repair::Repair;
+use crate::repair::online::OnlineRepair;
 
 pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
 
 #[derive(Debug, Serialize, Deserialize)]
+#[allow(clippy::large_enum_variant)]
 pub enum AdminRpc {
 	BucketOperation(BucketOperation),
 	KeyOperation(KeyOperation),
@@ -38,8 +39,15 @@ pub enum AdminRpc {
 
 	// Replies
 	Ok(String),
-	BucketList(Vec<Bucket>),
-	BucketInfo(Bucket, HashMap<String, Key>),
+	BucketList {
+		buckets: Vec<Bucket>,
+		counters: HashMap<Uuid, HashMap<String, i64>>,
+	},
+	BucketInfo {
+		bucket: Bucket,
+		relevant_keys: HashMap<String, Key>,
+		counters: HashMap<String, i64>,
+	},
 	KeyList(Vec<(String, String)>),
 	KeyInfo(Key, HashMap<Uuid, Bucket>),
 }
@@ -72,6 +80,7 @@ impl AdminRpcHandler {
 			BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
 			BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
 			BucketOperation::Website(query) => self.handle_bucket_website(query).await,
+			BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
 		}
 	}
 
@@ -87,7 +96,25 @@ impl AdminRpcHandler {
 				EnumerationOrder::Forward,
 			)
 			.await?;
-		Ok(AdminRpc::BucketList(buckets))
+
+		let ring = self.garage.system.ring.borrow().clone();
+		let counters = self
+			.garage
+			.object_counter_table
+			.table
+			.get_range(
+				&EmptyKey,
+				None,
+				Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+				15000,
+				EnumerationOrder::Forward,
+			)
+			.await?
+			.iter()
+			.map(|x| (x.sk, x.filtered_values(&ring)))
+			.collect::<HashMap<_, _>>();
+
+		Ok(AdminRpc::BucketList { buckets, counters })
 	}
 
 	async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
@@ -104,6 +131,15 @@
 			.get_existing_bucket(bucket_id)
 			.await?;
 
+		let counters = self
+			.garage
+			.object_counter_table
+			.table
+			.get(&EmptyKey, &bucket_id)
+			.await?
+			.map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+			.unwrap_or_default();
+
 		let mut relevant_keys = HashMap::new();
 		for (k, _) in bucket
 			.state
@@ -139,7 +175,11 @@
 			}
 		}
 
-		Ok(AdminRpc::BucketInfo(bucket, relevant_keys))
+		Ok(AdminRpc::BucketInfo {
+			bucket,
+			relevant_keys,
+			counters,
+		})
 	}
 
 	#[allow(clippy::ptr_arg)]
@@ -431,6 +471,60 @@ impl AdminRpcHandler {
 		Ok(AdminRpc::Ok(msg))
 	}
 
+	async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
+		let bucket_id = self
+			.garage
+			.bucket_helper()
+			.resolve_global_bucket_name(&query.bucket)
+			.await?
+			.ok_or_bad_request("Bucket not found")?;
+
+		let mut bucket = self
+			.garage
+			.bucket_helper()
+			.get_existing_bucket(bucket_id)
+			.await?;
+		let bucket_state = bucket.state.as_option_mut().unwrap();
+
+		if query.max_size.is_none() && query.max_objects.is_none() {
+			return Err(Error::BadRequest(
+				"You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
+			));
+		}
+
+		let mut quotas = bucket_state.quotas.get().clone();
+
+		match query.max_size.as_ref().map(String::as_ref) {
+			Some("none") => quotas.max_size = None,
+			Some(v) => {
+				let bs = v
+					.parse::<bytesize::ByteSize>()
+					.ok_or_bad_request(format!("Invalid size specified: {}", v))?;
+				quotas.max_size = Some(bs.as_u64());
+			}
+			_ => (),
+		}
+
+		match query.max_objects.as_ref().map(String::as_ref) {
+			Some("none") => quotas.max_objects = None,
+			Some(v) => {
+				let mo = v
+					.parse::<u64>()
+					.ok_or_bad_request(format!("Invalid number specified: {}", v))?;
+				quotas.max_objects = Some(mo);
+			}
+			_ => (),
+		}
+
+		bucket_state.quotas.update(quotas);
+		self.garage.bucket_table.insert(&bucket).await?;
+
+		Ok(AdminRpc::Ok(format!(
+			"Quotas updated for {}",
+			&query.bucket
+		)))
+	}
+
 	async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
 		match cmd {
 			KeyOperation::List => self.handle_list_keys().await,
@@ -619,7 +713,7 @@ impl AdminRpcHandler {
 				)))
 			}
 		} else {
-			let repair = Repair {
+			let repair = OnlineRepair {
 				garage: self.garage.clone(),
 			};
 			self.garage
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index b2dd8f14..e8e834e8 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -166,11 +166,15 @@ pub async fn cmd_admin(
 		AdminRpc::Ok(msg) => {
 			println!("{}", msg);
 		}
-		AdminRpc::BucketList(bl) => {
-			print_bucket_list(bl);
-		}
-		AdminRpc::BucketInfo(bucket, rk) => {
-			print_bucket_info(&bucket, &rk);
+		AdminRpc::BucketList { buckets, counters } => {
+			print_bucket_list(buckets, counters);
+		}
+		AdminRpc::BucketInfo {
+			bucket,
+			relevant_keys,
+			counters,
+		} => {
+			print_bucket_info(&bucket, &relevant_keys, &counters);
 		}
 		AdminRpc::KeyList(kl) => {
 			print_key_list(kl);
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index a0c49aeb..28228b07 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -33,10 +33,15 @@ pub enum Command {
 	#[structopt(name = "migrate")]
 	Migrate(MigrateOpt),
 
-	/// Start repair of node data
+	/// Start repair of node data on remote node
 	#[structopt(name = "repair")]
 	Repair(RepairOpt),
 
+	/// Offline reparation of node data (these repairs must be run offline
+	/// directly on the server node)
+	#[structopt(name = "offline-repair")]
+	OfflineRepair(OfflineRepairOpt),
+
 	/// Gather node statistics
 	#[structopt(name = "stats")]
 	Stats(StatsOpt),
@@ -175,6 +180,10 @@ pub enum BucketOperation {
 	/// Expose as website or not
 	#[structopt(name = "website")]
 	Website(WebsiteOpt),
+
+	/// Set the quotas for this bucket
+	#[structopt(name = "set-quotas")]
+	SetQuotas(SetQuotasOpt),
 }
 
 #[derive(Serialize, Deserialize, StructOpt, Debug)]
@@ -262,6 +271,21 @@ pub struct PermBucketOpt {
 }
 
 #[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct SetQuotasOpt {
+	/// Bucket name
+	pub bucket: String,
+
+	/// Set a maximum size for the bucket (specify a size e.g. in MiB or GiB,
+	/// or `none` for no size restriction)
+	#[structopt(long = "max-size")]
+	pub max_size: Option<String>,
+
+	/// Set a maximum number of objects for the bucket (or `none` for no restriction)
+	#[structopt(long = "max-objects")]
+	pub max_objects: Option<String>,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
 pub enum KeyOperation {
 	/// List keys
 	#[structopt(name = "list")]
@@ -406,6 +430,26 @@ pub enum RepairWhat {
 }
 
 #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct OfflineRepairOpt {
+	/// Confirm the launch of the repair operation
+	#[structopt(long = "yes")]
+	pub yes: bool,
+
+	#[structopt(subcommand)]
+	pub what: OfflineRepairWhat,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum OfflineRepairWhat {
+	/// Repair K2V item counters
+	#[structopt(name = "k2v_item_counters")]
+	K2VItemCounters,
+	/// Repair object counters
+	#[structopt(name = "object_counters")]
+	ObjectCounters,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
 pub struct StatsOpt {
 	/// Gather statistics from all nodes
 	#[structopt(short = "a", long = "all-nodes")]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 6d73be3a..153e77a8 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -7,12 +7,13 @@ use garage_util::formater::format_table;
 
 use garage_model::bucket_table::*;
 use garage_model::key_table::*;
+use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
 
-pub fn print_bucket_list(bl: Vec<Bucket>) {
+pub fn print_bucket_list(buckets: Vec<Bucket>, counters: HashMap<Uuid, HashMap<String, i64>>) {
 	println!("List of buckets:");
 
 	let mut table = vec![];
-	for bucket in bl {
+	for bucket in buckets {
 		let aliases = bucket
 			.aliases()
 			.iter()
@@ -29,11 +30,24 @@ pub fn print_bucket_list(bl: Vec<Bucket>) {
 			[((k, n), _, _)] => format!("{}:{}", k, n),
 			s => format!("[{} local aliases]", s.len()),
 		};
+
+		let counters_tb = match counters.get(&bucket.id) {
+			Some(c) => format!(
+				"\t{}\t{}\t{}",
+				bytesize::ByteSize::b(c.get(BYTES).cloned().unwrap_or_default() as u64)
+					.to_string_as(true),
+				c.get(OBJECTS).cloned().unwrap_or_default(),
+				c.get(UNFINISHED_UPLOADS).cloned().unwrap_or_default(),
+			),
+			None => "".into(),
+		};
+
 		table.push(format!(
-			"\t{}\t{}\t{}",
+			"\t{}\t{}\t{}{}",
 			aliases.join(","),
 			local_aliases_n,
-			hex::encode(bucket.id)
+			hex::encode(bucket.id),
+			counters_tb
 		));
 	}
 	format_table(table);
@@ -121,7 +135,11 @@ pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) {
 	}
 }
 
-pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) {
+pub fn print_bucket_info(
+	bucket: &Bucket,
+	relevant_keys: &HashMap<String, Key>,
+	counters: &HashMap<String, i64>,
+) {
 	let key_name = |k| {
 		relevant_keys
 			.get(k)
@@ -133,7 +151,42 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>)
 	match &bucket.state {
 		Deletable::Deleted => println!("Bucket is deleted."),
 		Deletable::Present(p) => {
-			println!("Website access: {}", p.website_config.get().is_some());
+			let size =
+				bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64);
+			println!(
+				"\nSize: {} ({})",
+				size.to_string_as(true),
+				size.to_string_as(false)
+			);
+			println!(
+				"Objects: {}",
+				counters.get(OBJECTS).cloned().unwrap_or_default()
+			);
+			println!(
+				"Unfinished multipart uploads: {}",
+				counters
+					.get(UNFINISHED_UPLOADS)
+					.cloned()
+					.unwrap_or_default()
+			);
+
+			println!("\nWebsite access: {}", p.website_config.get().is_some());
+
+			let quotas = p.quotas.get();
+			if quotas.max_size.is_some() || quotas.max_objects.is_some() {
+				println!("\nQuotas:");
+				if let Some(ms) = quotas.max_size {
+					let ms = bytesize::ByteSize::b(ms);
+					println!(
+						" maximum size: {} ({})",
+						ms.to_string_as(true),
+						ms.to_string_as(false)
+					);
+				}
+				if let Some(mo) = quotas.max_objects {
+					println!(" maximum number of objects: {}", mo);
+				}
+			}
 
 			println!("\nGlobal aliases:");
 			for (alias, _, active) in p.aliases.items().iter() {
diff --git a/src/garage/main.rs b/src/garage/main.rs
index bd09b6ea..3fa5c3c0 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -61,17 +61,17 @@ async fn main() {
 	pretty_env_logger::init();
 	sodiumoxide::init().expect("Unable to init sodiumoxide");
 
-	let opt = Opt::from_args();
+	// Abort on panic (same behavior as in Go)
+	std::panic::set_hook(Box::new(|panic_info| {
+		error!("{}", panic_info.to_string());
+		std::process::abort();
+	}));
 
+	let opt = Opt::from_args();
 	let res = match opt.cmd {
-		Command::Server => {
-			// Abort on panic (same behavior as in Go)
-			std::panic::set_hook(Box::new(|panic_info| {
-				error!("{}", panic_info.to_string());
-				std::process::abort();
-			}));
-
-			server::run_server(opt.config_file).await
+		Command::Server => server::run_server(opt.config_file).await,
+		Command::OfflineRepair(repair_opt) => {
+			repair::offline::offline_repair(opt.config_file, repair_opt).await
 		}
 		Command::Node(NodeOperation::NodeId(node_id_opt)) => {
 			node_id_command(opt.config_file, node_id_opt.quiet)
diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs
new file mode 100644
index 00000000..4699ace5
--- /dev/null
+++ b/src/garage/repair/mod.rs
@@ -0,0 +1,2 @@
+pub mod offline;
+pub mod online;
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
new file mode 100644
index 00000000..ef56cc5c
--- /dev/null
+++ b/src/garage/repair/offline.rs
@@ -0,0 +1,57 @@
+use std::path::PathBuf;
+
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::config::*;
+use garage_util::error::*;
+
+use garage_model::garage::Garage;
+
+use crate::cli::structs::*;
+
+pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> {
+	if !opt.yes {
+		return Err(Error::Message(
+			"Please add the --yes flag to launch repair operation".into(),
+		));
+	}
+
+	info!("Loading configuration...");
+	let config = read_config(config_file)?;
+
+	info!("Initializing background runner...");
+	let (done_tx, done_rx) = watch::channel(false);
+	let (background, await_background_done) = BackgroundRunner::new(16, done_rx);
+
+	info!("Initializing Garage main data store...");
+	let garage = Garage::new(config.clone(), background)?;
+
+	info!("Launching repair operation...");
+	match opt.what {
+		OfflineRepairWhat::K2VItemCounters => {
+			#[cfg(feature = "k2v")]
+			garage
+				.k2v
+				.counter_table
+				.offline_recount_all(&garage.k2v.item_table)?;
+			#[cfg(not(feature = "k2v"))]
+			error!("K2V not enabled in this build.");
+		}
+		OfflineRepairWhat::ObjectCounters => {
+			garage
+				.object_counter_table
+				.offline_recount_all(&garage.object_table)?;
+		}
+	}
+
+	info!("Repair operation finished, shutting down Garage internals...");
+	done_tx.send(true).unwrap();
+	drop(garage);
+
+	await_background_done.await?;
+
+	info!("Cleaning up...");
+
+	Ok(())
+}
diff --git a/src/garage/repair.rs b/src/garage/repair/online.rs
index 17e14b8b..d6a71742 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair/online.rs
@@ -11,11 +11,11 @@ use garage_util::error::Error;
 
 use crate::*;
 
-pub struct Repair {
+pub struct OnlineRepair {
 	pub garage: Arc<Garage>,
 }
 
-impl Repair {
+impl OnlineRepair {
 	pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
 		if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
 			warn!("Repair worker failed with error: {}", e);
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 7aa6185f..6321357a 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -2,8 +2,6 @@ use std::path::PathBuf;
 
 use tokio::sync::watch;
 
-use garage_db as db;
-
 use garage_util::background::*;
 use garage_util::config::*;
 use garage_util::error::Error;
@@ -29,57 +27,14 @@ async fn wait_from(mut chan: watch::Receiver<bool>) {
 
 pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
 	info!("Loading configuration...");
-	let config = read_config(config_file).expect("Unable to read config file");
-
-	info!("Opening database...");
-	let mut db_path = config.metadata_dir.clone();
-	std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
-	let db = match config.db_engine.as_str() {
-		"sled" => {
-			db_path.push("db");
-			info!("Opening Sled database at: {}", db_path.display());
-			let db = db::sled_adapter::sled::Config::default()
-				.path(&db_path)
-				.cache_capacity(config.sled_cache_capacity)
-				.flush_every_ms(Some(config.sled_flush_every_ms))
-				.open()
-				.expect("Unable to open sled DB");
-			db::sled_adapter::SledDb::init(db)
-		}
-		"sqlite" | "sqlite3" | "rusqlite" => {
-			db_path.push("db.sqlite");
-			info!("Opening Sqlite database at: {}", db_path.display());
-			let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
-				.expect("Unable to open sqlite DB");
-			db::sqlite_adapter::SqliteDb::init(db)
-		}
-		"lmdb" | "heed" => {
-			db_path.push("db.lmdb");
-			info!("Opening LMDB database at: {}", db_path.display());
-			std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
-			let map_size = garage_db::lmdb_adapter::recommended_map_size();
-
-			let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
-				.max_dbs(100)
-				.map_size(map_size)
-				.open(&db_path)
-				.expect("Unable to open LMDB DB");
-			db::lmdb_adapter::LmdbDb::init(db)
-		}
-		e => {
-			return Err(Error::Message(format!(
-				"Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
-				e
-			)));
-		}
-	};
+	let config = read_config(config_file)?;
 
 	info!("Initializing background runner...");
 	let watch_cancel = netapp::util::watch_ctrl_c();
 	let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
 
 	info!("Initializing Garage main data store...");
-	let garage = Garage::new(config.clone(), db, background);
+	let garage = Garage::new(config.clone(), background)?;
 
 	info!("Initialize tracing...");
 	if let Some(export_to) = config.admin.trace_sink {
@@ -89,6 +44,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
 	info!("Initialize Admin API server and metrics collector...");
 	let admin_server = AdminApiServer::new(garage.clone());
 
+	info!("Launching internal Garage cluster communications...");
 	let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
 
 	info!("Create admin RPC handler...");