aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml1
-rw-r--r--src/garage/admin.rs106
-rw-r--r--src/garage/cli/cmd.rs14
-rw-r--r--src/garage/cli/structs.rs46
-rw-r--r--src/garage/cli/util.rs65
-rw-r--r--src/garage/main.rs18
-rw-r--r--src/garage/repair/mod.rs2
-rw-r--r--src/garage/repair/offline.rs57
-rw-r--r--src/garage/repair/online.rs (renamed from src/garage/repair.rs)4
-rw-r--r--src/garage/server.rs50
10 files changed, 287 insertions, 76 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index eb643160..640e6975 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -30,6 +30,7 @@ garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.7.0", path = "../web" }
bytes = "1.0"
+bytesize = "1.1"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c662aa00..6630ae16 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -24,11 +24,12 @@ use garage_model::migrate::Migrate;
use garage_model::permission::*;
use crate::cli::*;
-use crate::repair::Repair;
+use crate::repair::online::OnlineRepair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
+#[allow(clippy::large_enum_variant)]
pub enum AdminRpc {
BucketOperation(BucketOperation),
KeyOperation(KeyOperation),
@@ -38,8 +39,15 @@ pub enum AdminRpc {
// Replies
Ok(String),
- BucketList(Vec<Bucket>),
- BucketInfo(Bucket, HashMap<String, Key>),
+ BucketList {
+ buckets: Vec<Bucket>,
+ counters: HashMap<Uuid, HashMap<String, i64>>,
+ },
+ BucketInfo {
+ bucket: Bucket,
+ relevant_keys: HashMap<String, Key>,
+ counters: HashMap<String, i64>,
+ },
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
}
@@ -72,6 +80,7 @@ impl AdminRpcHandler {
BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
BucketOperation::Website(query) => self.handle_bucket_website(query).await,
+ BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
}
}
@@ -87,7 +96,25 @@ impl AdminRpcHandler {
EnumerationOrder::Forward,
)
.await?;
- Ok(AdminRpc::BucketList(buckets))
+
+ let ring = self.garage.system.ring.borrow().clone();
+ let counters = self
+ .garage
+ .object_counter_table
+ .table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ 15000,
+ EnumerationOrder::Forward,
+ )
+ .await?
+ .iter()
+ .map(|x| (x.sk, x.filtered_values(&ring)))
+ .collect::<HashMap<_, _>>();
+
+ Ok(AdminRpc::BucketList { buckets, counters })
}
async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
@@ -104,6 +131,15 @@ impl AdminRpcHandler {
.get_existing_bucket(bucket_id)
.await?;
+ let counters = self
+ .garage
+ .object_counter_table
+ .table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .unwrap_or_default();
+
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
@@ -139,7 +175,11 @@ impl AdminRpcHandler {
}
}
- Ok(AdminRpc::BucketInfo(bucket, relevant_keys))
+ Ok(AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ })
}
#[allow(clippy::ptr_arg)]
@@ -431,6 +471,60 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok(msg))
}
+ async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option_mut().unwrap();
+
+ if query.max_size.is_none() && query.max_objects.is_none() {
+ return Err(Error::BadRequest(
+ "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
+ ));
+ }
+
+ let mut quotas = bucket_state.quotas.get().clone();
+
+ match query.max_size.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_size = None,
+ Some(v) => {
+ let bs = v
+ .parse::<bytesize::ByteSize>()
+ .ok_or_bad_request(format!("Invalid size specified: {}", v))?;
+ quotas.max_size = Some(bs.as_u64());
+ }
+ _ => (),
+ }
+
+ match query.max_objects.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_objects = None,
+ Some(v) => {
+ let mo = v
+ .parse::<u64>()
+ .ok_or_bad_request(format!("Invalid number specified: {}", v))?;
+ quotas.max_objects = Some(mo);
+ }
+ _ => (),
+ }
+
+ bucket_state.quotas.update(quotas);
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Quotas updated for {}",
+ &query.bucket
+ )))
+ }
+
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
@@ -619,7 +713,7 @@ impl AdminRpcHandler {
)))
}
} else {
- let repair = Repair {
+ let repair = OnlineRepair {
garage: self.garage.clone(),
};
self.garage
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index b2dd8f14..e8e834e8 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -166,11 +166,15 @@ pub async fn cmd_admin(
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
- AdminRpc::BucketList(bl) => {
- print_bucket_list(bl);
- }
- AdminRpc::BucketInfo(bucket, rk) => {
- print_bucket_info(&bucket, &rk);
+ AdminRpc::BucketList { buckets, counters } => {
+ print_bucket_list(buckets, counters);
+ }
+ AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ } => {
+ print_bucket_info(&bucket, &relevant_keys, &counters);
}
AdminRpc::KeyList(kl) => {
print_key_list(kl);
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index a0c49aeb..28228b07 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -33,10 +33,15 @@ pub enum Command {
#[structopt(name = "migrate")]
Migrate(MigrateOpt),
- /// Start repair of node data
+ /// Start repair of node data on remote node
#[structopt(name = "repair")]
Repair(RepairOpt),
+ /// Offline reparation of node data (these repairs must be run offline
+ /// directly on the server node)
+ #[structopt(name = "offline-repair")]
+ OfflineRepair(OfflineRepairOpt),
+
/// Gather node statistics
#[structopt(name = "stats")]
Stats(StatsOpt),
@@ -175,6 +180,10 @@ pub enum BucketOperation {
/// Expose as website or not
#[structopt(name = "website")]
Website(WebsiteOpt),
+
+ /// Set the quotas for this bucket
+ #[structopt(name = "set-quotas")]
+ SetQuotas(SetQuotasOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
@@ -262,6 +271,21 @@ pub struct PermBucketOpt {
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct SetQuotasOpt {
+ /// Bucket name
+ pub bucket: String,
+
+ /// Set a maximum size for the bucket (specify a size e.g. in MiB or GiB,
+ /// or `none` for no size restriction)
+ #[structopt(long = "max-size")]
+ pub max_size: Option<String>,
+
+ /// Set a maximum number of objects for the bucket (or `none` for no restriction)
+ #[structopt(long = "max-objects")]
+ pub max_objects: Option<String>,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
#[structopt(name = "list")]
@@ -406,6 +430,26 @@ pub enum RepairWhat {
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct OfflineRepairOpt {
+ /// Confirm the launch of the repair operation
+ #[structopt(long = "yes")]
+ pub yes: bool,
+
+ #[structopt(subcommand)]
+ pub what: OfflineRepairWhat,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum OfflineRepairWhat {
+ /// Repair K2V item counters
+ #[structopt(name = "k2v_item_counters")]
+ K2VItemCounters,
+ /// Repair object counters
+ #[structopt(name = "object_counters")]
+ ObjectCounters,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 6d73be3a..153e77a8 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -7,12 +7,13 @@ use garage_util::formater::format_table;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
+use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
-pub fn print_bucket_list(bl: Vec<Bucket>) {
+pub fn print_bucket_list(buckets: Vec<Bucket>, counters: HashMap<Uuid, HashMap<String, i64>>) {
println!("List of buckets:");
let mut table = vec![];
- for bucket in bl {
+ for bucket in buckets {
let aliases = bucket
.aliases()
.iter()
@@ -29,11 +30,24 @@ pub fn print_bucket_list(bl: Vec<Bucket>) {
[((k, n), _, _)] => format!("{}:{}", k, n),
s => format!("[{} local aliases]", s.len()),
};
+
+ let counters_tb = match counters.get(&bucket.id) {
+ Some(c) => format!(
+ "\t{}\t{}\t{}",
+ bytesize::ByteSize::b(c.get(BYTES).cloned().unwrap_or_default() as u64)
+ .to_string_as(true),
+ c.get(OBJECTS).cloned().unwrap_or_default(),
+ c.get(UNFINISHED_UPLOADS).cloned().unwrap_or_default(),
+ ),
+ None => "".into(),
+ };
+
table.push(format!(
- "\t{}\t{}\t{}",
+ "\t{}\t{}\t{}{}",
aliases.join(","),
local_aliases_n,
- hex::encode(bucket.id)
+ hex::encode(bucket.id),
+ counters_tb
));
}
format_table(table);
@@ -121,7 +135,11 @@ pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) {
}
}
-pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) {
+pub fn print_bucket_info(
+ bucket: &Bucket,
+ relevant_keys: &HashMap<String, Key>,
+ counters: &HashMap<String, i64>,
+) {
let key_name = |k| {
relevant_keys
.get(k)
@@ -133,7 +151,42 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>)
match &bucket.state {
Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => {
- println!("Website access: {}", p.website_config.get().is_some());
+ let size =
+ bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64);
+ println!(
+ "\nSize: {} ({})",
+ size.to_string_as(true),
+ size.to_string_as(false)
+ );
+ println!(
+ "Objects: {}",
+ counters.get(OBJECTS).cloned().unwrap_or_default()
+ );
+ println!(
+ "Unfinished multipart uploads: {}",
+ counters
+ .get(UNFINISHED_UPLOADS)
+ .cloned()
+ .unwrap_or_default()
+ );
+
+ println!("\nWebsite access: {}", p.website_config.get().is_some());
+
+ let quotas = p.quotas.get();
+ if quotas.max_size.is_some() || quotas.max_objects.is_some() {
+ println!("\nQuotas:");
+ if let Some(ms) = quotas.max_size {
+ let ms = bytesize::ByteSize::b(ms);
+ println!(
+ " maximum size: {} ({})",
+ ms.to_string_as(true),
+ ms.to_string_as(false)
+ );
+ }
+ if let Some(mo) = quotas.max_objects {
+ println!(" maximum number of objects: {}", mo);
+ }
+ }
println!("\nGlobal aliases:");
for (alias, _, active) in p.aliases.items().iter() {
diff --git a/src/garage/main.rs b/src/garage/main.rs
index bd09b6ea..3fa5c3c0 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -61,17 +61,17 @@ async fn main() {
pretty_env_logger::init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
- let opt = Opt::from_args();
+ // Abort on panic (same behavior as in Go)
+ std::panic::set_hook(Box::new(|panic_info| {
+ error!("{}", panic_info.to_string());
+ std::process::abort();
+ }));
+ let opt = Opt::from_args();
let res = match opt.cmd {
- Command::Server => {
- // Abort on panic (same behavior as in Go)
- std::panic::set_hook(Box::new(|panic_info| {
- error!("{}", panic_info.to_string());
- std::process::abort();
- }));
-
- server::run_server(opt.config_file).await
+ Command::Server => server::run_server(opt.config_file).await,
+ Command::OfflineRepair(repair_opt) => {
+ repair::offline::offline_repair(opt.config_file, repair_opt).await
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
node_id_command(opt.config_file, node_id_opt.quiet)
diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs
new file mode 100644
index 00000000..4699ace5
--- /dev/null
+++ b/src/garage/repair/mod.rs
@@ -0,0 +1,2 @@
+pub mod offline;
+pub mod online;
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
new file mode 100644
index 00000000..ef56cc5c
--- /dev/null
+++ b/src/garage/repair/offline.rs
@@ -0,0 +1,57 @@
+use std::path::PathBuf;
+
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::config::*;
+use garage_util::error::*;
+
+use garage_model::garage::Garage;
+
+use crate::cli::structs::*;
+
+pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> {
+ if !opt.yes {
+ return Err(Error::Message(
+ "Please add the --yes flag to launch repair operation".into(),
+ ));
+ }
+
+ info!("Loading configuration...");
+ let config = read_config(config_file)?;
+
+ info!("Initializing background runner...");
+ let (done_tx, done_rx) = watch::channel(false);
+ let (background, await_background_done) = BackgroundRunner::new(16, done_rx);
+
+ info!("Initializing Garage main data store...");
+ let garage = Garage::new(config.clone(), background)?;
+
+ info!("Launching repair operation...");
+ match opt.what {
+ OfflineRepairWhat::K2VItemCounters => {
+ #[cfg(feature = "k2v")]
+ garage
+ .k2v
+ .counter_table
+ .offline_recount_all(&garage.k2v.item_table)?;
+ #[cfg(not(feature = "k2v"))]
+ error!("K2V not enabled in this build.");
+ }
+ OfflineRepairWhat::ObjectCounters => {
+ garage
+ .object_counter_table
+ .offline_recount_all(&garage.object_table)?;
+ }
+ }
+
+ info!("Repair operation finished, shutting down Garage internals...");
+ done_tx.send(true).unwrap();
+ drop(garage);
+
+ await_background_done.await?;
+
+ info!("Cleaning up...");
+
+ Ok(())
+}
diff --git a/src/garage/repair.rs b/src/garage/repair/online.rs
index 17e14b8b..d6a71742 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair/online.rs
@@ -11,11 +11,11 @@ use garage_util::error::Error;
use crate::*;
-pub struct Repair {
+pub struct OnlineRepair {
pub garage: Arc<Garage>,
}
-impl Repair {
+impl OnlineRepair {
pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
warn!("Repair worker failed with error: {}", e);
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 7aa6185f..6321357a 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -2,8 +2,6 @@ use std::path::PathBuf;
use tokio::sync::watch;
-use garage_db as db;
-
use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
@@ -29,57 +27,14 @@ async fn wait_from(mut chan: watch::Receiver<bool>) {
pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Loading configuration...");
- let config = read_config(config_file).expect("Unable to read config file");
-
- info!("Opening database...");
- let mut db_path = config.metadata_dir.clone();
- std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
- let db = match config.db_engine.as_str() {
- "sled" => {
- db_path.push("db");
- info!("Opening Sled database at: {}", db_path.display());
- let db = db::sled_adapter::sled::Config::default()
- .path(&db_path)
- .cache_capacity(config.sled_cache_capacity)
- .flush_every_ms(Some(config.sled_flush_every_ms))
- .open()
- .expect("Unable to open sled DB");
- db::sled_adapter::SledDb::init(db)
- }
- "sqlite" | "sqlite3" | "rusqlite" => {
- db_path.push("db.sqlite");
- info!("Opening Sqlite database at: {}", db_path.display());
- let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
- .expect("Unable to open sqlite DB");
- db::sqlite_adapter::SqliteDb::init(db)
- }
- "lmdb" | "heed" => {
- db_path.push("db.lmdb");
- info!("Opening LMDB database at: {}", db_path.display());
- std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
- let map_size = garage_db::lmdb_adapter::recommended_map_size();
-
- let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
- .max_dbs(100)
- .map_size(map_size)
- .open(&db_path)
- .expect("Unable to open LMDB DB");
- db::lmdb_adapter::LmdbDb::init(db)
- }
- e => {
- return Err(Error::Message(format!(
- "Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
- e
- )));
- }
- };
+ let config = read_config(config_file)?;
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
info!("Initializing Garage main data store...");
- let garage = Garage::new(config.clone(), db, background);
+ let garage = Garage::new(config.clone(), background)?;
info!("Initialize tracing...");
if let Some(export_to) = config.admin.trace_sink {
@@ -89,6 +44,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initialize Admin API server and metrics collector...");
let admin_server = AdminApiServer::new(garage.clone());
+ info!("Launching internal Garage cluster communications...");
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");