diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-16 19:28:02 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-16 19:28:02 +0200 |
commit | 2f3b1a072f63330c101d4e2703a24e0f26b98674 (patch) | |
tree | bfd11768154f81d8904a932f17f2cf5fe36038b1 | |
parent | 2832be4396d2ed40bfb0bd4358bc14ef7432decb (diff) | |
download | garage-2f3b1a072f63330c101d4e2703a24e0f26b98674.tar.gz garage-2f3b1a072f63330c101d4e2703a24e0f26b98674.zip |
WIP
-rw-r--r-- | src/data.rs | 13 | ||||
-rw-r--r-- | src/membership.rs | 10 | ||||
-rw-r--r-- | src/rpc_client.rs | 2 | ||||
-rw-r--r-- | src/rpc_server.rs | 29 | ||||
-rw-r--r-- | src/table.rs | 24 | ||||
-rw-r--r-- | src/table_sync.rs | 88 |
6 files changed, 129 insertions, 37 deletions
diff --git a/src/data.rs b/src/data.rs index fd7f9a8b..c1665d2a 100644 --- a/src/data.rs +++ b/src/data.rs @@ -111,6 +111,19 @@ where Ok(wr) } +pub fn debug_serialize<T: Serialize>(x: T) -> String { + match serde_json::to_string(&x) { + Ok(ss) => { + if ss.len() > 100 { + ss[..100].to_string() + } else { + ss + } + } + Err(e) => format!("<JSON serialization error: {}>", e), + } +} + // Network management #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/src/membership.rs b/src/membership.rs index 368e9355..f511a4fd 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -131,11 +131,11 @@ impl Ring { self.ring = new_ring; self.n_datacenters = datacenters.len(); - eprintln!("RING: --"); - for e in self.ring.iter() { - eprintln!("{:?}", e); - } - eprintln!("END --"); + // eprintln!("RING: --"); + // for e in self.ring.iter() { + // eprintln!("{:?}", e); + // } + // eprintln!("END --"); } pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { diff --git a/src/rpc_client.rs b/src/rpc_client.rs index bb0ca56c..a1c5dde0 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -84,7 +84,7 @@ pub async fn rpc_call( let status = sys.status.borrow().clone(); match status.nodes.get(to) { Some(status) => status.addr.clone(), - None => return Err(Error::Message(format!("Peer ID not found"))), + None => return Err(Error::Message(format!("Peer ID not found: {:?}", to))), } }; sys.rpc_client.call(&addr, msg, timeout).await diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 16ea0ca8..3527eda3 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -8,29 +8,16 @@ use futures_util::stream::*; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; -use serde::Serialize; use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; -use crate::data::rmp_to_vec_all_named; +use crate::data::{rmp_to_vec_all_named, debug_serialize}; use crate::error::Error; use crate::proto::Message; use crate::server::Garage; use crate::tls_util; -fn debug_serialize<T: Serialize>(x: T) -> String { - match serde_json::to_string(&x) { - Ok(ss) => { - if ss.len() > 100 { - ss[..100].to_string() - } else { - ss - } - } - Err(e) => format!("<JSON serialization error: {}>", e), - } -} fn err_to_msg(x: Result<Message, Error>) -> Message { match x { @@ -53,12 +40,12 @@ async fn handler( let whole_body = hyper::body::to_bytes(req.into_body()).await?; let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?; - eprintln!( - "RPC from {}: {} ({} bytes)", - addr, - debug_serialize(&msg), - whole_body.len() - ); + // eprintln!( + // "RPC from {}: {} ({} bytes)", + // addr, + // debug_serialize(&msg), + // whole_body.len() + // ); let sys = garage.system.clone(); let resp = err_to_msg(match msg { @@ -99,7 +86,7 @@ async fn handler( _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))), }); - eprintln!("reply to {}: {}", addr, debug_serialize(&resp)); + // eprintln!("reply to {}: {}", addr, debug_serialize(&resp)); Ok(Response::new(Body::from(rmp_to_vec_all_named(&resp)?))) } diff --git a/src/table.rs b/src/table.rs index 99ac77bb..162f98e6 100644 --- a/src/table.rs +++ b/src/table.rs @@ -306,7 +306,20 @@ impl<F: TableSchema + 'static> Table<F> { Ok(resps_vals) } - async fn handle(&self, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { + pub async fn rpc_call(&self, who: &UUID, rpc: &TableRPC<F>) -> Result<TableRPC<F>, Error> { + let rpc_bytes = rmp_to_vec_all_named(rpc)?; + let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); + + let resp = rpc_call(self.system.clone(), who, &rpc_msg, self.param.timeout).await?; + if let Message::TableRPC(tbl, rep_by) = &resp { + if *tbl == self.name { + return Ok(rmp_serde::decode::from_read_ref(&rep_by)?); + } + } + Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) + } + + async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { let value = self.handle_read_entry(&key, &sort_key)?; @@ -334,7 +347,7 @@ impl<F: TableSchema + 'static> Table<F> { } } - async fn handle_update(&self, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> { + async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> { for update_bytes in entries.drain(..) { let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; @@ -360,7 +373,12 @@ impl<F: TableSchema + 'static> Table<F> { Ok((old_entry, new_entry)) })?; - self.instance.updated(old_entry, new_entry).await; + if old_entry.as_ref() != Some(&new_entry) { + self.instance.updated(old_entry, new_entry).await; + + let syncer = self.syncer.read().await.as_ref().unwrap().clone(); + self.system.background.spawn(syncer.invalidate(tree_key)); + } } Ok(()) } diff --git a/src/table_sync.rs b/src/table_sync.rs index 3dd9df33..92aa8c2a 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,5 +1,5 @@ use rand::Rng; -use std::collections::{BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeSet, BTreeMap, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -10,19 +10,21 @@ use futures_util::future::*; use tokio::sync::watch; use tokio::sync::Mutex; use serde::{Serialize, Deserialize}; +use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; use crate::membership::Ring; use crate::table::*; +const MAX_DEPTH: usize = 16; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); pub struct TableSyncer<F: TableSchema> { pub table: Arc<Table<F>>, pub todo: Mutex<SyncTodo>, - pub cache: Vec<Mutex<HashMap<SyncRange, RangeChecksum>>>, + pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>, } pub struct SyncTodo { @@ -43,6 +45,17 @@ pub struct SyncRange { pub level: usize, } +impl std::cmp::PartialOrd for SyncRange { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} +impl std::cmp::Ord for SyncRange { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.begin.cmp(&other.begin) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RangeChecksum { pub bounds: SyncRange, @@ -59,7 +72,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let syncer = Arc::new(TableSyncer { table: table.clone(), todo: Mutex::new(todo), - cache: (0..32).map(|_| Mutex::new(HashMap::new())).collect::<Vec<_>>(), + cache: (0..MAX_DEPTH).map(|_| Mutex::new(BTreeMap::new())).collect::<Vec<_>>(), }); let s1 = syncer.clone(); @@ -83,12 +96,14 @@ impl<F: TableSchema + 'static> TableSyncer<F> { self: Arc<Self>, mut must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { + tokio::time::delay_for(Duration::from_secs(10)); + self.todo.lock().await.add_full_scan(&self.table); let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone(); let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone(); - loop { + while !*must_exit.borrow() { let s_ring_recv = ring_recv.recv().fuse(); let s_must_exit = must_exit.recv().fuse(); pin_mut!(s_ring_recv, s_must_exit); @@ -96,21 +111,24 @@ impl<F: TableSchema + 'static> TableSyncer<F> { select! { _ = next_full_scan => { next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); + eprintln!("Adding full scan to syncer todo list"); self.todo.lock().await.add_full_scan(&self.table); } new_ring_r = s_ring_recv => { if let Some(new_ring) = new_ring_r { + eprintln!("Adding ring difference to syncer todo list"); self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring); prev_ring = new_ring; } } must_exit_v = s_must_exit => { if must_exit_v.unwrap_or(false) { - return Ok(()) + break; } } } } + Ok(()) } async fn syncer_task( @@ -131,6 +149,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { + eprintln!("Calculating root checksum for {:?}...", partition); let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?; eprintln!("Root checksum for {:?}: {:?}", partition, root_cks); @@ -152,7 +171,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> { - for i in 1..32 { + for i in 1..MAX_DEPTH { let rc = self.range_checksum(&SyncRange{ begin: begin.to_vec(), end: end.to_vec(), @@ -262,13 +281,49 @@ impl<F: TableSchema + 'static> TableSyncer<F> { todo.push_back(root_ck); while !todo.is_empty() && !*must_exit.borrow() { + eprintln!("Sync with {:?}: {} remaining", who, todo.len()); + let end = std::cmp::min(16, todo.len()); let step = todo.drain(..end).collect::<Vec<_>>(); - unimplemented!() + + let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)).await?; + if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp { + let mut items = vec![]; + for differing in s.drain(..) { + if differing.level == 0 { + items.push(differing.begin); + } else { + let checksum = self.range_checksum(&differing, &mut must_exit).await?; + todo.push_back(checksum); + } + } + if items.len() > 0 { + self.table.system.background.spawn(self.clone().send_items(who.clone(), items)); + } + } else { + return Err(Error::Message(format!("Unexpected response to RPC SyncChecksums: {}", debug_serialize(&rpc_resp)))); + } } Ok(()) } + async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { + eprintln!("Sending {} items to {:?}", item_list.len(), who); + + let mut values = vec![]; + for item in item_list.iter() { + if let Some(v) = self.table.store.get(&item[..])? { + values.push(Arc::new(ByteBuf::from(v.as_ref()))); + } + } + let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::Update(values)).await?; + if let TableRPC::<F>::Ok = rpc_resp { + Ok(()) + } else { + Err(Error::Message(format!("Unexpected response to RPC Update: {}", debug_serialize(&rpc_resp)))) + } + } + pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> { let mut ret = vec![]; for ckr in checksums.iter() { @@ -288,6 +343,25 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } Ok(ret) } + + pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> { + for i in 1..MAX_DEPTH { + let needle = SyncRange{ + begin: item_key.to_vec(), + end: vec![], + level: i, + }; + let mut cache = self.cache[i].lock().await; + if let Some(cache_entry) = cache.range(..=needle).rev().next() { + if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key { + let index = cache_entry.0.clone(); + drop(cache_entry); + cache.remove(&index); + } + } + } + Ok(()) + } } impl SyncTodo { |