aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/data.rs13
-rw-r--r--src/membership.rs10
-rw-r--r--src/rpc_client.rs2
-rw-r--r--src/rpc_server.rs29
-rw-r--r--src/table.rs24
-rw-r--r--src/table_sync.rs88
6 files changed, 129 insertions, 37 deletions
diff --git a/src/data.rs b/src/data.rs
index fd7f9a8b..c1665d2a 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -111,6 +111,19 @@ where
Ok(wr)
}
+pub fn debug_serialize<T: Serialize>(x: T) -> String {
+ match serde_json::to_string(&x) {
+ Ok(ss) => {
+ if ss.len() > 100 {
+ ss[..100].to_string()
+ } else {
+ ss
+ }
+ }
+ Err(e) => format!("<JSON serialization error: {}>", e),
+ }
+}
+
// Network management
#[derive(Clone, Debug, Serialize, Deserialize)]
diff --git a/src/membership.rs b/src/membership.rs
index 368e9355..f511a4fd 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -131,11 +131,11 @@ impl Ring {
self.ring = new_ring;
self.n_datacenters = datacenters.len();
- eprintln!("RING: --");
- for e in self.ring.iter() {
- eprintln!("{:?}", e);
- }
- eprintln!("END --");
+ // eprintln!("RING: --");
+ // for e in self.ring.iter() {
+ // eprintln!("{:?}", e);
+ // }
+ // eprintln!("END --");
}
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index bb0ca56c..a1c5dde0 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -84,7 +84,7 @@ pub async fn rpc_call(
let status = sys.status.borrow().clone();
match status.nodes.get(to) {
Some(status) => status.addr.clone(),
- None => return Err(Error::Message(format!("Peer ID not found"))),
+ None => return Err(Error::Message(format!("Peer ID not found: {:?}", to))),
}
};
sys.rpc_client.call(&addr, msg, timeout).await
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 16ea0ca8..3527eda3 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -8,29 +8,16 @@ use futures_util::stream::*;
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
-use serde::Serialize;
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
-use crate::data::rmp_to_vec_all_named;
+use crate::data::{rmp_to_vec_all_named, debug_serialize};
use crate::error::Error;
use crate::proto::Message;
use crate::server::Garage;
use crate::tls_util;
-fn debug_serialize<T: Serialize>(x: T) -> String {
- match serde_json::to_string(&x) {
- Ok(ss) => {
- if ss.len() > 100 {
- ss[..100].to_string()
- } else {
- ss
- }
- }
- Err(e) => format!("<JSON serialization error: {}>", e),
- }
-}
fn err_to_msg(x: Result<Message, Error>) -> Message {
match x {
@@ -53,12 +40,12 @@ async fn handler(
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?;
- eprintln!(
- "RPC from {}: {} ({} bytes)",
- addr,
- debug_serialize(&msg),
- whole_body.len()
- );
+ // eprintln!(
+ // "RPC from {}: {} ({} bytes)",
+ // addr,
+ // debug_serialize(&msg),
+ // whole_body.len()
+ // );
let sys = garage.system.clone();
let resp = err_to_msg(match msg {
@@ -99,7 +86,7 @@ async fn handler(
_ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),
});
- eprintln!("reply to {}: {}", addr, debug_serialize(&resp));
+ // eprintln!("reply to {}: {}", addr, debug_serialize(&resp));
Ok(Response::new(Body::from(rmp_to_vec_all_named(&resp)?)))
}
diff --git a/src/table.rs b/src/table.rs
index 99ac77bb..162f98e6 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -306,7 +306,20 @@ impl<F: TableSchema + 'static> Table<F> {
Ok(resps_vals)
}
- async fn handle(&self, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {
+ pub async fn rpc_call(&self, who: &UUID, rpc: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
+ let rpc_bytes = rmp_to_vec_all_named(rpc)?;
+ let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes);
+
+ let resp = rpc_call(self.system.clone(), who, &rpc_msg, self.param.timeout).await?;
+ if let Message::TableRPC(tbl, rep_by) = &resp {
+ if *tbl == self.name {
+ return Ok(rmp_serde::decode::from_read_ref(&rep_by)?);
+ }
+ }
+ Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp)))
+ }
+
+ async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {
match msg {
TableRPC::ReadEntry(key, sort_key) => {
let value = self.handle_read_entry(&key, &sort_key)?;
@@ -334,7 +347,7 @@ impl<F: TableSchema + 'static> Table<F> {
}
}
- async fn handle_update(&self, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
+ async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
for update_bytes in entries.drain(..) {
let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;
@@ -360,7 +373,12 @@ impl<F: TableSchema + 'static> Table<F> {
Ok((old_entry, new_entry))
})?;
- self.instance.updated(old_entry, new_entry).await;
+ if old_entry.as_ref() != Some(&new_entry) {
+ self.instance.updated(old_entry, new_entry).await;
+
+ let syncer = self.syncer.read().await.as_ref().unwrap().clone();
+ self.system.background.spawn(syncer.invalidate(tree_key));
+ }
}
Ok(())
}
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 3dd9df33..92aa8c2a 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -1,5 +1,5 @@
use rand::Rng;
-use std::collections::{BTreeSet, HashMap, VecDeque};
+use std::collections::{BTreeSet, BTreeMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -10,19 +10,21 @@ use futures_util::future::*;
use tokio::sync::watch;
use tokio::sync::Mutex;
use serde::{Serialize, Deserialize};
+use serde_bytes::ByteBuf;
use crate::data::*;
use crate::error::Error;
use crate::membership::Ring;
use crate::table::*;
+const MAX_DEPTH: usize = 16;
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
pub struct TableSyncer<F: TableSchema> {
pub table: Arc<Table<F>>,
pub todo: Mutex<SyncTodo>,
- pub cache: Vec<Mutex<HashMap<SyncRange, RangeChecksum>>>,
+ pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>,
}
pub struct SyncTodo {
@@ -43,6 +45,17 @@ pub struct SyncRange {
pub level: usize,
}
+impl std::cmp::PartialOrd for SyncRange {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+impl std::cmp::Ord for SyncRange {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.begin.cmp(&other.begin)
+ }
+}
+
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RangeChecksum {
pub bounds: SyncRange,
@@ -59,7 +72,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
let syncer = Arc::new(TableSyncer {
table: table.clone(),
todo: Mutex::new(todo),
- cache: (0..32).map(|_| Mutex::new(HashMap::new())).collect::<Vec<_>>(),
+ cache: (0..MAX_DEPTH).map(|_| Mutex::new(BTreeMap::new())).collect::<Vec<_>>(),
});
let s1 = syncer.clone();
@@ -83,12 +96,14 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
+ tokio::time::delay_for(Duration::from_secs(10));
+
self.todo.lock().await.add_full_scan(&self.table);
let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
- loop {
+ while !*must_exit.borrow() {
let s_ring_recv = ring_recv.recv().fuse();
let s_must_exit = must_exit.recv().fuse();
pin_mut!(s_ring_recv, s_must_exit);
@@ -96,21 +111,24 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
select! {
_ = next_full_scan => {
next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
+ eprintln!("Adding full scan to syncer todo list");
self.todo.lock().await.add_full_scan(&self.table);
}
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
+ eprintln!("Adding ring difference to syncer todo list");
self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
prev_ring = new_ring;
}
}
must_exit_v = s_must_exit => {
if must_exit_v.unwrap_or(false) {
- return Ok(())
+ break;
}
}
}
}
+ Ok(())
}
async fn syncer_task(
@@ -131,6 +149,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
+ eprintln!("Calculating root checksum for {:?}...", partition);
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?;
eprintln!("Root checksum for {:?}: {:?}", partition, root_cks);
@@ -152,7 +171,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> {
- for i in 1..32 {
+ for i in 1..MAX_DEPTH {
let rc = self.range_checksum(&SyncRange{
begin: begin.to_vec(),
end: end.to_vec(),
@@ -262,13 +281,49 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
todo.push_back(root_ck);
while !todo.is_empty() && !*must_exit.borrow() {
+ eprintln!("Sync with {:?}: {} remaining", who, todo.len());
+
let end = std::cmp::min(16, todo.len());
let step = todo.drain(..end).collect::<Vec<_>>();
- unimplemented!()
+
+ let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)).await?;
+ if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp {
+ let mut items = vec![];
+ for differing in s.drain(..) {
+ if differing.level == 0 {
+ items.push(differing.begin);
+ } else {
+ let checksum = self.range_checksum(&differing, &mut must_exit).await?;
+ todo.push_back(checksum);
+ }
+ }
+ if items.len() > 0 {
+ self.table.system.background.spawn(self.clone().send_items(who.clone(), items));
+ }
+ } else {
+ return Err(Error::Message(format!("Unexpected response to RPC SyncChecksums: {}", debug_serialize(&rpc_resp))));
+ }
}
Ok(())
}
+ async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ eprintln!("Sending {} items to {:?}", item_list.len(), who);
+
+ let mut values = vec![];
+ for item in item_list.iter() {
+ if let Some(v) = self.table.store.get(&item[..])? {
+ values.push(Arc::new(ByteBuf::from(v.as_ref())));
+ }
+ }
+ let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::Update(values)).await?;
+ if let TableRPC::<F>::Ok = rpc_resp {
+ Ok(())
+ } else {
+ Err(Error::Message(format!("Unexpected response to RPC Update: {}", debug_serialize(&rpc_resp))))
+ }
+ }
+
pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> {
let mut ret = vec![];
for ckr in checksums.iter() {
@@ -288,6 +343,25 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
Ok(ret)
}
+
+ pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {
+ for i in 1..MAX_DEPTH {
+ let needle = SyncRange{
+ begin: item_key.to_vec(),
+ end: vec![],
+ level: i,
+ };
+ let mut cache = self.cache[i].lock().await;
+ if let Some(cache_entry) = cache.range(..=needle).rev().next() {
+ if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key {
+ let index = cache_entry.0.clone();
+ drop(cache_entry);
+ cache.remove(&index);
+ }
+ }
+ }
+ Ok(())
+ }
}
impl SyncTodo {