aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/admin_rpc.rs17
-rw-r--r--src/api_server.rs6
-rw-r--r--src/background.rs8
-rw-r--r--src/block.rs20
-rw-r--r--src/main.rs13
-rw-r--r--src/membership.rs14
-rw-r--r--src/rpc_client.rs2
-rw-r--r--src/rpc_server.rs19
-rw-r--r--src/server.rs42
-rw-r--r--src/table.rs4
-rw-r--r--src/table_sync.rs20
11 files changed, 94 insertions, 71 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs
index 125f897e..0e9d2338 100644
--- a/src/admin_rpc.rs
+++ b/src/admin_rpc.rs
@@ -222,12 +222,15 @@ impl AdminRpcHandler {
let version_exists = match object {
Some(o) => o.versions.iter().any(|x| x.uuid == version.uuid),
None => {
- eprintln!("No object entry found for version {:?}", version);
+ warn!(
+ "Repair versions: object for version {:?} not found",
+ version
+ );
false
}
};
if !version_exists {
- eprintln!("Marking deleted version: {:?}", version);
+ info!("Repair versions: marking version as deleted: {:?}", version);
self.garage
.version_table
.insert(&Version {
@@ -265,12 +268,18 @@ impl AdminRpcHandler {
let ref_exists = match version {
Some(v) => !v.deleted,
None => {
- eprintln!("No version found for block ref {:?}", block_ref);
+ warn!(
+ "Block ref repair: version for block ref {:?} not found",
+ block_ref
+ );
false
}
};
if !ref_exists {
- eprintln!("Marking deleted block_ref: {:?}", block_ref);
+ info!(
+ "Repair block ref: marking block_ref as deleted: {:?}",
+ block_ref
+ );
self.garage
.block_ref_table
.insert(&BlockRef {
diff --git a/src/api_server.rs b/src/api_server.rs
index fbff7b2f..6487d5fe 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -42,7 +42,7 @@ pub async fn run_api_server(
let server = Server::bind(&addr).serve(service);
let graceful = server.with_graceful_shutdown(shutdown_signal);
- println!("API server listening on http://{}", addr);
+ info!("API server listening on http://{}", addr);
graceful.await?;
Ok(())
@@ -69,7 +69,7 @@ async fn handler_inner(
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<BodyType>, Error> {
- eprintln!("{} {} {}", addr, req.method(), req.uri());
+ info!("{} {} {}", addr, req.method(), req.uri());
let bucket = req
.headers()
@@ -231,7 +231,7 @@ impl BodyChunker {
while !self.read_all && self.buf.len() < self.block_size {
if let Some(block) = self.body.next().await {
let bytes = block?;
- eprintln!("Body next: {} bytes", bytes.len());
+ trace!("Body next: {} bytes", bytes.len());
self.buf.extend(&bytes[..]);
} else {
self.read_all = true;
diff --git a/src/background.rs b/src/background.rs
index b9a4684a..1f04e49b 100644
--- a/src/background.rs
+++ b/src/background.rs
@@ -77,9 +77,9 @@ impl BackgroundRunner {
let stop_signal = self.stop_signal.clone();
workers.push(tokio::spawn(async move {
if let Err(e) = worker(stop_signal).await {
- eprintln!("Worker stopped with error: {}, error: {}", name, e);
+ error!("Worker stopped with error: {}, error: {}", name, e);
} else {
- println!("Worker exited successfully: {}", name);
+ info!("Worker exited successfully: {}", name);
}
}));
}
@@ -90,11 +90,11 @@ impl BackgroundRunner {
let must_exit: bool = *stop_signal.borrow();
if let Some(job) = self.dequeue_job(must_exit).await {
if let Err(e) = job.await {
- eprintln!("Job failed: {}", e)
+ error!("Job failed: {}", e)
}
} else {
if must_exit {
- eprintln!("Background runner {} exiting", i);
+ info!("Background runner {} exiting", i);
return;
}
tokio::time::delay_for(Duration::from_secs(1)).await;
diff --git a/src/block.rs b/src/block.rs
index 81b111ce..ec29db12 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -153,7 +153,7 @@ impl BlockManager {
if data::hash(&data[..]) != *hash {
let _lock = self.lock.lock().await;
- eprintln!("Block {:?} is corrupted. Deleting and resyncing.", hash);
+ warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
fs::remove_file(path).await?;
self.put_to_resync(&hash, 0)?;
return Err(Error::CorruptData(hash.clone()));
@@ -211,7 +211,7 @@ impl BlockManager {
fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> {
let when = now_msec() + delay_millis;
- eprintln!("Put resync_queue: {} {:?}", when, hash);
+ trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
self.resync_queue.insert(key, hash.as_ref())?;
@@ -222,7 +222,7 @@ impl BlockManager {
while !*must_exit.borrow() {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
let time_msec = u64_from_bytes(&time_bytes[0..8]);
- eprintln!(
+ trace!(
"First in resync queue: {} (now = {})",
time_msec,
now_msec()
@@ -233,7 +233,7 @@ impl BlockManager {
let hash = Hash::from(hash);
if let Err(e) = self.resync_iter(&hash).await {
- eprintln!("Failed to resync block {:?}, retrying later: {}", hash, e);
+ warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?;
}
continue;
@@ -256,10 +256,12 @@ impl BlockManager {
.map(|x| u64_from_bytes(x.as_ref()) > 0)
.unwrap_or(false);
- eprintln!(
- "Resync block {:?}: exists {}, needed {}",
- hash, exists, needed
- );
+ if exists != needed {
+ info!(
+ "Resync block {:?}: exists {}, needed {}",
+ hash, exists, needed
+ );
+ }
if exists && !needed {
let garage = self.garage.load_full().unwrap();
@@ -396,7 +398,7 @@ impl BlockManager {
let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await {
Err(e) => {
- eprintln!(
+ warn!(
"Warning: could not list dir {:?}: {}",
data_dir_ent.path().to_str(),
e
diff --git a/src/main.rs b/src/main.rs
index 11890e57..9b75740d 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,8 @@
#![recursion_limit = "1024"]
+#[macro_use]
+extern crate log;
+
mod data;
mod error;
@@ -190,6 +193,8 @@ pub struct RepairOpt {
#[tokio::main]
async fn main() {
+ pretty_env_logger::init();
+
let opt = Opt::from_args();
let tls_config = match (opt.ca_cert, opt.client_cert, opt.client_key) {
@@ -200,7 +205,7 @@ async fn main() {
}),
(None, None, None) => None,
_ => {
- eprintln!("Missing one of: --ca-cert, --node-cert, --node-key. Not using TLS.");
+ warn!("Missing one of: --ca-cert, --node-cert, --node-key. Not using TLS.");
None
}
};
@@ -215,7 +220,7 @@ async fn main() {
Command::Server(server_opt) => {
// Abort on panic (same behavior as in Go)
std::panic::set_hook(Box::new(|panic_info| {
- eprintln!("{}", panic_info.to_string());
+ error!("{}", panic_info.to_string());
std::process::abort();
}));
@@ -237,7 +242,7 @@ async fn main() {
};
if let Err(e) = resp {
- eprintln!("Error: {}", e);
+ error!("Error: {}", e);
}
}
@@ -414,7 +419,7 @@ async fn cmd_admin(
println!("{:?}", bucket);
}
r => {
- eprintln!("Unexpected response: {:?}", r);
+ error!("Unexpected response: {:?}", r);
}
}
Ok(())
diff --git a/src/membership.rs b/src/membership.rs
index 193b5936..6f6dd47d 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -133,7 +133,7 @@ impl Status {
);
match old_status {
None => {
- eprintln!("Newly pingable node: {}", hex::encode(&info.id));
+ info!("Newly pingable node: {}", hex::encode(&info.id));
true
}
Some(x) => x.addr != addr,
@@ -145,12 +145,12 @@ impl Status {
nodes.sort_unstable_by_key(|(id, _status)| *id);
let mut hasher = Sha256::new();
- eprintln!("Current set of pingable nodes: --");
+ debug!("Current set of pingable nodes: --");
for (id, status) in nodes {
- eprintln!("{} {}", hex::encode(&id), status.addr);
+ debug!("{} {}", hex::encode(&id), status.addr);
hasher.input(format!("{} {}\n", hex::encode(&id), status.addr));
}
- eprintln!("END --");
+ debug!("END --");
self.hash
.as_slice_mut()
.copy_from_slice(&hasher.result()[..]);
@@ -263,7 +263,7 @@ impl System {
let net_config = match read_network_config(&config.metadata_dir) {
Ok(x) => x,
Err(e) => {
- println!(
+ info!(
"No valid previous network configuration stored ({}), starting fresh.",
e
);
@@ -448,7 +448,7 @@ impl System {
.map(|x| x.remaining_ping_attempts)
.unwrap_or(0);
if remaining_attempts == 0 {
- eprintln!(
+ warn!(
"Removing node {} after too many failed pings",
hex::encode(&id)
);
@@ -465,7 +465,7 @@ impl System {
status.recalculate_hash();
}
if let Err(e) = update_locked.0.broadcast(Arc::new(status)) {
- eprintln!("In ping_nodes: could not save status update ({})", e);
+ error!("In ping_nodes: could not save status update ({})", e);
}
drop(update_locked);
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index b2a0cf22..2b994402 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -221,7 +221,7 @@ impl RpcHttpClient {
let resp = tokio::time::timeout(timeout, resp_fut)
.await?
.map_err(|e| {
- eprintln!(
+ warn!(
"RPC HTTP client error when connecting to {}: {}",
to_addr, e
);
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index f78c27f1..4541e4da 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
+use std::time::Instant;
use std::sync::Arc;
use bytes::IntoBuf;
@@ -36,17 +37,21 @@ async fn handle_func<M, F, Fut>(
handler: Arc<F>,
req: Request<Body>,
sockaddr: SocketAddr,
+ name: Arc<String>,
) -> Result<Response<Body>, Error>
where
M: RpcMessage + 'static,
F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<M, Error>> + Send + 'static,
{
+ let begin_time = Instant::now();
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?;
+ let req_str = debug_serialize(&msg);
match handler(msg, sockaddr).await {
Ok(resp) => {
let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?;
+ trace!("]RPC:{},ok ({} ms)", name, (Instant::now()-begin_time).as_millis());
Ok(Response::new(Body::from(resp_bytes)))
}
Err(e) => {
@@ -54,6 +59,7 @@ where
let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?;
let mut err_response = Response::new(Body::from(rep_bytes));
*err_response.status_mut() = e.http_status_code();
+ warn!("RPC error ({}): {} ({} ms), request: {}", name, e, (Instant::now()-begin_time).as_millis(), req_str);
Ok(err_response)
}
}
@@ -74,10 +80,11 @@ impl RpcServer {
F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<M, Error>> + Send + 'static,
{
+ let name2 = Arc::new(name.clone());
let handler_arc = Arc::new(handler);
let handler = Box::new(move |req: Request<Body>, sockaddr: SocketAddr| {
let handler2 = handler_arc.clone();
- let b: ResponseFuture = Box::pin(handle_func(handler2, req, sockaddr));
+ let b: ResponseFuture = Box::pin(handle_func(handler2, req, sockaddr, name2.clone()));
b
});
self.handlers.insert(name, handler);
@@ -107,7 +114,7 @@ impl RpcServer {
let resp_waiter = tokio::spawn(handler(req, addr));
match resp_waiter.await {
Err(err) => {
- eprintln!("Handler await error: {}", err);
+ warn!("Handler await error: {}", err);
let mut ise = Response::default();
*ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(ise)
@@ -163,7 +170,7 @@ impl RpcServer {
async move {
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
self_arc.clone().handler(req, client_addr).map_err(|e| {
- eprintln!("RPC handler error: {}", e);
+ warn!("RPC handler error: {}", e);
e
})
}))
@@ -173,7 +180,7 @@ impl RpcServer {
let server = Server::builder(incoming).serve(service);
let graceful = server.with_graceful_shutdown(shutdown_signal);
- println!("RPC server listening on http://{}", self.bind_addr);
+ info!("RPC server listening on http://{}", self.bind_addr);
graceful.await?;
} else {
@@ -184,7 +191,7 @@ impl RpcServer {
async move {
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
self_arc.clone().handler(req, client_addr).map_err(|e| {
- eprintln!("RPC handler error: {}", e);
+ warn!("RPC handler error: {}", e);
e
})
}))
@@ -194,7 +201,7 @@ impl RpcServer {
let server = Server::bind(&self.bind_addr).serve(service);
let graceful = server.with_graceful_shutdown(shutdown_signal);
- println!("RPC server listening on http://{}", self.bind_addr);
+ info!("RPC server listening on http://{}", self.bind_addr);
graceful.await?;
}
diff --git a/src/server.rs b/src/server.rs
index 464d550b..542c8675 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -87,10 +87,10 @@ impl Garage {
background: Arc<BackgroundRunner>,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
- println!("Initialize membership management system...");
+ info!("Initialize membership management system...");
let system = System::new(config.clone(), id, background.clone(), rpc_server);
- println!("Initialize block manager...");
+ info!("Initialize block manager...");
let block_manager =
BlockManager::new(&db, config.data_dir.clone(), system.clone(), rpc_server);
@@ -111,7 +111,7 @@ impl Garage {
(system.config.meta_epidemic_factor + 1) / 2,
);
- println!("Initialize block_ref_table...");
+ info!("Initialize block_ref_table...");
let block_ref_table = Table::new(
BlockRefTable {
background: background.clone(),
@@ -125,7 +125,7 @@ impl Garage {
)
.await;
- println!("Initialize version_table...");
+ info!("Initialize version_table...");
let version_table = Table::new(
VersionTable {
background: background.clone(),
@@ -139,7 +139,7 @@ impl Garage {
)
.await;
- println!("Initialize object_table...");
+ info!("Initialize object_table...");
let object_table = Table::new(
ObjectTable {
background: background.clone(),
@@ -153,7 +153,7 @@ impl Garage {
)
.await;
- println!("Initialize bucket_table...");
+ info!("Initialize bucket_table...");
let bucket_table = Table::new(
BucketTable,
control_rep_param.clone(),
@@ -164,7 +164,7 @@ impl Garage {
)
.await;
- println!("Initialize Garage...");
+ info!("Initialize Garage...");
let garage = Arc::new(Self {
db,
system: system.clone(),
@@ -176,10 +176,10 @@ impl Garage {
block_ref_table,
});
- println!("Crate admin RPC handler...");
+ info!("Crate admin RPC handler...");
AdminRpcHandler::new(garage.clone()).register_handler(rpc_server);
- println!("Start block manager background thread...");
+ info!("Start block manager background thread...");
garage.block_manager.garage.swap(Some(garage.clone()));
garage.block_manager.clone().spawn_background_worker().await;
@@ -226,7 +226,7 @@ async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error>
tokio::signal::ctrl_c()
.await
.expect("failed to install CTRL+C signal handler");
- println!("Received CTRL+C, shutting down.");
+ info!("Received CTRL+C, shutting down.");
send_cancel.broadcast(true)?;
Ok(())
}
@@ -240,51 +240,51 @@ async fn wait_from(mut chan: watch::Receiver<bool>) -> () {
}
pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
- println!("Loading configuration...");
+ info!("Loading configuration...");
let config = read_config(config_file).expect("Unable to read config file");
let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID");
- println!("Node ID: {}", hex::encode(&id));
+ info!("Node ID: {}", hex::encode(&id));
- println!("Opening database...");
+ info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
db_path.push("db");
let db = sled::open(db_path).expect("Unable to open DB");
- println!("Initialize RPC server...");
+ info!("Initialize RPC server...");
let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone());
- println!("Initializing background runner...");
+ info!("Initializing background runner...");
let (send_cancel, watch_cancel) = watch::channel(false);
let background = BackgroundRunner::new(8, watch_cancel.clone());
let garage = Garage::new(config, id, db, background.clone(), &mut rpc_server).await;
- println!("Initializing RPC and API servers...");
+ info!("Initializing RPC and API servers...");
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
futures::try_join!(
garage.system.clone().bootstrap().map(|rv| {
- println!("Bootstrap done");
+ info!("Bootstrap done");
Ok(rv)
}),
run_rpc_server.map(|rv| {
- println!("RPC server exited");
+ info!("RPC server exited");
rv
}),
api_server.map(|rv| {
- println!("API server exited");
+ info!("API server exited");
rv
}),
background.run().map(|rv| {
- println!("Background runner exited");
+ info!("Background runner exited");
Ok(rv)
}),
shutdown_signal(send_cancel),
)?;
- println!("Cleaning up...");
+ info!("Cleaning up...");
Ok(())
}
diff --git a/src/table.rs b/src/table.rs
index d9b505c9..8bcb3c66 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -489,7 +489,7 @@ where
pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
let syncer = self.syncer.load_full().unwrap();
- eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
+ debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
let mut count = 0;
while let Some((key, _value)) = self.store.get_lt(end.as_slice())? {
if key.as_ref() < begin.as_slice() {
@@ -504,7 +504,7 @@ where
count += 1;
}
}
- eprintln!("({}) {} entries deleted", self.name, count);
+ debug!("({}) {} entries deleted", self.name, count);
Ok(())
}
diff --git a/src/table_sync.rs b/src/table_sync.rs
index a750e04e..cb6d87aa 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -134,7 +134,7 @@ where
select! {
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
- eprintln!("({}) Adding ring difference to syncer todo list", self.table.name);
+ debug!("({}) Adding ring difference to syncer todo list", self.table.name);
self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
prev_ring = new_ring;
}
@@ -158,7 +158,7 @@ where
_ = s_timeout => {
if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
- eprintln!("({}) Adding full scan to syncer todo list", self.table.name);
+ debug!("({}) Adding full scan to syncer todo list", self.table.name);
self.todo.lock().await.add_full_scan(&self.table);
}
}
@@ -180,7 +180,7 @@ where
.sync_partition(&partition, &mut must_exit)
.await;
if let Err(e) = res {
- eprintln!(
+ warn!(
"({}) Error while syncing {:?}: {}",
self.table.name, partition, e
);
@@ -198,7 +198,7 @@ where
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
- eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition);
+ debug!("({}) Preparing to sync {:?}...", self.table.name, partition);
let root_cks = self
.root_checksum(&partition.begin, &partition.end, must_exit)
.await?;
@@ -226,7 +226,7 @@ where
while let Some(r) = sync_futures.next().await {
if let Err(e) = r {
n_errors += 1;
- eprintln!("({}) Sync error: {}", self.table.name, e);
+ warn!("({}) Sync error: {}", self.table.name, e);
}
}
if n_errors > self.table.replication.max_write_errors() {
@@ -284,7 +284,7 @@ where
drop(cache);
let v = self.range_checksum_inner(&range, must_exit).await?;
- eprintln!(
+ trace!(
"({}) New checksum calculated for {}-{}/{}, {} children",
self.table.name,
hex::encode(&range.begin[..]),
@@ -418,7 +418,7 @@ where
while !todo.is_empty() && !*must_exit.borrow() {
let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
- eprintln!(
+ trace!(
"({}) Sync with {:?}: {} ({}) remaining",
self.table.name,
who,
@@ -442,7 +442,7 @@ where
rpc_resp
{
if diff_ranges.len() > 0 || diff_items.len() > 0 {
- eprintln!(
+ info!(
"({}) Sync with {:?}: difference {} ranges, {} items",
self.table.name,
who,
@@ -479,7 +479,7 @@ where
}
async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
- eprintln!(
+ info!(
"({}) Sending {} items to {:?}",
self.table.name,
item_list.len(),
@@ -594,7 +594,7 @@ where
.map(|x| x.children.len())
.fold(0, |x, y| x + y);
if ret_ranges.len() > 0 || ret_items.len() > 0 {
- eprintln!(
+ trace!(
"({}) Checksum comparison RPC: {} different + {} items for {} received",
self.table.name,
ret_ranges.len(),