diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/admin_rpc.rs | 17 | ||||
-rw-r--r-- | src/api_server.rs | 6 | ||||
-rw-r--r-- | src/background.rs | 8 | ||||
-rw-r--r-- | src/block.rs | 20 | ||||
-rw-r--r-- | src/main.rs | 13 | ||||
-rw-r--r-- | src/membership.rs | 14 | ||||
-rw-r--r-- | src/rpc_client.rs | 2 | ||||
-rw-r--r-- | src/rpc_server.rs | 19 | ||||
-rw-r--r-- | src/server.rs | 42 | ||||
-rw-r--r-- | src/table.rs | 4 | ||||
-rw-r--r-- | src/table_sync.rs | 20 |
11 files changed, 94 insertions, 71 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs index 125f897e..0e9d2338 100644 --- a/src/admin_rpc.rs +++ b/src/admin_rpc.rs @@ -222,12 +222,15 @@ impl AdminRpcHandler { let version_exists = match object { Some(o) => o.versions.iter().any(|x| x.uuid == version.uuid), None => { - eprintln!("No object entry found for version {:?}", version); + warn!( + "Repair versions: object for version {:?} not found", + version + ); false } }; if !version_exists { - eprintln!("Marking deleted version: {:?}", version); + info!("Repair versions: marking version as deleted: {:?}", version); self.garage .version_table .insert(&Version { @@ -265,12 +268,18 @@ impl AdminRpcHandler { let ref_exists = match version { Some(v) => !v.deleted, None => { - eprintln!("No version found for block ref {:?}", block_ref); + warn!( + "Block ref repair: version for block ref {:?} not found", + block_ref + ); false } }; if !ref_exists { - eprintln!("Marking deleted block_ref: {:?}", block_ref); + info!( + "Repair block ref: marking block_ref as deleted: {:?}", + block_ref + ); self.garage .block_ref_table .insert(&BlockRef { diff --git a/src/api_server.rs b/src/api_server.rs index fbff7b2f..6487d5fe 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -42,7 +42,7 @@ pub async fn run_api_server( let server = Server::bind(&addr).serve(service); let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("API server listening on http://{}", addr); + info!("API server listening on http://{}", addr); graceful.await?; Ok(()) @@ -69,7 +69,7 @@ async fn handler_inner( req: Request<Body>, addr: SocketAddr, ) -> Result<Response<BodyType>, Error> { - eprintln!("{} {} {}", addr, req.method(), req.uri()); + info!("{} {} {}", addr, req.method(), req.uri()); let bucket = req .headers() @@ -231,7 +231,7 @@ impl BodyChunker { while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.body.next().await { let bytes = block?; - eprintln!("Body next: {} bytes", bytes.len()); + trace!("Body next: {} bytes", bytes.len()); self.buf.extend(&bytes[..]); } else { self.read_all = true; diff --git a/src/background.rs b/src/background.rs index b9a4684a..1f04e49b 100644 --- a/src/background.rs +++ b/src/background.rs @@ -77,9 +77,9 @@ impl BackgroundRunner { let stop_signal = self.stop_signal.clone(); workers.push(tokio::spawn(async move { if let Err(e) = worker(stop_signal).await { - eprintln!("Worker stopped with error: {}, error: {}", name, e); + error!("Worker stopped with error: {}, error: {}", name, e); } else { - println!("Worker exited successfully: {}", name); + info!("Worker exited successfully: {}", name); } })); } @@ -90,11 +90,11 @@ impl BackgroundRunner { let must_exit: bool = *stop_signal.borrow(); if let Some(job) = self.dequeue_job(must_exit).await { if let Err(e) = job.await { - eprintln!("Job failed: {}", e) + error!("Job failed: {}", e) } } else { if must_exit { - eprintln!("Background runner {} exiting", i); + info!("Background runner {} exiting", i); return; } tokio::time::delay_for(Duration::from_secs(1)).await; diff --git a/src/block.rs b/src/block.rs index 81b111ce..ec29db12 100644 --- a/src/block.rs +++ b/src/block.rs @@ -153,7 +153,7 @@ impl BlockManager { if data::hash(&data[..]) != *hash { let _lock = self.lock.lock().await; - eprintln!("Block {:?} is corrupted. Deleting and resyncing.", hash); + warn!("Block {:?} is corrupted. Deleting and resyncing.", hash); fs::remove_file(path).await?; self.put_to_resync(&hash, 0)?; return Err(Error::CorruptData(hash.clone())); @@ -211,7 +211,7 @@ impl BlockManager { fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> { let when = now_msec() + delay_millis; - eprintln!("Put resync_queue: {} {:?}", when, hash); + trace!("Put resync_queue: {} {:?}", when, hash); let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); self.resync_queue.insert(key, hash.as_ref())?; @@ -222,7 +222,7 @@ impl BlockManager { while !*must_exit.borrow() { if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { let time_msec = u64_from_bytes(&time_bytes[0..8]); - eprintln!( + trace!( "First in resync queue: {} (now = {})", time_msec, now_msec() @@ -233,7 +233,7 @@ impl BlockManager { let hash = Hash::from(hash); if let Err(e) = self.resync_iter(&hash).await { - eprintln!("Failed to resync block {:?}, retrying later: {}", hash, e); + warn!("Failed to resync block {:?}, retrying later: {}", hash, e); self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?; } continue; @@ -256,10 +256,12 @@ impl BlockManager { .map(|x| u64_from_bytes(x.as_ref()) > 0) .unwrap_or(false); - eprintln!( - "Resync block {:?}: exists {}, needed {}", - hash, exists, needed - ); + if exists != needed { + info!( + "Resync block {:?}: exists {}, needed {}", + hash, exists, needed + ); + } if exists && !needed { let garage = self.garage.load_full().unwrap(); @@ -396,7 +398,7 @@ impl BlockManager { let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await { Err(e) => { - eprintln!( + warn!( "Warning: could not list dir {:?}: {}", data_dir_ent.path().to_str(), e diff --git a/src/main.rs b/src/main.rs index 11890e57..9b75740d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,8 @@ #![recursion_limit = "1024"] +#[macro_use] +extern crate log; + mod data; mod error; @@ -190,6 +193,8 @@ pub struct RepairOpt { #[tokio::main] async fn main() { + pretty_env_logger::init(); + let opt = Opt::from_args(); let tls_config = match (opt.ca_cert, opt.client_cert, opt.client_key) { @@ -200,7 +205,7 @@ async fn main() { }), (None, None, None) => None, _ => { - eprintln!("Missing one of: --ca-cert, --node-cert, --node-key. Not using TLS."); + warn!("Missing one of: --ca-cert, --node-cert, --node-key. Not using TLS."); None } }; @@ -215,7 +220,7 @@ async fn main() { Command::Server(server_opt) => { // Abort on panic (same behavior as in Go) std::panic::set_hook(Box::new(|panic_info| { - eprintln!("{}", panic_info.to_string()); + error!("{}", panic_info.to_string()); std::process::abort(); })); @@ -237,7 +242,7 @@ async fn main() { }; if let Err(e) = resp { - eprintln!("Error: {}", e); + error!("Error: {}", e); } } @@ -414,7 +419,7 @@ async fn cmd_admin( println!("{:?}", bucket); } r => { - eprintln!("Unexpected response: {:?}", r); + error!("Unexpected response: {:?}", r); } } Ok(()) diff --git a/src/membership.rs b/src/membership.rs index 193b5936..6f6dd47d 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -133,7 +133,7 @@ impl Status { ); match old_status { None => { - eprintln!("Newly pingable node: {}", hex::encode(&info.id)); + info!("Newly pingable node: {}", hex::encode(&info.id)); true } Some(x) => x.addr != addr, @@ -145,12 +145,12 @@ impl Status { nodes.sort_unstable_by_key(|(id, _status)| *id); let mut hasher = Sha256::new(); - eprintln!("Current set of pingable nodes: --"); + debug!("Current set of pingable nodes: --"); for (id, status) in nodes { - eprintln!("{} {}", hex::encode(&id), status.addr); + debug!("{} {}", hex::encode(&id), status.addr); hasher.input(format!("{} {}\n", hex::encode(&id), status.addr)); } - eprintln!("END --"); + debug!("END --"); self.hash .as_slice_mut() .copy_from_slice(&hasher.result()[..]); @@ -263,7 +263,7 @@ impl System { let net_config = match read_network_config(&config.metadata_dir) { Ok(x) => x, Err(e) => { - println!( + info!( "No valid previous network configuration stored ({}), starting fresh.", e ); @@ -448,7 +448,7 @@ impl System { .map(|x| x.remaining_ping_attempts) .unwrap_or(0); if remaining_attempts == 0 { - eprintln!( + warn!( "Removing node {} after too many failed pings", hex::encode(&id) ); @@ -465,7 +465,7 @@ impl System { status.recalculate_hash(); } if let Err(e) = update_locked.0.broadcast(Arc::new(status)) { - eprintln!("In ping_nodes: could not save status update ({})", e); + error!("In ping_nodes: could not save status update ({})", e); } drop(update_locked); diff --git a/src/rpc_client.rs b/src/rpc_client.rs index b2a0cf22..2b994402 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -221,7 +221,7 @@ impl RpcHttpClient { let resp = tokio::time::timeout(timeout, resp_fut) .await? .map_err(|e| { - eprintln!( + warn!( "RPC HTTP client error when connecting to {}: {}", to_addr, e ); diff --git a/src/rpc_server.rs b/src/rpc_server.rs index f78c27f1..4541e4da 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; +use std::time::Instant; use std::sync::Arc; use bytes::IntoBuf; @@ -36,17 +37,21 @@ async fn handle_func<M, F, Fut>( handler: Arc<F>, req: Request<Body>, sockaddr: SocketAddr, + name: Arc<String>, ) -> Result<Response<Body>, Error> where M: RpcMessage + 'static, F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<M, Error>> + Send + 'static, { + let begin_time = Instant::now(); let whole_body = hyper::body::to_bytes(req.into_body()).await?; let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?; + let req_str = debug_serialize(&msg); match handler(msg, sockaddr).await { Ok(resp) => { let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?; + trace!("]RPC:{},ok ({} ms)", name, (Instant::now()-begin_time).as_millis()); Ok(Response::new(Body::from(resp_bytes))) } Err(e) => { @@ -54,6 +59,7 @@ where let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?; let mut err_response = Response::new(Body::from(rep_bytes)); *err_response.status_mut() = e.http_status_code(); + warn!("RPC error ({}): {} ({} ms), request: {}", name, e, (Instant::now()-begin_time).as_millis(), req_str); Ok(err_response) } } @@ -74,10 +80,11 @@ impl RpcServer { F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<M, Error>> + Send + 'static, { + let name2 = Arc::new(name.clone()); let handler_arc = Arc::new(handler); let handler = Box::new(move |req: Request<Body>, sockaddr: SocketAddr| { let handler2 = handler_arc.clone(); - let b: ResponseFuture = Box::pin(handle_func(handler2, req, sockaddr)); + let b: ResponseFuture = Box::pin(handle_func(handler2, req, sockaddr, name2.clone())); b }); self.handlers.insert(name, handler); @@ -107,7 +114,7 @@ impl RpcServer { let resp_waiter = tokio::spawn(handler(req, addr)); match resp_waiter.await { Err(err) => { - eprintln!("Handler await error: {}", err); + warn!("Handler await error: {}", err); let mut ise = Response::default(); *ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; Ok(ise) @@ -163,7 +170,7 @@ impl RpcServer { async move { Ok::<_, Error>(service_fn(move |req: Request<Body>| { self_arc.clone().handler(req, client_addr).map_err(|e| { - eprintln!("RPC handler error: {}", e); + warn!("RPC handler error: {}", e); e }) })) @@ -173,7 +180,7 @@ impl RpcServer { let server = Server::builder(incoming).serve(service); let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("RPC server listening on http://{}", self.bind_addr); + info!("RPC server listening on http://{}", self.bind_addr); graceful.await?; } else { @@ -184,7 +191,7 @@ impl RpcServer { async move { Ok::<_, Error>(service_fn(move |req: Request<Body>| { self_arc.clone().handler(req, client_addr).map_err(|e| { - eprintln!("RPC handler error: {}", e); + warn!("RPC handler error: {}", e); e }) })) @@ -194,7 +201,7 @@ impl RpcServer { let server = Server::bind(&self.bind_addr).serve(service); let graceful = server.with_graceful_shutdown(shutdown_signal); - println!("RPC server listening on http://{}", self.bind_addr); + info!("RPC server listening on http://{}", self.bind_addr); graceful.await?; } diff --git a/src/server.rs b/src/server.rs index 464d550b..542c8675 100644 --- a/src/server.rs +++ b/src/server.rs @@ -87,10 +87,10 @@ impl Garage { background: Arc<BackgroundRunner>, rpc_server: &mut RpcServer, ) -> Arc<Self> { - println!("Initialize membership management system..."); + info!("Initialize membership management system..."); let system = System::new(config.clone(), id, background.clone(), rpc_server); - println!("Initialize block manager..."); + info!("Initialize block manager..."); let block_manager = BlockManager::new(&db, config.data_dir.clone(), system.clone(), rpc_server); @@ -111,7 +111,7 @@ impl Garage { (system.config.meta_epidemic_factor + 1) / 2, ); - println!("Initialize block_ref_table..."); + info!("Initialize block_ref_table..."); let block_ref_table = Table::new( BlockRefTable { background: background.clone(), @@ -125,7 +125,7 @@ impl Garage { ) .await; - println!("Initialize version_table..."); + info!("Initialize version_table..."); let version_table = Table::new( VersionTable { background: background.clone(), @@ -139,7 +139,7 @@ impl Garage { ) .await; - println!("Initialize object_table..."); + info!("Initialize object_table..."); let object_table = Table::new( ObjectTable { background: background.clone(), @@ -153,7 +153,7 @@ impl Garage { ) .await; - println!("Initialize bucket_table..."); + info!("Initialize bucket_table..."); let bucket_table = Table::new( BucketTable, control_rep_param.clone(), @@ -164,7 +164,7 @@ impl Garage { ) .await; - println!("Initialize Garage..."); + info!("Initialize Garage..."); let garage = Arc::new(Self { db, system: system.clone(), @@ -176,10 +176,10 @@ impl Garage { block_ref_table, }); - println!("Crate admin RPC handler..."); + info!("Crate admin RPC handler..."); AdminRpcHandler::new(garage.clone()).register_handler(rpc_server); - println!("Start block manager background thread..."); + info!("Start block manager background thread..."); garage.block_manager.garage.swap(Some(garage.clone())); garage.block_manager.clone().spawn_background_worker().await; @@ -226,7 +226,7 @@ async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> tokio::signal::ctrl_c() .await .expect("failed to install CTRL+C signal handler"); - println!("Received CTRL+C, shutting down."); + info!("Received CTRL+C, shutting down."); send_cancel.broadcast(true)?; Ok(()) } @@ -240,51 +240,51 @@ async fn wait_from(mut chan: watch::Receiver<bool>) -> () { } pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { - println!("Loading configuration..."); + info!("Loading configuration..."); let config = read_config(config_file).expect("Unable to read config file"); let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID"); - println!("Node ID: {}", hex::encode(&id)); + info!("Node ID: {}", hex::encode(&id)); - println!("Opening database..."); + info!("Opening database..."); let mut db_path = config.metadata_dir.clone(); db_path.push("db"); let db = sled::open(db_path).expect("Unable to open DB"); - println!("Initialize RPC server..."); + info!("Initialize RPC server..."); let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone()); - println!("Initializing background runner..."); + info!("Initializing background runner..."); let (send_cancel, watch_cancel) = watch::channel(false); let background = BackgroundRunner::new(8, watch_cancel.clone()); let garage = Garage::new(config, id, db, background.clone(), &mut rpc_server).await; - println!("Initializing RPC and API servers..."); + info!("Initializing RPC and API servers..."); let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); futures::try_join!( garage.system.clone().bootstrap().map(|rv| { - println!("Bootstrap done"); + info!("Bootstrap done"); Ok(rv) }), run_rpc_server.map(|rv| { - println!("RPC server exited"); + info!("RPC server exited"); rv }), api_server.map(|rv| { - println!("API server exited"); + info!("API server exited"); rv }), background.run().map(|rv| { - println!("Background runner exited"); + info!("Background runner exited"); Ok(rv) }), shutdown_signal(send_cancel), )?; - println!("Cleaning up..."); + info!("Cleaning up..."); Ok(()) } diff --git a/src/table.rs b/src/table.rs index d9b505c9..8bcb3c66 100644 --- a/src/table.rs +++ b/src/table.rs @@ -489,7 +489,7 @@ where pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { let syncer = self.syncer.load_full().unwrap(); - eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end); + debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end); let mut count = 0; while let Some((key, _value)) = self.store.get_lt(end.as_slice())? { if key.as_ref() < begin.as_slice() { @@ -504,7 +504,7 @@ where count += 1; } } - eprintln!("({}) {} entries deleted", self.name, count); + debug!("({}) {} entries deleted", self.name, count); Ok(()) } diff --git a/src/table_sync.rs b/src/table_sync.rs index a750e04e..cb6d87aa 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -134,7 +134,7 @@ where select! { new_ring_r = s_ring_recv => { if let Some(new_ring) = new_ring_r { - eprintln!("({}) Adding ring difference to syncer todo list", self.table.name); + debug!("({}) Adding ring difference to syncer todo list", self.table.name); self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring); prev_ring = new_ring; } @@ -158,7 +158,7 @@ where _ = s_timeout => { if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; - eprintln!("({}) Adding full scan to syncer todo list", self.table.name); + debug!("({}) Adding full scan to syncer todo list", self.table.name); self.todo.lock().await.add_full_scan(&self.table); } } @@ -180,7 +180,7 @@ where .sync_partition(&partition, &mut must_exit) .await; if let Err(e) = res { - eprintln!( + warn!( "({}) Error while syncing {:?}: {}", self.table.name, partition, e ); @@ -198,7 +198,7 @@ where partition: &TodoPartition, must_exit: &mut watch::Receiver<bool>, ) -> Result<(), Error> { - eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition); + debug!("({}) Preparing to sync {:?}...", self.table.name, partition); let root_cks = self .root_checksum(&partition.begin, &partition.end, must_exit) .await?; @@ -226,7 +226,7 @@ where while let Some(r) = sync_futures.next().await { if let Err(e) = r { n_errors += 1; - eprintln!("({}) Sync error: {}", self.table.name, e); + warn!("({}) Sync error: {}", self.table.name, e); } } if n_errors > self.table.replication.max_write_errors() { @@ -284,7 +284,7 @@ where drop(cache); let v = self.range_checksum_inner(&range, must_exit).await?; - eprintln!( + trace!( "({}) New checksum calculated for {}-{}/{}, {} children", self.table.name, hex::encode(&range.begin[..]), @@ -418,7 +418,7 @@ where while !todo.is_empty() && !*must_exit.borrow() { let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); - eprintln!( + trace!( "({}) Sync with {:?}: {} ({}) remaining", self.table.name, who, @@ -442,7 +442,7 @@ where rpc_resp { if diff_ranges.len() > 0 || diff_items.len() > 0 { - eprintln!( + info!( "({}) Sync with {:?}: difference {} ranges, {} items", self.table.name, who, @@ -479,7 +479,7 @@ where } async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { - eprintln!( + info!( "({}) Sending {} items to {:?}", self.table.name, item_list.len(), @@ -594,7 +594,7 @@ where .map(|x| x.children.len()) .fold(0, |x, y| x + y); if ret_ranges.len() > 0 || ret_items.len() > 0 { - eprintln!( + trace!( "({}) Checksum comparison RPC: {} different + {} items for {} received", self.table.name, ret_ranges.len(), |