diff options
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/block_ref_table.rs | 2 | ||||
-rw-r--r-- | src/object_table.rs | 4 | ||||
-rw-r--r-- | src/server.rs | 9 | ||||
-rw-r--r-- | src/table.rs | 101 | ||||
-rw-r--r-- | src/version_table.rs | 21 |
6 files changed, 104 insertions, 35 deletions
@@ -16,7 +16,7 @@ futures-core = "0.3" futures-channel = "0.3" futures-util = "0.3" tokio = { version = "0.2", features = ["full"] } -serde = { version = "1.0", features = ["derive"] } +serde = { version = "1.0", features = ["derive", "rc"] } serde_bytes = "0.11" bincode = "1.2.1" err-derive = "0.2.3" diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs index 9ba87f0c..b4bff937 100644 --- a/src/block_ref_table.rs +++ b/src/block_ref_table.rs @@ -44,7 +44,7 @@ impl TableFormat for BlockRefTable { type S = UUID; type E = BlockRef; - async fn updated(&self, old: Option<&Self::E>, new: &Self::E) { + async fn updated(&self, old: Option<Self::E>, new: Self::E) { //unimplemented!() // TODO } diff --git a/src/object_table.rs b/src/object_table.rs index 7add9968..63dabf20 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -97,9 +97,7 @@ impl TableFormat for ObjectTable { type S = String; type E = Object; - async fn updated(&self, old: Option<&Self::E>, new: &Self::E) { - let old = old.cloned(); - let new = new.clone(); + async fn updated(&self, old: Option<Self::E>, new: Self::E) { let garage = self.garage.read().await.as_ref().cloned().unwrap(); garage.clone().background.spawn(async move { // Propagate deletion of old versions diff --git a/src/server.rs b/src/server.rs index 1f926bf0..e1f6dc80 100644 --- a/src/server.rs +++ b/src/server.rs @@ -30,10 +30,10 @@ pub struct Config { #[serde(default = "default_block_size")] pub block_size: usize, - #[serde(default = "default_meta_replication_factor")] + #[serde(default = "default_replication_factor")] pub meta_replication_factor: usize, - #[serde(default = "default_data_replication_factor")] + #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, } @@ -139,10 +139,7 @@ impl Garage { fn default_block_size() -> usize { 1048576 } -fn default_meta_replication_factor() -> usize { - 3 -} -fn default_data_replication_factor() -> usize { +fn default_replication_factor() -> usize { 3 } diff --git a/src/table.rs b/src/table.rs index 36c279e8..9f5eca33 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,8 +1,12 @@ -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; +use futures::stream::*; + use crate::data::*; use crate::error::Error; use crate::membership::System; @@ -52,9 +56,9 @@ pub enum TableRPC<F: TableFormat> { Ok, ReadEntry(F::P, F::S), - ReadEntryResponse(Option<F::E>), + ReadEntryResponse(Option<ByteBuf>), - Update(Vec<F::E>), + Update(Vec<Arc<ByteBuf>>), } pub struct Partition { @@ -116,7 +120,7 @@ pub trait TableFormat: Send + Sync { type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type E: Entry<Self::P, Self::S>; - async fn updated(&self, old: Option<&Self::E>, new: &Self::E); + async fn updated(&self, old: Option<Self::E>, new: Self::E); } impl<F: TableFormat + 'static> Table<F> { @@ -152,13 +156,63 @@ impl<F: TableFormat + 'static> Table<F> { .walk_ring(&hash, self.param.replication_factor); eprintln!("insert who: {:?}", who); - let rpc = &TableRPC::<F>::Update(vec![e.clone()]); + let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); + let rpc = &TableRPC::<F>::Update(vec![e_enc]); self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum) .await?; Ok(()) } + pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + let mut call_list = HashMap::new(); + + for entry in entries.iter() { + let hash = entry.partition_key().hash(); + let who = self + .system + .members + .read() + .await + .walk_ring(&hash, self.param.replication_factor); + let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); + for node in who { + if !call_list.contains_key(&node) { + call_list.insert(node.clone(), vec![]); + } + call_list.get_mut(&node).unwrap().push(e_enc.clone()); + } + } + + let call_futures = call_list.drain() + .map(|(node, entries)| async move { + let rpc = TableRPC::<F>::Update(entries); + let rpc_bytes = rmp_to_vec_all_named(&rpc)?; + let message = Message::TableRPC(self.name.to_string(), rpc_bytes); + + let resp = rpc_call( + self.system.clone(), + &node, + &message, + self.param.timeout + ).await?; + Ok::<_, Error>((node, resp)) + }); + let mut resps = call_futures.collect::<FuturesUnordered<_>>(); + let mut errors = vec![]; + + while let Some(resp) = resps.next().await { + if let Err(e) = resp { + errors.push(e); + } + } + if errors.len() > self.param.replication_factor - self.param.write_quorum { + Err(Error::Message("Too many errors".into())) + } else { + Ok(()) + } + } + pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> { let hash = partition_key.hash(); let who = self @@ -178,7 +232,8 @@ impl<F: TableFormat + 'static> Table<F> { let mut not_all_same = false; for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { - if let Some(v) = value { + if let Some(v_bytes) = value { + let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -196,19 +251,22 @@ impl<F: TableFormat + 'static> Table<F> { } if let Some(ret_entry) = &ret { if not_all_same { - // Repair on read - let _: Result<_, _> = self - .rpc_try_call_many( - &who[..], - &TableRPC::<F>::Update(vec![ret_entry.clone()]), - who.len(), - ) - .await; + let _: Result<_, _> = self.repair_on_read(&who[..], &ret_entry).await; } } Ok(ret) } + async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> { + let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?)); + self.rpc_try_call_many(&who[..], + &TableRPC::<F>::Update(vec![what_enc]), + who.len(), + ) + .await + .map(|_|()) + } + async fn rpc_try_call_many( &self, who: &[UUID], @@ -263,18 +321,19 @@ impl<F: TableFormat + 'static> Table<F> { } } - fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<F::E>, Error> { + fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> { let tree_key = self.tree_key(p, s); if let Some(bytes) = self.store.get(&tree_key)? { - let e = rmp_serde::decode::from_read_ref::<_, F::E>(&bytes)?; - Ok(Some(e)) + Ok(Some(ByteBuf::from(bytes.to_vec()))) } else { Ok(None) } } - async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> { - for update in entries.drain(..) { + async fn handle_update(&self, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> { + for update_bytes in entries.drain(..) { + let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let (old_entry, new_entry) = self.store.transaction(|db| { @@ -297,7 +356,7 @@ impl<F: TableFormat + 'static> Table<F> { Ok((old_entry, new_entry)) })?; - self.instance.updated(old_entry.as_ref(), &new_entry).await; + self.instance.updated(old_entry, new_entry).await; } Ok(()) } diff --git a/src/version_table.rs b/src/version_table.rs index d037d344..d9d2b675 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -63,8 +63,23 @@ impl TableFormat for VersionTable { type S = EmptySortKey; type E = Version; - async fn updated(&self, old: Option<&Self::E>, new: &Self::E) { - //unimplemented!() - // TODO + async fn updated(&self, old: Option<Self::E>, new: Self::E) { + let garage = self.garage.read().await.as_ref().cloned().unwrap(); + garage.clone().background.spawn(async move { + // Propagate deletion of version blocks + if let Some(old_v) = old { + if new.deleted && !old_v.deleted { + let deleted_block_refs = old_v.blocks.iter() + .map(|vb| BlockRef{ + block: vb.hash.clone(), + version: old_v.uuid.clone(), + deleted: true, + }) + .collect::<Vec<_>>(); + garage.block_ref_table.insert_many(&deleted_block_refs[..]).await?; + } + } + Ok(()) + }); } } |