diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/admin_rpc.rs | 10 | ||||
-rw-r--r-- | src/api_server.rs | 4 | ||||
-rw-r--r-- | src/background.rs | 2 | ||||
-rw-r--r-- | src/block.rs | 4 | ||||
-rw-r--r-- | src/error.rs | 6 | ||||
-rw-r--r-- | src/main.rs | 6 | ||||
-rw-r--r-- | src/membership.rs | 4 | ||||
-rw-r--r-- | src/rpc_server.rs | 5 | ||||
-rw-r--r-- | src/server.rs | 32 | ||||
-rw-r--r-- | src/table.rs | 12 | ||||
-rw-r--r-- | src/table_sync.rs | 51 |
11 files changed, 92 insertions, 44 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs index 8a274c26..29037c6c 100644 --- a/src/admin_rpc.rs +++ b/src/admin_rpc.rs @@ -18,7 +18,7 @@ pub enum AdminRPC { BucketOperation(BucketOperation), // Replies - Ok, + Ok(String), BucketList(Vec<String>), BucketInfo(Bucket), } @@ -86,13 +86,13 @@ impl AdminRpcHandler { self.garage .bucket_table .insert(&Bucket { - name: query.name, + name: query.name.clone(), timestamp: new_time, deleted: false, authorized_keys: vec![], }) .await?; - Ok(AdminRPC::Ok) + Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) } BucketOperation::Delete(query) => { let bucket = match self @@ -129,13 +129,13 @@ impl AdminRpcHandler { self.garage .bucket_table .insert(&Bucket { - name: query.name, + name: query.name.clone(), timestamp: std::cmp::max(bucket.timestamp + 1, now_msec()), deleted: true, authorized_keys: vec![], }) .await?; - Ok(AdminRPC::Ok) + Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) } _ => { // TODO diff --git a/src/api_server.rs b/src/api_server.rs index 52464f07..fbff7b2f 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -1,5 +1,5 @@ use std::collections::VecDeque; -use std::net::{Ipv6Addr, SocketAddr}; +use std::net::SocketAddr; use std::sync::Arc; use futures::future::Future; @@ -26,7 +26,7 @@ pub async fn run_api_server( garage: Arc<Garage>, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), Error> { - let addr = (Ipv6Addr::LOCALHOST, garage.system.config.api_port).into(); + let addr = &garage.system.config.api_bind_addr; let service = make_service_fn(|conn: &AddrStream| { let garage = garage.clone(); diff --git a/src/background.rs b/src/background.rs index f4b889ea..34e96ff7 100644 --- a/src/background.rs +++ b/src/background.rs @@ -78,6 +78,8 @@ impl BackgroundRunner { workers.push(tokio::spawn(async move { if let Err(e) = worker(stop_signal).await { eprintln!("Worker stopped with error: {}", e); + } else { + println!("A worker exited successfully (which one?)"); } })); } diff --git a/src/block.rs b/src/block.rs index c84f193b..489dc33e 100644 --- a/src/block.rs +++ b/src/block.rs @@ -193,7 +193,7 @@ impl BlockManager { let old_rc = self.rc.get(&hash)?; self.rc.merge(&hash, vec![1])?; if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT.as_millis() as u64)?; + self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?; } Ok(()) } @@ -201,7 +201,7 @@ impl BlockManager { pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { let new_rc = self.rc.merge(&hash, vec![0])?; if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?; + self.put_to_resync(&hash, 0)?; } Ok(()) } diff --git a/src/error.rs b/src/error.rs index 678ab72d..50a0a44b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -83,3 +83,9 @@ impl<T> From<tokio::sync::watch::error::SendError<T>> for Error { Error::Message(format!("Watch send error")) } } + +impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error { + fn from(_e: tokio::sync::mpsc::error::SendError<T>) -> Error { + Error::Message(format!("MPSC send error")) + } +} diff --git a/src/main.rs b/src/main.rs index 0aab9e2a..1d582c25 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +#![recursion_limit = "1024"] + mod data; mod error; @@ -387,8 +389,8 @@ async fn cmd_admin( args: AdminRPC, ) -> Result<(), Error> { match rpc_cli.call(&rpc_host, args, DEFAULT_TIMEOUT).await? { - AdminRPC::Ok => { - println!("Ok."); + AdminRPC::Ok(msg) => { + println!("{}", msg); } AdminRPC::BucketList(bl) => { println!("List of buckets:"); diff --git a/src/membership.rs b/src/membership.rs index 412a83f8..fc21d0b5 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -362,7 +362,7 @@ impl System { let ring = self.ring.borrow().clone(); Message::Ping(PingMessage { id: self.id.clone(), - rpc_port: self.config.rpc_port, + rpc_port: self.config.rpc_bind_addr.port(), status_hash: status.hash.clone(), config_version: ring.config.version, state_info: self.state_info.clone(), @@ -539,7 +539,7 @@ impl System { for node in adv.iter() { if node.id == self.id { // learn our own ip address - let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port); + let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port()); let old_self = status.nodes.insert( node.id.clone(), StatusEntry { diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 7a6a57ee..f78c27f1 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -145,10 +145,7 @@ impl RpcServer { match socket { Ok(stream) => match tls_acceptor.clone().accept(stream).await { Ok(x) => Some(Ok::<_, hyper::Error>(x)), - Err(e) => { - eprintln!("RPC server TLS error: {}", e); - None - } + Err(_e) => None, }, Err(_) => None, } diff --git a/src/server.rs b/src/server.rs index 979d76f9..464d550b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,7 +3,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; -pub use futures_util::future::FutureExt; +use futures_util::future::*; use serde::Deserialize; use tokio::sync::watch; @@ -30,8 +30,8 @@ pub struct Config { pub metadata_dir: PathBuf, pub data_dir: PathBuf, - pub api_port: u16, - pub rpc_port: u16, + pub api_bind_addr: SocketAddr, + pub rpc_bind_addr: SocketAddr, pub bootstrap_peers: Vec<SocketAddr>, @@ -252,8 +252,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let db = sled::open(db_path).expect("Unable to open DB"); println!("Initialize RPC server..."); - let rpc_bind_addr = ([0, 0, 0, 0, 0, 0, 0, 0], config.rpc_port).into(); - let mut rpc_server = RpcServer::new(rpc_bind_addr, config.rpc_tls.clone()); + let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone()); println!("Initializing background runner..."); let (send_cancel, watch_cancel) = watch::channel(false); @@ -266,11 +265,26 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); futures::try_join!( - garage.system.clone().bootstrap().map(Ok), - run_rpc_server, - api_server, - background.run().map(Ok), + garage.system.clone().bootstrap().map(|rv| { + println!("Bootstrap done"); + Ok(rv) + }), + run_rpc_server.map(|rv| { + println!("RPC server exited"); + rv + }), + api_server.map(|rv| { + println!("API server exited"); + rv + }), + background.run().map(|rv| { + println!("Background runner exited"); + Ok(rv) + }), shutdown_signal(send_cancel), )?; + + println!("Cleaning up..."); + Ok(()) } diff --git a/src/table.rs b/src/table.rs index 37fb2f51..80364d17 100644 --- a/src/table.rs +++ b/src/table.rs @@ -436,6 +436,7 @@ where self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>, ) -> Result<(), Error> { + let syncer = self.syncer.load_full().unwrap(); let mut epidemic_propagate = vec![]; for update_bytes in entries.drain(..) { @@ -469,9 +470,9 @@ where } self.instance.updated(old_entry, Some(new_entry)).await; - - let syncer = self.syncer.load_full().unwrap(); - self.system.background.spawn(syncer.invalidate(tree_key)); + self.system + .background + .spawn(syncer.clone().invalidate(tree_key)); } } @@ -486,6 +487,8 @@ where } pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { + let syncer = self.syncer.load_full().unwrap(); + eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end); let mut count = 0; while let Some((key, _value)) = self.store.get_lt(end.as_slice())? { @@ -495,6 +498,9 @@ where if let Some(old_val) = self.store.remove(&key)? { let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?; self.instance.updated(Some(old_entry), None).await; + self.system + .background + .spawn(syncer.clone().invalidate(key.to_vec())); count += 1; } } diff --git a/src/table_sync.rs b/src/table_sync.rs index 0f3e90d2..6f45969b 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -9,8 +9,8 @@ use futures_util::future::*; use futures_util::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use tokio::sync::watch; use tokio::sync::Mutex; +use tokio::sync::{mpsc, watch}; use crate::data::*; use crate::error::Error; @@ -18,9 +18,8 @@ use crate::membership::Ring; use crate::table::*; const MAX_DEPTH: usize = 16; -const SCAN_INTERVAL: Duration = Duration::from_secs(3600); +const SCAN_INTERVAL: Duration = Duration::from_secs(60); const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); - const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10); pub struct TableSyncer<F: TableSchema, R: TableReplication> { @@ -91,18 +90,24 @@ where .collect::<Vec<_>>(), }); + let (busy_tx, busy_rx) = mpsc::unbounded_channel(); + let s1 = syncer.clone(); table .system .background - .spawn_worker(move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit)) + .spawn_worker(move |must_exit: watch::Receiver<bool>| { + s1.watcher_task(must_exit, busy_rx) + }) .await; let s2 = syncer.clone(); table .system .background - .spawn_worker(move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit)) + .spawn_worker(move |must_exit: watch::Receiver<bool>| { + s2.syncer_task(must_exit, busy_tx) + }) .await; syncer @@ -111,25 +116,20 @@ where async fn watcher_task( self: Arc<Self>, mut must_exit: watch::Receiver<bool>, + mut busy_rx: mpsc::UnboundedReceiver<bool>, ) -> Result<(), Error> { - tokio::time::delay_for(Duration::from_secs(10)).await; - - self.todo.lock().await.add_full_scan(&self.table); - let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone(); let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone(); + let mut nothing_to_do_since = Some(Instant::now()); while !*must_exit.borrow() { let s_ring_recv = ring_recv.recv().fuse(); + let s_busy = busy_rx.recv().fuse(); let s_must_exit = must_exit.recv().fuse(); - pin_mut!(s_ring_recv, s_must_exit); + let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse(); + pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout); select! { - _ = next_full_scan => { - next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); - eprintln!("({}) Adding full scan to syncer todo list", self.table.name); - self.todo.lock().await.add_full_scan(&self.table); - } new_ring_r = s_ring_recv => { if let Some(new_ring) = new_ring_r { eprintln!("({}) Adding ring difference to syncer todo list", self.table.name); @@ -137,11 +137,29 @@ where prev_ring = new_ring; } } + busy_opt = s_busy => { + if let Some(busy) = busy_opt { + if busy { + nothing_to_do_since = None; + } else { + if nothing_to_do_since.is_none() { + nothing_to_do_since = Some(Instant::now()); + } + } + } + } must_exit_v = s_must_exit => { if must_exit_v.unwrap_or(false) { break; } } + _ = s_timeout => { + if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { + nothing_to_do_since = None; + eprintln!("({}) Adding full scan to syncer todo list", self.table.name); + self.todo.lock().await.add_full_scan(&self.table); + } + } } } Ok(()) @@ -150,9 +168,11 @@ where async fn syncer_task( self: Arc<Self>, mut must_exit: watch::Receiver<bool>, + busy_tx: mpsc::UnboundedSender<bool>, ) -> Result<(), Error> { while !*must_exit.borrow() { if let Some(partition) = self.todo.lock().await.pop_task() { + busy_tx.send(true)?; let res = self .clone() .sync_partition(&partition, &mut must_exit) @@ -164,6 +184,7 @@ where ); } } else { + busy_tx.send(false)?; tokio::time::delay_for(Duration::from_secs(1)).await; } } |