aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml21
-rw-r--r--src/garage/admin.rs321
-rw-r--r--src/garage/cli/cmd.rs18
-rw-r--r--src/garage/cli/structs.rs53
-rw-r--r--src/garage/cli/util.rs163
-rw-r--r--src/garage/main.rs36
-rw-r--r--src/garage/repair/offline.rs17
-rw-r--r--src/garage/repair/online.rs93
-rw-r--r--src/garage/server.rs11
-rw-r--r--src/garage/tests/bucket.rs22
-rw-r--r--src/garage/tests/s3/streaming_signature.rs8
11 files changed, 581 insertions, 182 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index f9d7cf3a..b43b0242 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage"
-version = "0.8.0"
+version = "0.8.1"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -21,14 +21,14 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.0", path = "../db" }
-garage_api = { version = "0.8.0", path = "../api" }
-garage_block = { version = "0.8.0", path = "../block" }
-garage_model = { version = "0.8.0", path = "../model" }
-garage_rpc = { version = "0.8.0", path = "../rpc" }
-garage_table = { version = "0.8.0", path = "../table" }
-garage_util = { version = "0.8.0", path = "../util" }
-garage_web = { version = "0.8.0", path = "../web" }
+garage_db = { version = "0.8.1", path = "../db" }
+garage_api = { version = "0.8.1", path = "../api" }
+garage_block = { version = "0.8.1", path = "../block" }
+garage_model = { version = "0.8.1", path = "../model" }
+garage_rpc = { version = "0.8.1", path = "../rpc" }
+garage_table = { version = "0.8.1", path = "../table" }
+garage_util = { version = "0.8.1", path = "../util" }
+garage_web = { version = "0.8.1", path = "../web" }
backtrace = "0.3"
bytes = "1.0"
@@ -42,7 +42,6 @@ rand = "0.8"
async-trait = "0.1.7"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
structopt = { version = "0.3", default-features = false }
@@ -74,7 +73,7 @@ base64 = "0.13"
[features]
-default = [ "bundled-libs", "metrics", "sled" ]
+default = [ "bundled-libs", "metrics", "sled", "k2v" ]
k2v = [ "garage_util/k2v", "garage_api/k2v" ]
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index e973cfe7..c669b5e6 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -5,9 +5,11 @@ use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use garage_util::background::BackgroundRunner;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
+use garage_util::formater::format_table_to_string;
use garage_util::time::*;
use garage_table::replication::*;
@@ -15,6 +17,7 @@ use garage_table::*;
use garage_rpc::*;
+use garage_block::manager::BlockResyncErrorInfo;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*;
@@ -24,6 +27,8 @@ use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
use garage_model::permission::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::Version;
use crate::cli::*;
use crate::repair::online::launch_online_repair;
@@ -38,7 +43,8 @@ pub enum AdminRpc {
LaunchRepair(RepairOpt),
Migrate(MigrateOpt),
Stats(StatsOpt),
- Worker(WorkerOpt),
+ Worker(WorkerOperation),
+ BlockOperation(BlockOperation),
// Replies
Ok(String),
@@ -54,6 +60,13 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
+ WorkerInfo(usize, garage_util::background::WorkerInfo),
+ BlockErrorList(Vec<BlockResyncErrorInfo>),
+ BlockInfo {
+ hash: Hash,
+ refcount: u64,
+ versions: Vec<Result<Version, Uuid>>,
+ },
}
impl Rpc for AdminRpc {
@@ -62,17 +75,24 @@ impl Rpc for AdminRpc {
pub struct AdminRpcHandler {
garage: Arc<Garage>,
+ background: Arc<BackgroundRunner>,
endpoint: Arc<Endpoint<AdminRpc, Self>>,
}
impl AdminRpcHandler {
- pub fn new(garage: Arc<Garage>) -> Arc<Self> {
+ pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
- let admin = Arc::new(Self { garage, endpoint });
+ let admin = Arc::new(Self {
+ garage,
+ background,
+ endpoint,
+ });
admin.endpoint.set_handler(admin.clone());
admin
}
+ // ================ BUCKET COMMANDS ====================
+
async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
BucketOperation::List => self.handle_list_buckets().await,
@@ -551,6 +571,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok(ret))
}
+ // ================ KEY COMMANDS ====================
+
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
@@ -688,6 +710,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::KeyInfo(key, relevant_buckets))
}
+ // ================ MIGRATION COMMANDS ====================
+
async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
@@ -704,6 +728,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok("Migration successfull.".into()))
}
+ // ================ REPAIR COMMANDS ====================
+
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
@@ -739,7 +765,7 @@ impl AdminRpcHandler {
)))
}
} else {
- launch_online_repair(self.garage.clone(), opt).await;
+ launch_online_repair(&self.garage, &self.background, opt).await?;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
@@ -747,6 +773,8 @@ impl AdminRpcHandler {
}
}
+ // ================ STATS COMMANDS ====================
+
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
@@ -763,11 +791,12 @@ impl AdminRpcHandler {
match self
.endpoint
.call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
- .await?
+ .await
{
- Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),
- Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
- Err(e) => writeln!(&mut ret, "Error: {}", e).unwrap(),
+ Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
+ Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
+ Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
+ Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
}
}
Ok(AdminRpc::Ok(ret))
@@ -787,6 +816,7 @@ impl AdminRpcHandler {
.unwrap_or_else(|| "(unknown)".into()),
)
.unwrap();
+
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
// Gather ring statistics
@@ -805,21 +835,38 @@ impl AdminRpcHandler {
writeln!(&mut ret, " {:?} {}", n, c).unwrap();
}
- self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?;
- self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?;
- self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?;
- self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?;
- self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?;
+ // Gather table statistics
+ let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
+ table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.key_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.object_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.version_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.block_ref_table, opt.detailed)?);
+ write!(
+ &mut ret,
+ "\nTable stats:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+ // Gather block manager statistics
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
- if opt.detailed {
- writeln!(
- &mut ret,
- " number of RC entries (~= number of blocks): {}",
- self.garage.block_manager.rc_len()?
- )
- .unwrap();
- }
+ let rc_len = if opt.detailed {
+ self.garage.block_manager.rc_len()?.to_string()
+ } else {
+ self.garage
+ .block_manager
+ .rc_fast_len()?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into())
+ };
+
+ writeln!(
+ &mut ret,
+ " number of RC entries (~= number of blocks): {}",
+ rc_len
+ )
+ .unwrap();
writeln!(
&mut ret,
" resync queue length: {}",
@@ -833,67 +880,83 @@ impl AdminRpcHandler {
)
.unwrap();
+ if !opt.detailed {
+ writeln!(&mut ret, "\nIf values are missing (marked as NC), consider adding the --detailed flag - this will be slow.").unwrap();
+ }
+
Ok(ret)
}
fn gather_table_stats<F, R>(
&self,
- to: &mut String,
t: &Arc<Table<F, R>>,
- opt: &StatsOpt,
- ) -> Result<(), Error>
+ detailed: bool,
+ ) -> Result<String, Error>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap();
- if opt.detailed {
- writeln!(
- to,
- " number of items: {}",
- t.data.store.len().map_err(GarageError::from)?
+ let (data_len, mkl_len) = if detailed {
+ (
+ t.data.store.len().map_err(GarageError::from)?.to_string(),
+ t.merkle_updater.merkle_tree_len()?.to_string(),
)
- .unwrap();
- writeln!(
- to,
- " Merkle tree size: {}",
- t.merkle_updater.merkle_tree_len()?
+ } else {
+ (
+ t.data
+ .store
+ .fast_len()
+ .map_err(GarageError::from)?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into()),
+ t.merkle_updater
+ .merkle_tree_fast_len()?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into()),
)
- .unwrap();
- }
- writeln!(
- to,
- " Merkle updater todo queue length: {}",
- t.merkle_updater.todo_len()?
- )
- .unwrap();
- writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()?).unwrap();
+ };
- Ok(())
+ Ok(format!(
+ " {}\t{}\t{}\t{}\t{}",
+ F::TABLE_NAME,
+ data_len,
+ mkl_len,
+ t.merkle_updater.todo_len()?,
+ t.data.gc_todo_len()?
+ ))
}
- // ----
+ // ================ WORKER COMMANDS ====================
- async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
- match opt.cmd {
- WorkerCmd::List { opt } => {
- let workers = self.garage.background.get_worker_info();
- Ok(AdminRpc::WorkerList(workers, opt))
+ async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ WorkerOperation::List { opt } => {
+ let workers = self.background.get_worker_info();
+ Ok(AdminRpc::WorkerList(workers, *opt))
+ }
+ WorkerOperation::Info { tid } => {
+ let info = self
+ .background
+ .get_worker_info()
+ .get(tid)
+ .ok_or_bad_request(format!("No worker with TID {}", tid))?
+ .clone();
+ Ok(AdminRpc::WorkerInfo(*tid, info))
}
- WorkerCmd::Set { opt } => match opt {
+ WorkerOperation::Set { opt } => match opt {
WorkerSetCmd::ScrubTranquility { tranquility } => {
- let scrub_command = ScrubWorkerCommand::SetTranquility(tranquility);
+ let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility);
self.garage
.block_manager
.send_scrub_command(scrub_command)
- .await;
+ .await?;
Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
}
- WorkerSetCmd::ResyncNWorkers { n_workers } => {
+ WorkerSetCmd::ResyncWorkerCount { worker_count } => {
self.garage
.block_manager
.resync
- .set_n_workers(n_workers)
+ .set_n_workers(*worker_count)
.await?;
Ok(AdminRpc::Ok("Number of resync workers updated".into()))
}
@@ -901,13 +964,154 @@ impl AdminRpcHandler {
self.garage
.block_manager
.resync
- .set_tranquility(tranquility)
+ .set_tranquility(*tranquility)
.await?;
Ok(AdminRpc::Ok("Resync tranquility updated".into()))
}
},
}
}
+
+ // ================ BLOCK COMMANDS ====================
+
+ async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
+ self.garage.block_manager.list_resync_errors()?,
+ )),
+ BlockOperation::Info { hash } => {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let refcount = self.garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ versions.push(Ok(v));
+ } else {
+ versions.push(Err(br.version));
+ }
+ }
+ Ok(AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ })
+ }
+ BlockOperation::RetryNow { all, blocks } => {
+ if *all {
+ if !blocks.is_empty() {
+ return Err(Error::BadRequest(
+ "--all was specified, cannot also specify blocks".into(),
+ ));
+ }
+ let blocks = self.garage.block_manager.list_resync_errors()?;
+ for b in blocks.iter() {
+ self.garage.block_manager.resync.clear_backoff(&b.hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ } else {
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ self.garage.block_manager.resync.clear_backoff(&hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ }
+ }
+ BlockOperation::Purge { yes, blocks } => {
+ if !yes {
+ return Err(Error::BadRequest(
+ "Pass the --yes flag to confirm block purge operation.".into(),
+ ));
+ }
+
+ let mut obj_dels = 0;
+ let mut ver_dels = 0;
+
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+
+ for br in block_refs {
+ let version = match self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ Some(v) => v,
+ None => continue,
+ };
+
+ if let Some(object) = self
+ .garage
+ .object_table
+ .get(&version.bucket_id, &version.key)
+ .await?
+ {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == br.version {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ version.bucket_id,
+ version.key.clone(),
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(
+ ObjectVersionData::DeleteMarker,
+ ),
+ }],
+ );
+ self.garage.object_table.insert(&deleted_object).await?;
+ obj_dels += 1;
+ }
+ }
+ }
+
+ if !version.deleted.get() {
+ let deleted_version = Version::new(
+ version.uuid,
+ version.bucket_id,
+ version.key.clone(),
+ true,
+ );
+ self.garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
+ }
+ }
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
+ blocks.len(),
+ obj_dels,
+ ver_dels
+ )))
+ }
+ }
+ }
}
#[async_trait]
@@ -923,7 +1127,8 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await,
+ AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
+ AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index e352ddf2..0d180ecd 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -41,6 +41,9 @@ pub async fn cli_command_dispatch(
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
+ Command::Block(bo) => {
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
+ }
_ => unreachable!(),
}
}
@@ -186,7 +189,20 @@ pub async fn cmd_admin(
print_key_info(&key, &rb);
}
AdminRpc::WorkerList(wi, wlo) => {
- print_worker_info(wi, wlo);
+ print_worker_list(wi, wlo);
+ }
+ AdminRpc::WorkerInfo(tid, wi) => {
+ print_worker_info(tid, wi);
+ }
+ AdminRpc::BlockErrorList(el) => {
+ print_block_error_list(el);
+ }
+ AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ } => {
+ print_block_info(hash, refcount, versions);
}
r => {
error!("Unexpected response: {:?}", r);
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 49a1f267..825c1813 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -49,7 +49,11 @@ pub enum Command {
/// Manage background workers
#[structopt(name = "worker", version = garage_version())]
- Worker(WorkerOpt),
+ Worker(WorkerOperation),
+
+ /// Low-level debug operations on data blocks
+ #[structopt(name = "block", version = garage_version())]
+ Block(BlockOperation),
}
#[derive(StructOpt, Debug)]
@@ -513,20 +517,17 @@ pub struct StatsOpt {
pub detailed: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
-pub struct WorkerOpt {
- #[structopt(subcommand)]
- pub cmd: WorkerCmd,
-}
-
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
-pub enum WorkerCmd {
+pub enum WorkerOperation {
/// List all workers on Garage node
#[structopt(name = "list", version = garage_version())]
List {
#[structopt(flatten)]
opt: WorkerListOpt,
},
+ /// Get detailed information about a worker
+ #[structopt(name = "info", version = garage_version())]
+ Info { tid: usize },
/// Set worker parameter
#[structopt(name = "set", version = garage_version())]
Set {
@@ -551,9 +552,41 @@ pub enum WorkerSetCmd {
#[structopt(name = "scrub-tranquility", version = garage_version())]
ScrubTranquility { tranquility: u32 },
/// Set number of concurrent block resync workers
- #[structopt(name = "resync-n-workers", version = garage_version())]
- ResyncNWorkers { n_workers: usize },
+ #[structopt(name = "resync-worker-count", version = garage_version())]
+ ResyncWorkerCount { worker_count: usize },
/// Set tranquility of block resync operations
#[structopt(name = "resync-tranquility", version = garage_version())]
ResyncTranquility { tranquility: u32 },
}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum BlockOperation {
+ /// List all blocks that currently have a resync error
+ #[structopt(name = "list-errors", version = garage_version())]
+ ListErrors,
+ /// Get detailed information about a single block
+ #[structopt(name = "info", version = garage_version())]
+ Info {
+ /// Hash of the block for which to retrieve information
+ hash: String,
+ },
+ /// Retry now the resync of one or many blocks
+ #[structopt(name = "retry-now", version = garage_version())]
+ RetryNow {
+ /// Retry all blocks that have a resync error
+ #[structopt(long = "all")]
+ all: bool,
+ /// Hashes of the block to retry to resync now
+ blocks: Vec<String>,
+ },
+ /// Delete all objects referencing a missing block
+ #[structopt(name = "purge", version = garage_version())]
+ Purge {
+ /// Mandatory to confirm this operation
+ #[structopt(long = "yes")]
+ yes: bool,
+ /// Hashes of the block to purge
+ #[structopt(required = true)]
+ blocks: Vec<String>,
+ },
+}
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 396938ae..63fd9eba 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -3,14 +3,17 @@ use std::time::Duration;
use garage_util::background::*;
use garage_util::crdt::*;
-use garage_util::data::Uuid;
+use garage_util::data::*;
use garage_util::error::*;
use garage_util::formater::format_table;
use garage_util::time::*;
+use garage_block::manager::BlockResyncErrorInfo;
+
use garage_model::bucket_table::*;
use garage_model::key_table::*;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
+use garage_model::s3::version_table::Version;
use crate::cli::structs::WorkerListOpt;
@@ -241,7 +244,7 @@ pub fn find_matching_node(
}
}
-pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
+pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
let mut wi = wi.into_iter().collect::<Vec<_>>();
wi.sort_by_key(|(tid, info)| {
(
@@ -254,7 +257,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
)
});
- let mut table = vec![];
+ let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
for (tid, info) in wi.iter() {
if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
continue;
@@ -263,33 +266,147 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
continue;
}
- table.push(format!("{}\t{}\t{}", tid, info.state, info.name));
- if let Some(i) = &info.info {
- table.push(format!("\t\t {}", i));
- }
let tf = timeago::Formatter::new();
- let (err_ago, err_msg) = info
+ let err_ago = info
.last_error
.as_ref()
- .map(|(m, t)| {
- (
- tf.convert(Duration::from_millis(now_msec() - t)),
- m.as_str(),
- )
- })
- .unwrap_or(("(?) ago".into(), "(?)"));
- if info.consecutive_errors > 0 {
+ .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t)))
+ .unwrap_or_default();
+ let (total_err, consec_err) = if info.errors > 0 {
+ (info.errors.to_string(), info.consecutive_errors.to_string())
+ } else {
+ ("-".into(), "-".into())
+ };
+
+ table.push(format!(
+ "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
+ tid,
+ info.state,
+ info.name,
+ info.status
+ .tranquility
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ info.status.progress.as_deref().unwrap_or("-"),
+ info.status
+ .queue_length
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ total_err,
+ consec_err,
+ err_ago,
+ ));
+ }
+ format_table(table);
+}
+
+pub fn print_worker_info(tid: usize, info: WorkerInfo) {
+ let mut table = vec![];
+ table.push(format!("Task id:\t{}", tid));
+ table.push(format!("Worker name:\t{}", info.name));
+ match info.state {
+ WorkerState::Throttled(t) => {
table.push(format!(
- "\t\t {} consecutive errors ({} total), last {}",
- info.consecutive_errors, info.errors, err_ago,
+ "Worker state:\tBusy (throttled, paused for {:.3}s)",
+ t
));
- table.push(format!("\t\t {}", err_msg));
- } else if info.errors > 0 {
- table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
- if wlo.errors {
- table.push(format!("\t\t {}", err_msg));
+ }
+ s => {
+ table.push(format!("Worker state:\t{}", s));
+ }
+ };
+ if let Some(tql) = info.status.tranquility {
+ table.push(format!("Tranquility:\t{}", tql));
+ }
+
+ table.push("".into());
+ table.push(format!("Total errors:\t{}", info.errors));
+ table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
+ if let Some((s, t)) = info.last_error {
+ table.push(format!("Last error:\t{}", s));
+ let tf = timeago::Formatter::new();
+ table.push(format!(
+ "Last error time:\t{}",
+ tf.convert(Duration::from_millis(now_msec() - t))
+ ));
+ }
+
+ table.push("".into());
+ if let Some(p) = info.status.progress {
+ table.push(format!("Progress:\t{}", p));
+ }
+ if let Some(ql) = info.status.queue_length {
+ table.push(format!("Queue length:\t{}", ql));
+ }
+ if let Some(pe) = info.status.persistent_errors {
+ table.push(format!("Persistent errors:\t{}", pe));
+ }
+
+ for (i, s) in info.status.freeform.iter().enumerate() {
+ if i == 0 {
+ if table.last() != Some(&"".into()) {
+ table.push("".into());
}
+ table.push(format!("Message:\t{}", s));
+ } else {
+ table.push(format!("\t{}", s));
}
}
format_table(table);
}
+
+pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
+ let now = now_msec();
+ let tf = timeago::Formatter::new();
+ let mut tf2 = timeago::Formatter::new();
+ tf2.ago("");
+
+ let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
+ for e in el {
+ table.push(format!(
+ "{}\t{}\t{}\t{}\tin {}",
+ hex::encode(e.hash.as_slice()),
+ e.refcount,
+ e.error_count,
+ tf.convert(Duration::from_millis(now - e.last_try)),
+ tf2.convert(Duration::from_millis(e.next_try - now))
+ ));
+ }
+ format_table(table);
+}
+
+pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) {
+ println!("Block hash: {}", hex::encode(hash.as_slice()));
+ println!("Refcount: {}", refcount);
+ println!();
+
+ let mut table = vec!["Version\tBucket\tKey\tDeleted".into()];
+ let mut nondeleted_count = 0;
+ for v in versions.iter() {
+ match v {
+ Ok(ver) => {
+ table.push(format!(
+ "{:?}\t{:?}\t{}\t{:?}",
+ ver.uuid,
+ ver.bucket_id,
+ ver.key,
+ ver.deleted.get()
+ ));
+ if !ver.deleted.get() {
+ nondeleted_count += 1;
+ }
+ }
+ Err(vh) => {
+ table.push(format!("{:?}\t\t\tyes", vh));
+ }
+ }
+ }
+ format_table(table);
+
+ if refcount != nondeleted_count {
+ println!();
+ println!("Warning: refcount does not match number of non-deleted versions");
+ }
+}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 8e64273f..cd1d6228 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -130,9 +130,16 @@ async fn main() {
std::process::abort();
}));
+ // Parse arguments and dispatch command line
+ let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches());
+
// Initialize logging as well as other libraries used in Garage
if std::env::var("RUST_LOG").is_err() {
- std::env::set_var("RUST_LOG", "netapp=info,garage=info")
+ let default_log = match &opt.cmd {
+ Command::Server => "netapp=info,garage=info",
+ _ => "netapp=warn,garage=warn",
+ };
+ std::env::set_var("RUST_LOG", default_log)
}
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
@@ -140,9 +147,6 @@ async fn main() {
.init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
- // Parse arguments and dispatch command line
- let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches());
-
let res = match opt.cmd {
Command::Server => server::run_server(opt.config_file).await,
Command::OfflineRepair(repair_opt) => {
@@ -185,9 +189,9 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk);
// Find and parse the address of the target host
- let (id, addr) = if let Some(h) = opt.rpc_host {
+ let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {
let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?;
- (id, addrs[0])
+ (id, addrs[0], false)
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
.err_context(READ_KEY_ERROR)?;
@@ -198,24 +202,26 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
.ok_or_message("unable to resolve rpc_public_addr specified in config file")?
.next()
.ok_or_message("unable to resolve rpc_public_addr specified in config file")?;
- (node_id, a)
+ (node_id, a, false)
} else {
let default_addr = SocketAddr::new(
"127.0.0.1".parse().unwrap(),
config.as_ref().unwrap().rpc_bind_addr.port(),
);
- warn!(
- "Trying to contact Garage node at default address {}",
- default_addr
- );
- warn!("If this doesn't work, consider adding rpc_public_addr in your config file or specifying the -h command line parameter.");
- (node_id, default_addr)
+ (node_id, default_addr, true)
}
};
// Connect to target host
- netapp.clone().try_connect(addr, id).await
- .err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?;
+ if let Err(e) = netapp.clone().try_connect(addr, id).await {
+ if is_default_addr {
+ warn!(
+ "Tried to contact Garage node at default address {}, which didn't work. If that address is wrong, consider setting rpc_public_addr in your config file.",
+ addr
+ );
+ }
+ Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?;
+ }
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
index 7760a8bd..25193e4a 100644
--- a/src/garage/repair/offline.rs
+++ b/src/garage/repair/offline.rs
@@ -1,8 +1,5 @@
use std::path::PathBuf;
-use tokio::sync::watch;
-
-use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::*;
@@ -20,12 +17,8 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu
info!("Loading configuration...");
let config = read_config(config_file)?;
- info!("Initializing background runner...");
- let (done_tx, done_rx) = watch::channel(false);
- let (background, await_background_done) = BackgroundRunner::new(16, done_rx);
-
info!("Initializing Garage main data store...");
- let garage = Garage::new(config.clone(), background)?;
+ let garage = Garage::new(config)?;
info!("Launching repair operation...");
match opt.what {
@@ -43,13 +36,7 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu
}
}
- info!("Repair operation finished, shutting down Garage internals...");
- done_tx.send(true).unwrap();
- drop(garage);
-
- await_background_done.await?;
-
- info!("Cleaning up...");
+ info!("Repair operation finished, shutting down...");
Ok(())
}
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index e33cf097..627e3bf3 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -12,38 +12,37 @@ use garage_model::s3::version_table::*;
use garage_table::*;
use garage_util::background::*;
use garage_util::error::Error;
+use garage_util::migrate::Migrate;
use crate::*;
-pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
+pub async fn launch_online_repair(
+ garage: &Arc<Garage>,
+ bg: &BackgroundRunner,
+ opt: RepairOpt,
+) -> Result<(), Error> {
match opt.what {
RepairWhat::Tables => {
info!("Launching a full sync of tables");
- garage.bucket_table.syncer.add_full_sync();
- garage.object_table.syncer.add_full_sync();
- garage.version_table.syncer.add_full_sync();
- garage.block_ref_table.syncer.add_full_sync();
- garage.key_table.syncer.add_full_sync();
+ garage.bucket_table.syncer.add_full_sync()?;
+ garage.object_table.syncer.add_full_sync()?;
+ garage.version_table.syncer.add_full_sync()?;
+ garage.block_ref_table.syncer.add_full_sync()?;
+ garage.key_table.syncer.add_full_sync()?;
}
RepairWhat::Versions => {
info!("Repairing the versions table");
- garage
- .background
- .spawn_worker(RepairVersionsWorker::new(garage.clone()));
+ bg.spawn_worker(RepairVersionsWorker::new(garage.clone()));
}
RepairWhat::BlockRefs => {
info!("Repairing the block refs table");
- garage
- .background
- .spawn_worker(RepairBlockrefsWorker::new(garage.clone()));
+ bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone()));
}
RepairWhat::Blocks => {
info!("Repairing the stored blocks");
- garage
- .background
- .spawn_worker(garage_block::repair::RepairWorker::new(
- garage.block_manager.clone(),
- ));
+ bg.spawn_worker(garage_block::repair::RepairWorker::new(
+ garage.block_manager.clone(),
+ ));
}
RepairWhat::Scrub { cmd } => {
let cmd = match cmd {
@@ -56,9 +55,10 @@ pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
}
};
info!("Sending command to scrub worker: {:?}", cmd);
- garage.block_manager.send_scrub_command(cmd).await;
+ garage.block_manager.send_scrub_command(cmd).await?;
}
}
+ Ok(())
}
// ----
@@ -85,25 +85,23 @@ impl Worker for RepairVersionsWorker {
"Version repair worker".into()
}
- fn info(&self) -> Option<String> {
- Some(format!("{} items done", self.counter))
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ progress: Some(self.counter.to_string()),
+ ..Default::default()
+ }
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? {
- Some((k, v)) => {
- self.pos = k;
- v
- }
+ let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? {
+ Some((k, v)) => (v, k),
None => {
info!("repair_versions: finished, done {}", self.counter);
return Ok(WorkerState::Done);
}
};
- self.counter += 1;
-
- let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
+ let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?;
if !version.deleted.get() {
let object = self
.garage
@@ -131,10 +129,13 @@ impl Worker for RepairVersionsWorker {
}
}
+ self.counter += 1;
+ self.pos = next_pos;
+
Ok(WorkerState::Busy)
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
unreachable!()
}
}
@@ -163,25 +164,24 @@ impl Worker for RepairBlockrefsWorker {
"Block refs repair worker".into()
}
- fn info(&self) -> Option<String> {
- Some(format!("{} items done", self.counter))
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ progress: Some(self.counter.to_string()),
+ ..Default::default()
+ }
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
- Some((k, v)) => {
- self.pos = k;
- v
- }
- None => {
- info!("repair_block_ref: finished, done {}", self.counter);
- return Ok(WorkerState::Done);
- }
- };
-
- self.counter += 1;
+ let (item_bytes, next_pos) =
+ match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
+ Some((k, v)) => (v, k),
+ None => {
+ info!("repair_block_ref: finished, done {}", self.counter);
+ return Ok(WorkerState::Done);
+ }
+ };
- let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
+ let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?;
if !block_ref.deleted.get() {
let version = self
.garage
@@ -206,10 +206,13 @@ impl Worker for RepairBlockrefsWorker {
}
}
+ self.counter += 1;
+ self.pos = next_pos;
+
Ok(WorkerState::Busy)
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
unreachable!()
}
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index d4099a97..16f1b625 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -35,12 +35,15 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
#[cfg(feature = "metrics")]
let metrics_exporter = opentelemetry_prometheus::exporter().init();
+ info!("Initializing Garage main data store...");
+ let garage = Garage::new(config.clone())?;
+
info!("Initializing background runner...");
let watch_cancel = watch_shutdown_signal();
- let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
+ let (background, await_background_done) = BackgroundRunner::new(watch_cancel.clone());
- info!("Initializing Garage main data store...");
- let garage = Garage::new(config.clone(), background)?;
+ info!("Spawning Garage workers...");
+ garage.spawn_workers(&background);
if config.admin.trace_sink.is_some() {
info!("Initialize tracing...");
@@ -63,7 +66,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");
- AdminRpcHandler::new(garage.clone());
+ AdminRpcHandler::new(garage.clone(), background.clone());
// ---- Launch public-facing API servers ----
diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs
index b32af068..9c363013 100644
--- a/src/garage/tests/bucket.rs
+++ b/src/garage/tests/bucket.rs
@@ -1,4 +1,5 @@
use crate::common;
+use crate::common::ext::CommandExt;
use aws_sdk_s3::model::BucketLocationConstraint;
use aws_sdk_s3::output::DeleteBucketOutput;
@@ -8,6 +9,27 @@ async fn test_bucket_all() {
let bucket_name = "hello";
{
+ // Check bucket cannot be created if not authorized
+ ctx.garage
+ .command()
+ .args(["key", "deny"])
+ .args(["--create-bucket", &ctx.garage.key.id])
+ .quiet()
+ .expect_success_output("Could not deny key to create buckets");
+
+ // Try create bucket, should fail
+ let r = ctx.client.create_bucket().bucket(bucket_name).send().await;
+ assert!(r.is_err());
+ }
+ {
+ // Now allow key to create bucket
+ ctx.garage
+ .command()
+ .args(["key", "allow"])
+ .args(["--create-bucket", &ctx.garage.key.id])
+ .quiet()
+ .expect_success_output("Could not deny key to create buckets");
+
// Create bucket
//@TODO check with an invalid bucket name + with an already existing bucket
let r = ctx
diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs
index c68f7dfc..48da7607 100644
--- a/src/garage/tests/s3/streaming_signature.rs
+++ b/src/garage/tests/s3/streaming_signature.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use crate::common;
+use crate::common::ext::CommandExt;
use common::custom_requester::BodySignature;
use hyper::Method;
@@ -105,6 +106,13 @@ async fn test_create_bucket_streaming() {
let ctx = common::context();
let bucket = "createbucket-streaming";
+ ctx.garage
+ .command()
+ .args(["key", "allow"])
+ .args(["--create-bucket", &ctx.garage.key.id])
+ .quiet()
+ .expect_success_output("Could not allow key to create buckets");
+
{
// create bucket
let _ = ctx