Diffstat (limited to 'src')
 src/background.rs  |   2
 src/main.rs        |  19
 src/membership.rs  |   8
 src/rpc_client.rs  |   6
 src/rpc_server.rs  |   4
 src/server.rs      |   6
 src/table.rs       |  35
 src/table_sync.rs  | 204 (new)
 src/tls_util.rs    | 107
 9 files changed, 297 insertions, 94 deletions
diff --git a/src/background.rs b/src/background.rs
index de0f07af..772745a6 100644
--- a/src/background.rs
+++ b/src/background.rs
@@ -68,7 +68,7 @@ impl BackgroundRunner {
let _: Result<_, _> = self.queue_in.clone().send((boxed, true));
}
- pub async fn spawn_worker<F, T>(self: Arc<Self>, worker: F)
+ pub async fn spawn_worker<F, T>(&self, worker: F)
where
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
T: Future<Output = JobOutput> + Send + 'static,
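Note: spawn_worker now takes &self instead of self: Arc<Self>, so callers holding an Arc<BackgroundRunner> can invoke it directly through Deref without cloning the Arc first. A minimal standalone sketch of that calling pattern (the Runner type below is illustrative, not the actual BackgroundRunner):

    use std::sync::Arc;

    struct Runner;

    impl Runner {
        // A &self method, like the new spawn_worker signature.
        fn spawn_worker(&self, name: &str) {
            println!("spawning worker {}", name);
        }
    }

    fn main() {
        let runner = Arc::new(Runner);
        // Arc<Runner> derefs to &Runner, so no runner.clone() is needed
        // just to call the method.
        runner.spawn_worker("table_sync");
    }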
diff --git a/src/main.rs b/src/main.rs
index ea6124b5..8b124bff 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,6 +5,7 @@ mod proto;
mod background;
mod membership;
mod table;
+mod table_sync;
mod block;
mod block_ref_table;
@@ -36,11 +37,11 @@ pub struct Opt {
#[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")]
rpc_host: SocketAddr,
- #[structopt(long="ca-cert")]
+ #[structopt(long = "ca-cert")]
ca_cert: Option<String>,
- #[structopt(long="client-cert")]
+ #[structopt(long = "client-cert")]
client_cert: Option<String>,
- #[structopt(long="client-key")]
+ #[structopt(long = "client-key")]
client_key: Option<String>,
#[structopt(subcommand)]
@@ -86,13 +87,11 @@ async fn main() {
let opt = Opt::from_args();
let tls_config = match (opt.ca_cert, opt.client_cert, opt.client_key) {
- (Some(ca_cert), Some(client_cert), Some(client_key)) => {
- Some(TlsConfig{
- ca_cert,
- node_cert: client_cert,
- node_key: client_key,
- })
- }
+ (Some(ca_cert), Some(client_cert), Some(client_key)) => Some(TlsConfig {
+ ca_cert,
+ node_cert: client_cert,
+ node_key: client_key,
+ }),
(None, None, None) => None,
_ => {
eprintln!("Missing one of: --ca-cert, --node-cert, --node-key. Not using TLS.");
diff --git a/src/membership.rs b/src/membership.rs
index 89550b67..368e9355 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -157,7 +157,7 @@ impl Ring {
self.walk_ring_from_pos(start, n)
}
- fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
+ pub fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
let mut ret = vec![];
let mut datacenters = vec![];
@@ -282,13 +282,13 @@ impl System {
.collect::<Vec<_>>();
self.clone().ping_nodes(bootstrap_peers).await;
- self.background
- .clone()
+ self.clone()
+ .background
.spawn_worker(|stop_signal| self.ping_loop(stop_signal).map(Ok))
.await;
}
- pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
+ async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
let ping_msg = self.make_ping();
let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
let sys = self.clone();
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 6f897a90..bb0ca56c 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -142,7 +142,10 @@ impl RpcClient {
let resp = tokio::time::timeout(timeout, resp_fut)
.await?
.map_err(|e| {
- eprintln!("RPC HTTP client error when connecting to {}: {}", to_addr, e);
+ eprintln!(
+ "RPC HTTP client error when connecting to {}: {}",
+ to_addr, e
+ );
e
})?;
@@ -158,4 +161,3 @@ impl RpcClient {
}
}
}
-
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index b75d67fd..16ea0ca8 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -76,9 +76,7 @@ async fn handler(
// and the request handler simply sits there waiting for the task to finish.
// (if it's cancelled, that's not an issue)
// (TODO FIXME except if garage happens to shut down at that point)
- let write_fut = async move {
- garage.block_manager.write_block(&m.hash, &m.data).await
- };
+ let write_fut = async move { garage.block_manager.write_block(&m.hash, &m.data).await };
tokio::spawn(write_fut).await?
}
Message::GetBlock(h) => garage.block_manager.read_block(&h).await,
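Note: the collapsed write_fut block keeps the same spawn-then-await pattern described in the comment above: the block write is handed to tokio::spawn so it runs to completion even if the request handler awaiting it is dropped. A minimal sketch of that pattern, assuming a tokio runtime with the macros feature (write_block here is a stand-in, not the real block_manager API):

    async fn write_block() -> Result<(), std::io::Error> {
        // Stand-in for garage.block_manager.write_block(&hash, &data).
        println!("block written");
        Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<(), std::io::Error> {
        let write_fut = async move { write_block().await };
        // The spawned task is driven by the runtime itself, so cancelling
        // this await would not cancel the write.
        tokio::spawn(write_fut).await.expect("write task panicked")
    }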
diff --git a/src/server.rs b/src/server.rs
index 8b49f105..af58ded1 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -93,7 +93,7 @@ impl Garage {
&db,
"block_ref".to_string(),
data_rep_param.clone(),
- );
+ ).await;
let version_table = Table::new(
VersionTable {
background: background.clone(),
@@ -103,7 +103,7 @@ impl Garage {
&db,
"version".to_string(),
meta_rep_param.clone(),
- );
+ ).await;
let object_table = Table::new(
ObjectTable {
background: background.clone(),
@@ -113,7 +113,7 @@ impl Garage {
&db,
"object".to_string(),
meta_rep_param.clone(),
- );
+ ).await;
let mut garage = Self {
db,
diff --git a/src/table.rs b/src/table.rs
index 69d818c2..533b4291 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -12,6 +12,7 @@ use crate::error::Error;
use crate::membership::System;
use crate::proto::*;
use crate::rpc_client::*;
+use crate::table_sync::TableSyncer;
pub struct Table<F: TableSchema> {
pub instance: F,
@@ -20,7 +21,6 @@ pub struct Table<F: TableSchema> {
pub system: Arc<System>,
pub store: sled::Tree,
- pub partitions: Vec<Partition>,
pub param: TableReplicationParams,
}
@@ -61,12 +61,6 @@ pub enum TableRPC<F: TableSchema> {
Update(Vec<Arc<ByteBuf>>),
}
-pub struct Partition {
- pub begin: Hash,
- pub end: Hash,
- pub other_nodes: Vec<UUID>,
-}
-
pub trait PartitionKey {
fn hash(&self) -> Hash;
}
@@ -124,7 +118,7 @@ pub trait TableSchema: Send + Sync {
}
impl<F: TableSchema + 'static> Table<F> {
- pub fn new(
+ pub async fn new(
instance: F,
system: Arc<System>,
db: &sled::Db,
@@ -132,14 +126,15 @@ impl<F: TableSchema + 'static> Table<F> {
param: TableReplicationParams,
) -> Arc<Self> {
let store = db.open_tree(&name).expect("Unable to open DB tree");
- Arc::new(Self {
+ let table = Arc::new(Self {
instance,
name,
system,
store,
- partitions: Vec::new(),
param,
- })
+ });
+ TableSyncer::launch(table.clone()).await;
+ table
}
pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> {
@@ -207,7 +202,11 @@ impl<F: TableSchema + 'static> Table<F> {
}
}
- pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> {
+ pub async fn get(
+ self: &Arc<Self>,
+ partition_key: &F::P,
+ sort_key: &F::S,
+ ) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
let who = self
.system
@@ -245,17 +244,19 @@ impl<F: TableSchema + 'static> Table<F> {
}
if let Some(ret_entry) = &ret {
if not_all_same {
- let _: Result<_, _> = self.repair_on_read(&who[..], &ret_entry).await;
+ self.system
+ .background
+ .spawn(self.clone().repair_on_read(who, ret_entry.clone()));
}
}
Ok(ret)
}
- async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> {
- let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?));
+ async fn repair_on_read(self: Arc<Self>, who: Vec<UUID>, what: F::E) -> Result<(), Error> {
+ let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len())
- .await
- .map(|_| ())
+ .await?;
+ Ok(())
}
async fn rpc_try_call_many(
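Note: repair_on_read now takes owned who/what values because the read path spawns the repair as a background job instead of awaiting it inline, and a spawned future must be 'static rather than borrow from the request. A minimal standalone sketch of that ownership pattern, assuming a tokio runtime (Repairer and the u64 node ids are illustrative, not the garage types):

    use std::sync::Arc;

    struct Repairer;

    impl Repairer {
        // Owned arguments, like the new repair_on_read signature.
        async fn repair_on_read(self: Arc<Self>, who: Vec<u64>, what: String) {
            println!("re-pushing {:?} to nodes {:?}", what, who);
        }
    }

    #[tokio::main]
    async fn main() {
        let repairer = Arc::new(Repairer);
        let who = vec![1, 2, 3];
        let what = String::from("latest entry");
        // Moving owned values into the future makes it 'static, so it can
        // be handed off to a background executor and outlive the read
        // request that triggered it.
        tokio::spawn(repairer.clone().repair_on_read(who, what))
            .await
            .unwrap();
    }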
diff --git a/src/table_sync.rs b/src/table_sync.rs
new file mode 100644
index 00000000..5097c1b0
--- /dev/null
+++ b/src/table_sync.rs
@@ -0,0 +1,204 @@
+use rand::Rng;
+use std::sync::Arc;
+use std::time::Duration;
+use std::collections::BTreeSet;
+
+use futures::{pin_mut, select};
+use futures_util::future::*;
+use tokio::sync::watch;
+use tokio::sync::Mutex;
+
+use crate::data::*;
+use crate::error::Error;
+use crate::membership::{Ring, System};
+use crate::table::*;
+
+const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
+
+pub struct TableSyncer<F: TableSchema> {
+ pub table: Arc<Table<F>>,
+
+ pub todo: Mutex<SyncTodo>,
+}
+
+pub struct SyncTodo {
+ pub todo: Vec<Partition>,
+}
+
+#[derive(Debug, Clone)]
+pub struct Partition {
+ pub begin: Hash,
+ pub end: Hash,
+}
+
+impl<F: TableSchema + 'static> TableSyncer<F> {
+ pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> {
+ let todo = SyncTodo { todo: Vec::new() };
+ let syncer = Arc::new(TableSyncer {
+ table: table.clone(),
+ todo: Mutex::new(todo),
+ });
+
+ let s1 = syncer.clone();
+ table
+ .system
+ .background
+ .spawn_worker(move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit))
+ .await;
+
+ let s2 = syncer.clone();
+ table
+ .system
+ .background
+ .spawn_worker(move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit))
+ .await;
+
+ syncer
+ }
+
+ async fn watcher_task(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ self.todo.lock().await.add_full_scan(&self.table);
+ let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
+ let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
+ let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
+
+ loop {
+ let s_ring_recv = ring_recv.recv().fuse();
+ let s_must_exit = must_exit.recv().fuse();
+ pin_mut!(s_ring_recv, s_must_exit);
+
+ select! {
+ _ = next_full_scan => {
+ next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
+ self.todo.lock().await.add_full_scan(&self.table);
+ }
+ new_ring_r = s_ring_recv => {
+ if let Some(new_ring) = new_ring_r {
+ self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
+ prev_ring = new_ring;
+ }
+ }
+ must_exit_v = s_must_exit => {
+ if must_exit_v.unwrap_or(false) {
+ return Ok(())
+ }
+ }
+ }
+ }
+ }
+
+ async fn syncer_task(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ loop {
+ let s_pop_task = self.pop_task().fuse();
+ let s_must_exit = must_exit.recv().fuse();
+ pin_mut!(s_must_exit, s_pop_task);
+
+ select! {
+ task = s_pop_task => {
+ if let Some(partition) = task {
+ let res = self.sync_partition(&partition).await;
+ if let Err(e) = res {
+ eprintln!("Error while syncing {:?}: {}", partition, e);
+ }
+ } else {
+ tokio::time::delay_for(Duration::from_secs(1)).await;
+ }
+ }
+ must_exit_v = s_must_exit => {
+ if must_exit_v.unwrap_or(false) {
+ return Ok(())
+ }
+ }
+ }
+ }
+ }
+
+ async fn pop_task(&self) -> Option<Partition> {
+ self.todo.lock().await.pop_task()
+ }
+
+ async fn sync_partition(self: &Arc<Self>, partition: &Partition) -> Result<(), Error> {
+ unimplemented!()
+ }
+}
+
+impl SyncTodo {
+ fn add_full_scan<F: TableSchema>(&mut self, table: &Table<F>) {
+ let my_id = table.system.id.clone();
+
+ self.todo.clear();
+
+ let ring: Arc<Ring> = table.system.ring.borrow().clone();
+ for i in 0..ring.ring.len() {
+ let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor);
+ let begin = ring.ring[i].location.clone();
+
+ if i == ring.ring.len() - 1 {
+ let end = ring.ring[0].location.clone();
+ self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id);
+ self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id);
+ } else {
+ let end = ring.ring[i + 1].location.clone();
+ self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id);
+ }
+ }
+ }
+
+ fn add_full_scan_aux<F: TableSchema>(
+ &mut self,
+ table: &Table<F>,
+ begin: Hash,
+ end: Hash,
+ nodes: &[UUID],
+ my_id: &UUID,
+ ) {
+ if !nodes.contains(my_id) {
+ // Check if we have some data to send, otherwise skip
+ if table
+ .store
+ .range(begin.clone()..end.clone())
+ .next()
+ .is_none()
+ {}
+ }
+
+ self.todo.push(Partition { begin, end });
+ }
+
+ fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) {
+ let old_ring = ring_points(old);
+ let new_ring = ring_points(new);
+ unimplemented!()
+ }
+
+ fn pop_task(&mut self) -> Option<Partition> {
+ if self.todo.is_empty() {
+ return None;
+ }
+
+ let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
+ if i == self.todo.len() - 1 {
+ self.todo.pop()
+ } else {
+ let replacement = self.todo.pop().unwrap();
+ let ret = std::mem::replace(&mut self.todo[i], replacement);
+ Some(ret)
+ }
+ }
+}
+
+fn ring_points(ring: &Ring) -> BTreeSet<Hash> {
+ let mut ret = BTreeSet::new();
+ ret.insert([0u8; 32].into());
+ ret.insert([0xFFu8; 32].into());
+ for i in 0..ring.ring.len() {
+ ret.insert(ring.ring[i].location.clone());
+ }
+ ret
+}
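Note: SyncTodo::pop_task picks a random pending partition and removes it in O(1) by swapping it with the last element before popping. A dependency-free sketch of that swap-remove step (take_at is a hypothetical helper; in table_sync.rs the index comes from rand::thread_rng().gen_range):

    fn take_at<T>(todo: &mut Vec<T>, i: usize) -> Option<T> {
        if i >= todo.len() {
            return None;
        }
        if i == todo.len() - 1 {
            todo.pop()
        } else {
            // Replace slot i with the last element, then return what was there.
            let last = todo.pop().unwrap();
            Some(std::mem::replace(&mut todo[i], last))
        }
    }

    fn main() {
        let mut todo = vec!["p0", "p1", "p2", "p3"];
        let picked = take_at(&mut todo, 1);
        println!("picked {:?}, remaining {:?}", picked, todo);
    }

For an in-bounds index this is equivalent to Vec::swap_remove, which trades element order for constant-time removal; order does not matter here since the task is chosen at random anyway.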
diff --git a/src/tls_util.rs b/src/tls_util.rs
index dfc4e716..52c52110 100644
--- a/src/tls_util.rs
+++ b/src/tls_util.rs
@@ -1,17 +1,17 @@
-use std::{fs, io};
-use core::task::{Poll, Context};
+use core::future::Future;
+use core::task::{Context, Poll};
use std::pin::Pin;
use std::sync::Arc;
-use core::future::Future;
+use std::{fs, io};
use futures_util::future::*;
-use tokio::io::{AsyncRead, AsyncWrite};
-use rustls::internal::pemfile;
-use hyper::client::HttpConnector;
use hyper::client::connect::Connection;
+use hyper::client::HttpConnector;
use hyper::service::Service;
use hyper::Uri;
use hyper_rustls::MaybeHttpsStream;
+use rustls::internal::pemfile;
+use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsConnector;
use webpki::DNSNameRef;
@@ -58,7 +58,6 @@ pub fn load_private_key(filename: &str) -> Result<rustls::PrivateKey, Error> {
Ok(keys[0].clone())
}
-
// ---- AWFUL COPYPASTA FROM HYPER-RUSTLS connector.rs
// ---- ALWAYS USE `garage` AS HOSTNAME FOR TLS VERIFICATION
@@ -85,56 +84,56 @@ impl HttpsConnectorFixedDnsname<HttpConnector> {
}
impl<T> Service<Uri> for HttpsConnectorFixedDnsname<T>
- where
+where
T: Service<Uri>,
T::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
T::Future: Send + 'static,
T::Error: Into<BoxError>,
{
- type Response = MaybeHttpsStream<T::Response>;
- type Error = BoxError;
-
- #[allow(clippy::type_complexity)]
- type Future =
- Pin<Box<dyn Future<Output = Result<MaybeHttpsStream<T::Response>, BoxError>> + Send>>;
-
- fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- match self.http.poll_ready(cx) {
- Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
- Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
- Poll::Pending => Poll::Pending,
- }
- }
-
- fn call(&mut self, dst: Uri) -> Self::Future {
- let is_https = dst.scheme_str() == Some("https");
-
- if !is_https {
- let connecting_future = self.http.call(dst);
-
- let f = async move {
- let tcp = connecting_future.await.map_err(Into::into)?;
-
- Ok(MaybeHttpsStream::Http(tcp))
- };
- f.boxed()
- } else {
- let cfg = self.tls_config.clone();
- let connecting_future = self.http.call(dst);
-
- let dnsname = DNSNameRef::try_from_ascii_str(self.fixed_dnsname)
- .expect("Invalid fixed dnsname");
-
- let f = async move {
- let tcp = connecting_future.await.map_err(Into::into)?;
- let connector = TlsConnector::from(cfg);
- let tls = connector
- .connect(dnsname, tcp)
- .await
- .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
- Ok(MaybeHttpsStream::Https(tls))
- };
- f.boxed()
- }
- }
+ type Response = MaybeHttpsStream<T::Response>;
+ type Error = BoxError;
+
+ #[allow(clippy::type_complexity)]
+ type Future =
+ Pin<Box<dyn Future<Output = Result<MaybeHttpsStream<T::Response>, BoxError>> + Send>>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ match self.http.poll_ready(cx) {
+ Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+
+ fn call(&mut self, dst: Uri) -> Self::Future {
+ let is_https = dst.scheme_str() == Some("https");
+
+ if !is_https {
+ let connecting_future = self.http.call(dst);
+
+ let f = async move {
+ let tcp = connecting_future.await.map_err(Into::into)?;
+
+ Ok(MaybeHttpsStream::Http(tcp))
+ };
+ f.boxed()
+ } else {
+ let cfg = self.tls_config.clone();
+ let connecting_future = self.http.call(dst);
+
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(self.fixed_dnsname).expect("Invalid fixed dnsname");
+
+ let f = async move {
+ let tcp = connecting_future.await.map_err(Into::into)?;
+ let connector = TlsConnector::from(cfg);
+ let tls = connector
+ .connect(dnsname, tcp)
+ .await
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+ Ok(MaybeHttpsStream::Https(tls))
+ };
+ f.boxed()
+ }
+ }
}