path: root/src/table/table.rs
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--  src/table/table.rs  265
1 file changed, 51 insertions(+), 214 deletions(-)
diff --git a/src/table/table.rs b/src/table/table.rs
index 0e75754c..a4cb4b24 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,40 +2,35 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
-use log::warn;
-
-use arc_swap::ArcSwapOption;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
-use sled::Transactional;
use garage_util::data::*;
use garage_util::error::Error;
use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::crdt::CRDT;
-use crate::merkle::*;
+use crate::data::*;
use crate::schema::*;
use crate::table_sync::*;
+use crate::replication::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
-pub struct Table<F: TableSchema, R: TableReplication> {
- pub instance: F,
+pub struct TableAux<F: TableSchema, R: TableReplication> {
+ pub system: Arc<System>,
pub replication: R,
-
- pub name: String,
pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>,
+}
- pub system: Arc<System>,
- pub store: sled::Tree,
- pub syncer: ArcSwapOption<TableSyncer<F, R>>,
- merkle_updater: Arc<MerkleUpdater>,
+pub struct Table<F: TableSchema, R: TableReplication> {
+ pub data: Arc<TableData<F>>,
+ pub aux: Arc<TableAux<F, R>>,
+ pub syncer: Arc<TableSyncer<F, R>>,
}
#[derive(Serialize, Deserialize)]
@@ -55,23 +50,6 @@ pub(crate) enum TableRPC<F: TableSchema> {
impl<F: TableSchema> RpcMessage for TableRPC<F> {}
-pub trait TableReplication: Send + Sync {
- // See examples in table_sharded.rs and table_fullcopy.rs
- // To understand various replication methods
-
- // Which nodes to send reads from
- fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
- fn read_quorum(&self) -> usize;
-
- // Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
- fn write_quorum(&self, system: &System) -> usize;
- fn max_write_errors(&self) -> usize;
-
- // Which are the nodes that do actually replicate the data
- fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
- fn split_points(&self, ring: &Ring) -> Vec<Hash>;
-}
impl<F, R> Table<F, R>
where
@@ -88,60 +66,51 @@ where
name: String,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let store = db
- .open_tree(&format!("{}:table", name))
- .expect("Unable to open DB tree");
-
- let merkle_todo_store = db
- .open_tree(&format!("{}:merkle_todo", name))
- .expect("Unable to open DB Merkle TODO tree");
- let merkle_tree_store = db
- .open_tree(&format!("{}:merkle_tree", name))
- .expect("Unable to open DB Merkle tree tree");
-
let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
- let merkle_updater = MerkleUpdater::new(
- name.clone(),
+ let data = TableData::new(
+ name,
+ instance,
+ db,
system.background.clone(),
- merkle_todo_store,
- merkle_tree_store,
);
- let table = Arc::new(Self {
- instance,
+ let aux = Arc::new(TableAux{
+ system,
replication,
- name,
rpc_client,
- system,
- store,
- syncer: ArcSwapOption::from(None),
- merkle_updater,
});
- table.clone().register_handler(rpc_server, rpc_path);
- let syncer = TableSyncer::launch(table.clone());
- table.syncer.swap(Some(syncer));
+ let syncer = TableSyncer::launch(
+ data.clone(),
+ aux.clone(),
+ );
- table.merkle_updater.launch();
+ let table = Arc::new(Self {
+ data,
+ aux,
+ syncer,
+ });
+
+ table.clone().register_handler(rpc_server, rpc_path);
table
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.replication.write_nodes(&hash, &self.system);
+ let who = self.aux.replication.write_nodes(&hash, &self.aux.system);
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRPC::<F>::Update(vec![e_enc]);
- self.rpc_client
+ self.aux.rpc_client
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
+ RequestStrategy::with_quorum(self.aux.replication.write_quorum(&self.aux.system))
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@@ -153,7 +122,7 @@ where
for entry in entries.iter() {
let hash = entry.partition_key().hash();
- let who = self.replication.write_nodes(&hash, &self.system);
+ let who = self.aux.replication.write_nodes(&hash, &self.aux.system);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
if !call_list.contains_key(&node) {
@@ -166,7 +135,7 @@ where
let call_futures = call_list.drain().map(|(node, entries)| async move {
let rpc = TableRPC::<F>::Update(entries);
- let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
+ let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
Ok::<_, Error>((node, resp))
});
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
@@ -177,7 +146,7 @@ where
errors.push(e);
}
}
- if errors.len() > self.replication.max_write_errors() {
+ if errors.len() > self.aux.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
Ok(())
@@ -190,16 +159,17 @@ where
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.replication.read_nodes(&hash, &self.system);
+ let who = self.aux.replication.read_nodes(&hash, &self.aux.system);
//eprintln!("get who: {:?}", who);
let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
+ .aux
.rpc_client
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.read_quorum())
+ RequestStrategy::with_quorum(self.aux.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -210,7 +180,7 @@ where
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value {
- let v = self.decode_entry(v_bytes.as_slice())?;
+ let v = self.data.decode_entry(v_bytes.as_slice())?;
ret = match ret {
None => Some(v),
Some(mut x) => {
@@ -230,7 +200,7 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- self.system
+ self.aux.system
.background
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
@@ -246,16 +216,16 @@ where
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.replication.read_nodes(&hash, &self.system);
+ let who = self.aux.replication.read_nodes(&hash, &self.aux.system);
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
let resps = self
- .rpc_client
+ .aux.rpc_client
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.read_quorum())
+ RequestStrategy::with_quorum(self.aux.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -266,8 +236,8 @@ where
for resp in resps {
if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() {
- let entry = self.decode_entry(entry_bytes.as_slice())?;
- let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
+ let entry = self.data.decode_entry(entry_bytes.as_slice())?;
+ let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) {
None => {
ret.insert(entry_key, Some(entry));
@@ -287,7 +257,7 @@ where
}
if !to_repair.is_empty() {
let self2 = self.clone();
- self.system.background.spawn_cancellable(async move {
+ self.aux.system.background.spawn_cancellable(async move {
for (_, v) in to_repair.iter_mut() {
self2.repair_on_read(&who[..], v.take().unwrap()).await?;
}
@@ -306,7 +276,7 @@ where
async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
- self.rpc_client
+ self.aux.rpc_client
.try_call_many(
&who[..],
TableRPC::<F>::Update(vec![what_enc]),
@@ -326,8 +296,8 @@ where
});
let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
+ self.aux.rpc_client
+ .set_local_handler(self.aux.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle(&msg).await }
});
@@ -336,157 +306,24 @@ where
async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
match msg {
TableRPC::ReadEntry(key, sort_key) => {
- let value = self.handle_read_entry(key, sort_key)?;
+ let value = self.data.read_entry(key, sort_key)?;
Ok(TableRPC::ReadEntryResponse(value))
}
TableRPC::ReadRange(key, begin_sort_key, filter, limit) => {
- let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?;
+ let values = self.data.read_range(key, begin_sort_key, filter, *limit)?;
Ok(TableRPC::Update(values))
}
TableRPC::Update(pairs) => {
- self.handle_update(pairs)?;
+ self.data.update_many(pairs)?;
Ok(TableRPC::Ok)
}
TableRPC::SyncRPC(rpc) => {
- let syncer = self.syncer.load_full().unwrap();
- let response = syncer
- .handle_rpc(rpc, self.system.background.stop_signal.clone())
+ let response = self.syncer
+ .handle_rpc(rpc, self.aux.system.background.stop_signal.clone())
.await?;
Ok(TableRPC::SyncRPC(response))
}
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
}
}
-
- fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
- let tree_key = self.tree_key(p, s);
- if let Some(bytes) = self.store.get(&tree_key)? {
- Ok(Some(ByteBuf::from(bytes.to_vec())))
- } else {
- Ok(None)
- }
- }
-
- fn handle_read_range(
- &self,
- p: &F::P,
- s: &Option<F::S>,
- filter: &Option<F::Filter>,
- limit: usize,
- ) -> Result<Vec<Arc<ByteBuf>>, Error> {
- let partition_hash = p.hash();
- let first_key = match s {
- None => partition_hash.to_vec(),
- Some(sk) => self.tree_key(p, sk),
- };
- let mut ret = vec![];
- for item in self.store.range(first_key..) {
- let (key, value) = item?;
- if &key[..32] != partition_hash.as_slice() {
- break;
- }
- let keep = match filter {
- None => true,
- Some(f) => {
- let entry = self.decode_entry(value.as_ref())?;
- F::matches_filter(&entry, f)
- }
- };
- if keep {
- ret.push(Arc::new(ByteBuf::from(value.as_ref())));
- }
- if ret.len() >= limit {
- break;
- }
- }
- Ok(ret)
- }
-
- // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================
-
- pub fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
- for update_bytes in entries.iter() {
- self.update_entry(update_bytes.as_slice())?;
- }
- Ok(())
- }
-
- pub(crate) fn update_entry(self: &Arc<Self>, update_bytes: &[u8]) -> Result<(), Error> {
- let update = self.decode_entry(update_bytes)?;
- let tree_key = self.tree_key(update.partition_key(), update.sort_key());
-
- let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| {
- let (old_entry, new_entry) = match db.get(&tree_key)? {
- Some(prev_bytes) => {
- let old_entry = self
- .decode_entry(&prev_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let mut new_entry = old_entry.clone();
- new_entry.merge(&update);
- (Some(old_entry), new_entry)
- }
- None => (None, update.clone()),
- };
-
- if Some(&new_entry) != old_entry.as_ref() {
- let new_bytes = rmp_to_vec_all_named(&new_entry)
- .map_err(Error::RMPEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?;
- db.insert(tree_key.clone(), new_bytes)?;
- Ok(Some((old_entry, new_entry)))
- } else {
- Ok(None)
- }
- })?;
-
- if let Some((old_entry, new_entry)) = changed {
- self.instance.updated(old_entry, Some(new_entry));
- self.syncer.load_full().unwrap().invalidate(&tree_key[..]);
- }
-
- Ok(())
- }
-
- pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| {
- if let Some(cur_v) = txn.get(k)? {
- if cur_v == v {
- txn.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(true);
- }
- }
- Ok(false)
- })?;
-
- if removed {
- let old_entry = self.decode_entry(v)?;
- self.instance.updated(Some(old_entry), None);
- self.syncer.load_full().unwrap().invalidate(k);
- }
- Ok(removed)
- }
-
- fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
- let mut ret = p.hash().to_vec();
- ret.extend(s.sort_key());
- ret
- }
-
- fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
- match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
- Ok(x) => Ok(x),
- Err(e) => match F::try_migrate(bytes) {
- Some(x) => Ok(x),
- None => {
- warn!("Unable to decode entry of {}: {}", self.name, e);
- for line in hexdump::hexdump_iter(bytes) {
- debug!("{}", line);
- }
- Err(e.into())
- }
- },
- }
- }
}
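
Taken together, the change splits the old monolithic Table into three parts: TableData (storage, encoding and Merkle bookkeeping, imported from the new crate::data module), TableAux (the shared System handle, replication strategy and RPC client), and TableSyncer, which is now constructed before the Table itself so the ArcSwapOption/unwrap() indirection disappears. The TableReplication trait deleted above moves into the new crate::replication module. The read/update/delete logic removed at the bottom of this file is not gone either; the RPC handler now delegates to it through self.data. Below is a minimal sketch of the TableData surface this file relies on, with signatures inferred from the call sites in the diff; the real definitions live elsewhere in this commit, so the exact field and parameter types (sled::Db, BackgroundRunner) are assumptions, not part of this patch.

// Sketch only: TableData<F> as used by the new Table<F, R> code above.
// Signatures are reconstructed from call sites in this diff; parameter and
// field types are assumptions.
pub struct TableData<F: TableSchema> {
    pub name: String,
    pub instance: F,
    // sled trees and the MerkleUpdater are expected to live here now;
    // they are not visible in this diff.
}

impl<F: TableSchema> TableData<F> {
    // Called as TableData::new(name, instance, db, system.background.clone())
    pub fn new(name: String, instance: F, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
        unimplemented!()
    }

    // Backs the TableRPC::ReadEntry handler
    pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
        unimplemented!()
    }

    // Backs the TableRPC::ReadRange handler
    pub fn read_range(&self, p: &F::P, s: &Option<F::S>, filter: &Option<F::Filter>, limit: usize) -> Result<Vec<Arc<ByteBuf>>, Error> {
        unimplemented!()
    }

    // Backs the TableRPC::Update handler
    pub fn update_many(&self, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
        unimplemented!()
    }

    // Still called directly from Table::get and Table::get_range
    pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
        unimplemented!()
    }

    pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
        unimplemented!()
    }
}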