diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/error.rs | 6 | ||||
-rw-r--r-- | src/rpc_server.rs | 1 | ||||
-rw-r--r-- | src/server.rs | 9 | ||||
-rw-r--r-- | src/table.rs | 122 | ||||
-rw-r--r-- | src/version_table.rs | 7 |
5 files changed, 124 insertions, 21 deletions
diff --git a/src/error.rs b/src/error.rs index 30f7dac6..0cfafca3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -15,6 +15,9 @@ pub enum Error { #[error(display = "Invalid HTTP header value: {}", _0)] HTTPHeader(#[error(source)] http::header::ToStrError), + #[error(display = "Sled error: {}", _0)] + Sled(#[error(source)] sled::Error), + #[error(display = "Messagepack encode error: {}", _0)] RMPEncode(#[error(source)] rmp_serde::encode::Error), #[error(display = "Messagepack decode error: {}", _0)] @@ -32,6 +35,9 @@ pub enum Error { #[error(display = "{}", _0)] BadRequest(String), + #[error(display = "Entry not found")] + NotFound, + #[error(display = "{}", _0)] Message(String), } diff --git a/src/rpc_server.rs b/src/rpc_server.rs index eda300c4..0ac26141 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1,7 +1,6 @@ use std::net::SocketAddr; use std::sync::Arc; -use futures_util::future::FutureExt; use bytes::IntoBuf; use hyper::service::{make_service_fn, service_fn}; use hyper::server::conn::AddrStream; diff --git a/src/server.rs b/src/server.rs index 31f1cc28..1f1ac2af 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,6 +5,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use futures::channel::oneshot; use serde::Deserialize; +use tokio::sync::RwLock; use crate::data::*; use crate::proto::*; @@ -24,7 +25,7 @@ pub struct Garage { } impl Garage { - pub fn new(config: Config, id: UUID, db: sled::Db) -> Self { + pub async fn new(config: Config, id: UUID, db: sled::Db) -> Arc<Self> { let system = Arc::new(System::new(config, id)); let meta_rep_param = TableReplicationParams{ @@ -35,6 +36,7 @@ impl Garage { }; let version_table = Arc::new(Table::new( + VersionTable{garage: RwLock::new(None)}, system.clone(), &db, "version".to_string(), @@ -49,6 +51,9 @@ impl Garage { garage.table_rpc_handlers.insert( garage.version_table.name.clone(), garage.version_table.clone().rpc_handler()); + + let garage = Arc::new(garage); + *garage.version_table.instance.garage.write().await = Some(garage.clone()); garage } } @@ -139,7 +144,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { .expect("Unable to read or generate node ID"); println!("Node ID: {}", hex::encode(&id)); - let garage = Arc::new(Garage::new(config, id, db)); + let garage = Garage::new(config, id, db).await; let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); diff --git a/src/table.rs b/src/table.rs index 5c8e93a5..f45f48c2 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,8 +1,8 @@ -use std::marker::PhantomData; use std::time::Duration; use std::sync::Arc; use serde::{Serialize, Deserialize}; use async_trait::async_trait; +use reduce::Reduce; use crate::error::Error; use crate::proto::*; @@ -12,7 +12,7 @@ use crate::rpc_client::*; pub struct Table<F: TableFormat> { - phantom: PhantomData<F>, + pub instance: F, pub name: String, @@ -49,9 +49,11 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub enum TableRPC<F: TableFormat> { - Update(F::K, F::V), + Ok, + Read(Vec<F::K>), + Update(Vec<(F::K, F::V)>), } pub struct Partition { @@ -70,18 +72,18 @@ pub trait ValueMerge { #[async_trait] pub trait TableFormat: Send + Sync { - type K: Clone + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync; + type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync; type V: Clone + Serialize + for<'de> Deserialize<'de> + ValueMerge + Send + Sync; async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V); } impl<F: TableFormat + 'static> Table<F> { - pub fn new(system: Arc<System>, db: &sled::Db, name: String, param: TableReplicationParams) -> Self { + pub fn new(instance: F, system: Arc<System>, db: &sled::Db, name: String, param: TableReplicationParams) -> Self { let store = db.open_tree(&name) .expect("Unable to open DB tree"); Self{ - phantom: PhantomData::default(), + instance, name, system, store, @@ -95,22 +97,112 @@ impl<F: TableFormat + 'static> Table<F> { } pub async fn insert(&self, k: &F::K, v: &F::V) -> Result<(), Error> { - unimplemented!(); + let hash = k.hash(); + let who = self.system.members.read().await + .walk_ring(&hash, self.param.replication_factor); + + let rpc = &TableRPC::<F>::Update(vec![(k.clone(), v.clone())]); + + self.rpc_try_call_many(&who[..], + &rpc, + self.param.write_quorum).await?; + Ok(()) + } + pub async fn get(&self, k: &F::K) -> Result<F::V, Error> { let hash = k.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); - let msg = rmp_serde::encode::to_vec_named(&TableRPC::<F>::Update(k.clone(), v.clone()))?; - rpc_try_call_many(self.system.clone(), - &who[..], - &Message::TableRPC(self.name.to_string(), msg), - self.param.write_quorum, + let rpc = &TableRPC::<F>::Read(vec![k.clone()]); + let resps = self.rpc_try_call_many(&who[..], + &rpc, + self.param.read_quorum) + .await?; + + let mut values = vec![]; + for resp in resps { + if let TableRPC::Update(mut pairs) = resp { + if pairs.len() == 0 { + continue; + } else if pairs.len() == 1 && pairs[0].0 == *k { + values.push(pairs.drain(..).next().unwrap().1); + continue; + } + } + return Err(Error::Message(format!("Invalid return value to read"))); + } + values.drain(..) + .reduce(|mut x, y| { x.merge(&y); x }) + .map(Ok) + .unwrap_or(Err(Error::NotFound)) + } + + async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> { + let rpc_bytes = rmp_serde::encode::to_vec_named(rpc)?; + let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); + + let resps = rpc_try_call_many(self.system.clone(), + who, + &rpc_msg, + quorum, self.param.timeout).await?; - Ok(()) + + let mut resps_vals = vec![]; + for resp in resps { + if let Message::TableRPC(tbl, rep_by) = &resp { + if *tbl == self.name { + resps_vals.push(rmp_serde::decode::from_read_ref(&rep_by)?); + continue; + } + } + return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) + } + Ok(resps_vals) } async fn handle(&self, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { - unimplemented!() + match msg { + TableRPC::Read(keys) => { + Ok(TableRPC::Update(self.handle_read(&keys)?)) + } + TableRPC::Update(pairs) => { + self.handle_write(pairs).await?; + Ok(TableRPC::Ok) + } + _ => Err(Error::RPCError(format!("Unexpected table RPC"))) + } + } + + fn handle_read(&self, keys: &[F::K]) -> Result<Vec<(F::K, F::V)>, Error> { + let mut results = vec![]; + for key in keys.iter() { + if let Some(bytes) = self.store.get(&key.hash())? { + let pair = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(bytes.as_ref())?; + results.push(pair); + } + } + Ok(results) + } + + async fn handle_write(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> { + for mut pair in pairs.drain(..) { + let hash = pair.0.hash(); + + let old_val = match self.store.get(&hash)? { + Some(prev_bytes) => { + let (_, old_val) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&prev_bytes)?; + pair.1.merge(&old_val); + Some(old_val) + } + None => None + }; + + let new_bytes = rmp_serde::encode::to_vec_named(&pair)?; + self.store.insert(&hash, new_bytes)?; + + self.instance.updated(&pair.0, old_val.as_ref(), &pair.1).await; + } + Ok(()) } } diff --git a/src/version_table.rs b/src/version_table.rs index d857ac12..86086421 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -1,13 +1,14 @@ use std::sync::Arc; use serde::{Serialize, Deserialize}; use async_trait::async_trait; +use tokio::sync::RwLock; use crate::data::*; use crate::table::*; -use crate::membership::System; +use crate::server::Garage; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct VersionMetaKey { pub bucket: String, pub key: String, @@ -33,7 +34,7 @@ pub enum VersionData { } pub struct VersionTable { - system: Arc<System>, + pub garage: RwLock<Option<Arc<Garage>>>, } impl KeyHash for VersionMetaKey { |