aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/admin_rpc.rs10
-rw-r--r--src/api_server.rs4
-rw-r--r--src/background.rs2
-rw-r--r--src/block.rs4
-rw-r--r--src/error.rs6
-rw-r--r--src/main.rs6
-rw-r--r--src/membership.rs4
-rw-r--r--src/rpc_server.rs5
-rw-r--r--src/server.rs32
-rw-r--r--src/table.rs12
-rw-r--r--src/table_sync.rs51
11 files changed, 92 insertions, 44 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs
index 8a274c26..29037c6c 100644
--- a/src/admin_rpc.rs
+++ b/src/admin_rpc.rs
@@ -18,7 +18,7 @@ pub enum AdminRPC {
BucketOperation(BucketOperation),
// Replies
- Ok,
+ Ok(String),
BucketList(Vec<String>),
BucketInfo(Bucket),
}
@@ -86,13 +86,13 @@ impl AdminRpcHandler {
self.garage
.bucket_table
.insert(&Bucket {
- name: query.name,
+ name: query.name.clone(),
timestamp: new_time,
deleted: false,
authorized_keys: vec![],
})
.await?;
- Ok(AdminRPC::Ok)
+ Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name)))
}
BucketOperation::Delete(query) => {
let bucket = match self
@@ -129,13 +129,13 @@ impl AdminRpcHandler {
self.garage
.bucket_table
.insert(&Bucket {
- name: query.name,
+ name: query.name.clone(),
timestamp: std::cmp::max(bucket.timestamp + 1, now_msec()),
deleted: true,
authorized_keys: vec![],
})
.await?;
- Ok(AdminRPC::Ok)
+ Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name)))
}
_ => {
// TODO
diff --git a/src/api_server.rs b/src/api_server.rs
index 52464f07..fbff7b2f 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -1,5 +1,5 @@
use std::collections::VecDeque;
-use std::net::{Ipv6Addr, SocketAddr};
+use std::net::SocketAddr;
use std::sync::Arc;
use futures::future::Future;
@@ -26,7 +26,7 @@ pub async fn run_api_server(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), Error> {
- let addr = (Ipv6Addr::LOCALHOST, garage.system.config.api_port).into();
+ let addr = &garage.system.config.api_bind_addr;
let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone();
diff --git a/src/background.rs b/src/background.rs
index f4b889ea..34e96ff7 100644
--- a/src/background.rs
+++ b/src/background.rs
@@ -78,6 +78,8 @@ impl BackgroundRunner {
workers.push(tokio::spawn(async move {
if let Err(e) = worker(stop_signal).await {
eprintln!("Worker stopped with error: {}", e);
+ } else {
+ println!("A worker exited successfully (which one?)");
}
}));
}
diff --git a/src/block.rs b/src/block.rs
index c84f193b..489dc33e 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -193,7 +193,7 @@ impl BlockManager {
let old_rc = self.rc.get(&hash)?;
self.rc.merge(&hash, vec![1])?;
if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
- self.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT.as_millis() as u64)?;
+ self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?;
}
Ok(())
}
@@ -201,7 +201,7 @@ impl BlockManager {
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.merge(&hash, vec![0])?;
if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
- self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?;
+ self.put_to_resync(&hash, 0)?;
}
Ok(())
}
diff --git a/src/error.rs b/src/error.rs
index 678ab72d..50a0a44b 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -83,3 +83,9 @@ impl<T> From<tokio::sync::watch::error::SendError<T>> for Error {
Error::Message(format!("Watch send error"))
}
}
+
+impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
+ fn from(_e: tokio::sync::mpsc::error::SendError<T>) -> Error {
+ Error::Message(format!("MPSC send error"))
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index 0aab9e2a..1d582c25 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,5 @@
+#![recursion_limit = "1024"]
+
mod data;
mod error;
@@ -387,8 +389,8 @@ async fn cmd_admin(
args: AdminRPC,
) -> Result<(), Error> {
match rpc_cli.call(&rpc_host, args, DEFAULT_TIMEOUT).await? {
- AdminRPC::Ok => {
- println!("Ok.");
+ AdminRPC::Ok(msg) => {
+ println!("{}", msg);
}
AdminRPC::BucketList(bl) => {
println!("List of buckets:");
diff --git a/src/membership.rs b/src/membership.rs
index 412a83f8..fc21d0b5 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -362,7 +362,7 @@ impl System {
let ring = self.ring.borrow().clone();
Message::Ping(PingMessage {
id: self.id.clone(),
- rpc_port: self.config.rpc_port,
+ rpc_port: self.config.rpc_bind_addr.port(),
status_hash: status.hash.clone(),
config_version: ring.config.version,
state_info: self.state_info.clone(),
@@ -539,7 +539,7 @@ impl System {
for node in adv.iter() {
if node.id == self.id {
// learn our own ip address
- let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
+ let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port());
let old_self = status.nodes.insert(
node.id.clone(),
StatusEntry {
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 7a6a57ee..f78c27f1 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -145,10 +145,7 @@ impl RpcServer {
match socket {
Ok(stream) => match tls_acceptor.clone().accept(stream).await {
Ok(x) => Some(Ok::<_, hyper::Error>(x)),
- Err(e) => {
- eprintln!("RPC server TLS error: {}", e);
- None
- }
+ Err(_e) => None,
},
Err(_) => None,
}
diff --git a/src/server.rs b/src/server.rs
index 979d76f9..464d550b 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -3,7 +3,7 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
-pub use futures_util::future::FutureExt;
+use futures_util::future::*;
use serde::Deserialize;
use tokio::sync::watch;
@@ -30,8 +30,8 @@ pub struct Config {
pub metadata_dir: PathBuf,
pub data_dir: PathBuf,
- pub api_port: u16,
- pub rpc_port: u16,
+ pub api_bind_addr: SocketAddr,
+ pub rpc_bind_addr: SocketAddr,
pub bootstrap_peers: Vec<SocketAddr>,
@@ -252,8 +252,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
let db = sled::open(db_path).expect("Unable to open DB");
println!("Initialize RPC server...");
- let rpc_bind_addr = ([0, 0, 0, 0, 0, 0, 0, 0], config.rpc_port).into();
- let mut rpc_server = RpcServer::new(rpc_bind_addr, config.rpc_tls.clone());
+ let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone());
println!("Initializing background runner...");
let (send_cancel, watch_cancel) = watch::channel(false);
@@ -266,11 +265,26 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
futures::try_join!(
- garage.system.clone().bootstrap().map(Ok),
- run_rpc_server,
- api_server,
- background.run().map(Ok),
+ garage.system.clone().bootstrap().map(|rv| {
+ println!("Bootstrap done");
+ Ok(rv)
+ }),
+ run_rpc_server.map(|rv| {
+ println!("RPC server exited");
+ rv
+ }),
+ api_server.map(|rv| {
+ println!("API server exited");
+ rv
+ }),
+ background.run().map(|rv| {
+ println!("Background runner exited");
+ Ok(rv)
+ }),
shutdown_signal(send_cancel),
)?;
+
+ println!("Cleaning up...");
+
Ok(())
}
diff --git a/src/table.rs b/src/table.rs
index 37fb2f51..80364d17 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -436,6 +436,7 @@ where
self: &Arc<Self>,
mut entries: Vec<Arc<ByteBuf>>,
) -> Result<(), Error> {
+ let syncer = self.syncer.load_full().unwrap();
let mut epidemic_propagate = vec![];
for update_bytes in entries.drain(..) {
@@ -469,9 +470,9 @@ where
}
self.instance.updated(old_entry, Some(new_entry)).await;
-
- let syncer = self.syncer.load_full().unwrap();
- self.system.background.spawn(syncer.invalidate(tree_key));
+ self.system
+ .background
+ .spawn(syncer.clone().invalidate(tree_key));
}
}
@@ -486,6 +487,8 @@ where
}
pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
+ let syncer = self.syncer.load_full().unwrap();
+
eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
let mut count = 0;
while let Some((key, _value)) = self.store.get_lt(end.as_slice())? {
@@ -495,6 +498,9 @@ where
if let Some(old_val) = self.store.remove(&key)? {
let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?;
self.instance.updated(Some(old_entry), None).await;
+ self.system
+ .background
+ .spawn(syncer.clone().invalidate(key.to_vec()));
count += 1;
}
}
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 0f3e90d2..6f45969b 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -9,8 +9,8 @@ use futures_util::future::*;
use futures_util::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
-use tokio::sync::watch;
use tokio::sync::Mutex;
+use tokio::sync::{mpsc, watch};
use crate::data::*;
use crate::error::Error;
@@ -18,9 +18,8 @@ use crate::membership::Ring;
use crate::table::*;
const MAX_DEPTH: usize = 16;
-const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
+const SCAN_INTERVAL: Duration = Duration::from_secs(60);
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
-
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10);
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
@@ -91,18 +90,24 @@ where
.collect::<Vec<_>>(),
});
+ let (busy_tx, busy_rx) = mpsc::unbounded_channel();
+
let s1 = syncer.clone();
table
.system
.background
- .spawn_worker(move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit))
+ .spawn_worker(move |must_exit: watch::Receiver<bool>| {
+ s1.watcher_task(must_exit, busy_rx)
+ })
.await;
let s2 = syncer.clone();
table
.system
.background
- .spawn_worker(move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit))
+ .spawn_worker(move |must_exit: watch::Receiver<bool>| {
+ s2.syncer_task(must_exit, busy_tx)
+ })
.await;
syncer
@@ -111,25 +116,20 @@ where
async fn watcher_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
+ mut busy_rx: mpsc::UnboundedReceiver<bool>,
) -> Result<(), Error> {
- tokio::time::delay_for(Duration::from_secs(10)).await;
-
- self.todo.lock().await.add_full_scan(&self.table);
- let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
+ let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
let s_ring_recv = ring_recv.recv().fuse();
+ let s_busy = busy_rx.recv().fuse();
let s_must_exit = must_exit.recv().fuse();
- pin_mut!(s_ring_recv, s_must_exit);
+ let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
+ pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
select! {
- _ = next_full_scan => {
- next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
- eprintln!("({}) Adding full scan to syncer todo list", self.table.name);
- self.todo.lock().await.add_full_scan(&self.table);
- }
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
eprintln!("({}) Adding ring difference to syncer todo list", self.table.name);
@@ -137,11 +137,29 @@ where
prev_ring = new_ring;
}
}
+ busy_opt = s_busy => {
+ if let Some(busy) = busy_opt {
+ if busy {
+ nothing_to_do_since = None;
+ } else {
+ if nothing_to_do_since.is_none() {
+ nothing_to_do_since = Some(Instant::now());
+ }
+ }
+ }
+ }
must_exit_v = s_must_exit => {
if must_exit_v.unwrap_or(false) {
break;
}
}
+ _ = s_timeout => {
+ if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
+ nothing_to_do_since = None;
+ eprintln!("({}) Adding full scan to syncer todo list", self.table.name);
+ self.todo.lock().await.add_full_scan(&self.table);
+ }
+ }
}
}
Ok(())
@@ -150,9 +168,11 @@ where
async fn syncer_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
+ busy_tx: mpsc::UnboundedSender<bool>,
) -> Result<(), Error> {
while !*must_exit.borrow() {
if let Some(partition) = self.todo.lock().await.pop_task() {
+ busy_tx.send(true)?;
let res = self
.clone()
.sync_partition(&partition, &mut must_exit)
@@ -164,6 +184,7 @@ where
);
}
} else {
+ busy_tx.send(false)?;
tokio::time::delay_for(Duration::from_secs(1)).await;
}
}