Diffstat (limited to 'src/model')
-rw-r--r--   src/model/Cargo.toml                                                            5
-rw-r--r--   src/model/garage.rs                                                            97
-rw-r--r--   src/model/helper/bucket.rs                                                      3
-rw-r--r--   src/model/index_counter.rs                                                    305
-rw-r--r--   src/model/k2v/causality.rs                                                     96
-rw-r--r--   src/model/k2v/counter_table.rs                                                 20
-rw-r--r--   src/model/k2v/item_table.rs                                                   291
-rw-r--r--   src/model/k2v/mod.rs                                                            7
-rw-r--r--   src/model/k2v/poll.rs                                                          50
-rw-r--r--   src/model/k2v/rpc.rs                                                          343
-rw-r--r--   src/model/lib.rs                                                                9
-rw-r--r--   src/model/s3/block_ref_table.rs (renamed from src/model/block_ref_table.rs)     8
-rw-r--r--   src/model/s3/mod.rs                                                             3
-rw-r--r--   src/model/s3/object_table.rs (renamed from src/model/object_table.rs)           7
-rw-r--r--   src/model/s3/version_table.rs (renamed from src/model/version_table.rs)         7
15 files changed, 1219 insertions, 32 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 007cec89..133fe44e 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -22,8 +22,10 @@ garage_model_050 = { package = "garage_model", version = "0.5.1" }
async-trait = "0.1.7"
arc-swap = "1.0"
+blake2 = "0.9"
err-derive = "0.3"
hex = "0.4"
+base64 = "0.13"
tracing = "0.1.30"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
@@ -42,3 +44,6 @@ opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
#netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4"
+
+[features]
+k2v = [ "garage_util/k2v" ]
diff --git a/src/model/garage.rs b/src/model/garage.rs
index abdb920a..03e21f8a 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -13,13 +13,19 @@ use garage_table::replication::TableFullReplication;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-use crate::block_ref_table::*;
+use crate::s3::block_ref_table::*;
+use crate::s3::object_table::*;
+use crate::s3::version_table::*;
+
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::helper;
use crate::key_table::*;
-use crate::object_table::*;
-use crate::version_table::*;
+
+#[cfg(feature = "k2v")]
+use crate::index_counter::*;
+#[cfg(feature = "k2v")]
+use crate::k2v::{counter_table::*, item_table::*, poll::*, rpc::*};
/// An entire Garage full of data
pub struct Garage {
@@ -35,16 +41,32 @@ pub struct Garage {
/// The block manager
pub block_manager: Arc<BlockManager>,
- /// Table containing informations about buckets
+ /// Table containing buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
- /// Table containing informations about bucket aliases
+ /// Table containing bucket aliases
pub bucket_alias_table: Arc<Table<BucketAliasTable, TableFullReplication>>,
- /// Table containing informations about api keys
+ /// Table containing api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
+ /// Table containing S3 objects
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
+ /// Table containing S3 object versions
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ /// Table containing S3 block references (not blocks themselves)
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+
+ #[cfg(feature = "k2v")]
+ pub k2v: GarageK2V,
+}
+
+#[cfg(feature = "k2v")]
+pub struct GarageK2V {
+ /// Table containing K2V items
+ pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ /// Indexing table containing K2V item counters
+ pub counter_table: Arc<IndexCounter<K2VCounterTable>>,
+ /// K2V RPC handler
+ pub rpc: Arc<K2VRpcHandler>,
}
impl Garage {
@@ -95,6 +117,21 @@ impl Garage {
system.clone(),
);
+ // ---- admin tables ----
+ info!("Initialize bucket_table...");
+ let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
+
+ info!("Initialize bucket_alias_table...");
+ let bucket_alias_table = Table::new(
+ BucketAliasTable,
+ control_rep_param.clone(),
+ system.clone(),
+ &db,
+ );
+ info!("Initialize key_table...");
+ let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
+
+ // ---- S3 tables ----
info!("Initialize block_ref_table...");
let block_ref_table = Table::new(
BlockRefTable {
@@ -117,29 +154,20 @@ impl Garage {
);
info!("Initialize object_table...");
+ #[allow(clippy::redundant_clone)]
let object_table = Table::new(
ObjectTable {
background: background.clone(),
version_table: version_table.clone(),
},
- meta_rep_param,
- system.clone(),
- &db,
- );
-
- info!("Initialize bucket_table...");
- let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
-
- info!("Initialize bucket_alias_table...");
- let bucket_alias_table = Table::new(
- BucketAliasTable,
- control_rep_param.clone(),
+ meta_rep_param.clone(),
system.clone(),
&db,
);
- info!("Initialize key_table_table...");
- let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
+ // ---- K2V ----
+ #[cfg(feature = "k2v")]
+ let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
info!("Initialize Garage...");
@@ -155,6 +183,8 @@ impl Garage {
object_table,
version_table,
block_ref_table,
+ #[cfg(feature = "k2v")]
+ k2v,
})
}
@@ -162,3 +192,30 @@ impl Garage {
helper::bucket::BucketHelper(self)
}
}
+
+#[cfg(feature = "k2v")]
+impl GarageK2V {
+ fn new(system: Arc<System>, db: &sled::Db, meta_rep_param: TableShardedReplication) -> Self {
+ info!("Initialize K2V counter table...");
+ let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
+ info!("Initialize K2V subscription manager...");
+ let subscriptions = Arc::new(SubscriptionManager::new());
+ info!("Initialize K2V item table...");
+ let item_table = Table::new(
+ K2VItemTable {
+ counter_table: counter_table.clone(),
+ subscriptions: subscriptions.clone(),
+ },
+ meta_rep_param,
+ system.clone(),
+ db,
+ );
+ let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
+
+ Self {
+ item_table,
+ counter_table,
+ rpc,
+ }
+ }
+}
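All K2V state now hangs off the feature-gated `k2v` field of `Garage`, initialized by `GarageK2V::new()` above. A minimal caller-side sketch of that gating, under the same `k2v` Cargo feature; the `k2v_handles` helper is hypothetical, while `Garage`, `GarageK2V` and the field names come from the diff:

    // Hypothetical downstream helper, compiled only when garage_model is
    // built with the "k2v" feature declared in Cargo.toml above.
    #[cfg(feature = "k2v")]
    fn k2v_handles(garage: &Garage) -> (&Arc<K2VRpcHandler>, &Arc<IndexCounter<K2VCounterTable>>) {
        // The RPC handler and the index counter both live under garage.k2v.
        (&garage.k2v.rpc, &garage.k2v.counter_table)
    }
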
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 706faf26..54d2f97b 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -1,4 +1,4 @@
-use garage_table::util::EmptyKey;
+use garage_table::util::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
@@ -116,6 +116,7 @@ impl<'a> BucketHelper<'a> {
None,
Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())),
10,
+ EnumerationOrder::Forward,
)
.await?
.into_iter()
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
new file mode 100644
index 00000000..123154d4
--- /dev/null
+++ b/src/model/index_counter.rs
@@ -0,0 +1,305 @@
+use std::collections::{hash_map, BTreeMap, HashMap};
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, watch};
+
+use garage_rpc::ring::Ring;
+use garage_rpc::system::System;
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_table::crdt::*;
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
+ const NAME: &'static str;
+ type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+}
+
+/// A counter entry in the global table
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct CounterEntry<T: CounterSchema> {
+ pub pk: T::P,
+ pub sk: T::S,
+ pub values: BTreeMap<String, CounterValue>,
+}
+
+impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
+ fn partition_key(&self) -> &T::P {
+ &self.pk
+ }
+ fn sort_key(&self) -> &T::S {
+ &self.sk
+ }
+ fn is_tombstone(&self) -> bool {
+ self.values
+ .iter()
+ .all(|(_, v)| v.node_values.iter().all(|(_, (_, v))| *v == 0))
+ }
+}
+
+impl<T: CounterSchema> CounterEntry<T> {
+ pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
+ let nodes = &ring.layout.node_id_vec[..];
+ self.filtered_values_with_nodes(nodes)
+ }
+
+ pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {
+ let mut ret = HashMap::new();
+ for (name, vals) in self.values.iter() {
+ let new_vals = vals
+ .node_values
+ .iter()
+ .filter(|(n, _)| nodes.contains(n))
+ .map(|(_, (_, v))| *v)
+ .collect::<Vec<_>>();
+ if !new_vals.is_empty() {
+ ret.insert(
+ name.clone(),
+ new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)),
+ );
+ }
+ }
+
+ ret
+ }
+}
+
+/// A counter entry in the global table
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct CounterValue {
+ pub node_values: BTreeMap<Uuid, (u64, i64)>,
+}
+
+impl<T: CounterSchema> Crdt for CounterEntry<T> {
+ fn merge(&mut self, other: &Self) {
+ for (name, e2) in other.values.iter() {
+ if let Some(e) = self.values.get_mut(name) {
+ e.merge(e2);
+ } else {
+ self.values.insert(name.clone(), e2.clone());
+ }
+ }
+ }
+}
+
+impl Crdt for CounterValue {
+ fn merge(&mut self, other: &Self) {
+ for (node, (t2, e2)) in other.node_values.iter() {
+ if let Some((t, e)) = self.node_values.get_mut(node) {
+ if t2 > t {
+ *e = *e2;
+ }
+ } else {
+ self.node_values.insert(*node, (*t2, *e2));
+ }
+ }
+ }
+}
+
+pub struct CounterTable<T: CounterSchema> {
+ _phantom_t: PhantomData<T>,
+}
+
+impl<T: CounterSchema> TableSchema for CounterTable<T> {
+ const TABLE_NAME: &'static str = T::NAME;
+
+ type P = T::P;
+ type S = T::S;
+ type E = CounterEntry<T>;
+ type Filter = (DeletedFilter, Vec<Uuid>);
+
+ fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {
+ // nothing for now
+ }
+
+ fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+ if filter.0 == DeletedFilter::Any {
+ return true;
+ }
+
+ let is_tombstone = entry
+ .filtered_values_with_nodes(&filter.1[..])
+ .iter()
+ .all(|(_, v)| *v == 0);
+ filter.0.apply(is_tombstone)
+ }
+}
+
+// ----
+
+pub struct IndexCounter<T: CounterSchema> {
+ this_node: Uuid,
+ local_counter: sled::Tree,
+ propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
+ pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
+}
+
+impl<T: CounterSchema> IndexCounter<T> {
+ pub fn new(
+ system: Arc<System>,
+ replication: TableShardedReplication,
+ db: &sled::Db,
+ ) -> Arc<Self> {
+ let background = system.background.clone();
+
+ let (propagate_tx, propagate_rx) = mpsc::unbounded_channel();
+
+ let this = Arc::new(Self {
+ this_node: system.id,
+ local_counter: db
+ .open_tree(format!("local_counter:{}", T::NAME))
+ .expect("Unable to open local counter tree"),
+ propagate_tx,
+ table: Table::new(
+ CounterTable {
+ _phantom_t: Default::default(),
+ },
+ replication,
+ system,
+ db,
+ ),
+ });
+
+ let this2 = this.clone();
+ background.spawn_worker(
+ format!("{} index counter propagator", T::NAME),
+ move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
+ );
+ this
+ }
+
+ pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
+ let tree_key = self.table.data.tree_key(pk, sk);
+
+ let new_entry = self.local_counter.transaction(|tx| {
+ let mut entry = match tx.get(&tree_key[..])? {
+ Some(old_bytes) => {
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
+ .map_err(Error::RmpDecode)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?
+ }
+ None => LocalCounterEntry {
+ values: BTreeMap::new(),
+ },
+ };
+
+ for (s, inc) in counts.iter() {
+ let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
+ ent.0 += 1;
+ ent.1 += *inc;
+ }
+
+ let new_entry_bytes = rmp_to_vec_all_named(&entry)
+ .map_err(Error::RmpEncode)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ tx.insert(&tree_key[..], new_entry_bytes)?;
+
+ Ok(entry)
+ })?;
+
+ if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) {
+ error!(
+ "Could not propagate updated counter values, failed to send to channel: {}",
+ e
+ );
+ }
+
+ Ok(())
+ }
+
+ async fn propagate_loop(
+ self: Arc<Self>,
+ mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
+ must_exit: watch::Receiver<bool>,
+ ) {
+ // This loop batches updates to counters to be sent all at once.
+ // They are sent once the propagate_rx channel has been emptied (or is closed).
+ let mut buf = HashMap::new();
+ let mut errors = 0;
+
+ loop {
+ let (ent, closed) = match propagate_rx.try_recv() {
+ Ok(ent) => (Some(ent), false),
+ Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => {
+ match propagate_rx.recv().await {
+ Some(ent) => (Some(ent), false),
+ None => (None, true),
+ }
+ }
+ Err(mpsc::error::TryRecvError::Empty) => (None, false),
+ Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
+ };
+
+ if let Some((pk, sk, counters)) = ent {
+ let tree_key = self.table.data.tree_key(&pk, &sk);
+ let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
+ match buf.entry(tree_key) {
+ hash_map::Entry::Vacant(e) => {
+ e.insert(dist_entry);
+ }
+ hash_map::Entry::Occupied(mut e) => {
+ e.get_mut().merge(&dist_entry);
+ }
+ }
+ // As long as we can add entries, loop back and add them to batch
+ // before sending batch to other nodes
+ continue;
+ }
+
+ if !buf.is_empty() {
+ let entries = buf.iter().map(|(_k, v)| v);
+ if let Err(e) = self.table.insert_many(entries).await {
+ errors += 1;
+ if errors >= 2 && *must_exit.borrow() {
+ error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
+ break;
+ }
+ warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
+ tokio::time::sleep(Duration::from_secs(5)).await;
+ continue;
+ }
+
+ buf.clear();
+ errors = 0;
+ }
+
+ if closed || *must_exit.borrow() {
+ break;
+ }
+ }
+ }
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+struct LocalCounterEntry {
+ values: BTreeMap<String, (u64, i64)>,
+}
+
+impl LocalCounterEntry {
+ fn into_counter_entry<T: CounterSchema>(
+ self,
+ this_node: Uuid,
+ pk: T::P,
+ sk: T::S,
+ ) -> CounterEntry<T> {
+ CounterEntry {
+ pk,
+ sk,
+ values: self
+ .values
+ .into_iter()
+ .map(|(name, (ts, v))| {
+ let mut node_values = BTreeMap::new();
+ node_values.insert(this_node, (ts, v));
+ (name, CounterValue { node_values })
+ })
+ .collect(),
+ }
+ }
+}
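`IndexCounter<T>` keeps a node-local count in a dedicated sled tree and asynchronously propagates merged `CounterEntry` values to the sharded counter table. A sketch of the two pieces a user of this module provides, a `CounterSchema` and calls to `count()`; `ExampleCounterTable` and `bump_entry` are hypothetical illustrations (the real schema used by K2V is `K2VCounterTable`, added below in src/model/k2v/counter_table.rs):

    use garage_util::data::Uuid;
    use garage_util::error::Error;

    // Hypothetical schema: counters partitioned by bucket id, sorted by a string key.
    #[derive(PartialEq, Clone)]
    pub struct ExampleCounterTable;

    impl CounterSchema for ExampleCounterTable {
        const NAME: &'static str = "example_counter";
        type P = Uuid;   // partition key of the counter entries
        type S = String; // sort key of the counter entries
    }

    // Recording deltas: count() updates the local sled tree transactionally and
    // queues the new totals for propagate_loop() to push to the other nodes.
    fn bump_entry(
        counter: &IndexCounter<ExampleCounterTable>,
        bucket_id: Uuid,
        partition_key: &String,
    ) -> Result<(), Error> {
        counter.count(&bucket_id, partition_key, &[("entries", 1), ("bytes", 128)])
    }
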
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
new file mode 100644
index 00000000..8c76a32b
--- /dev/null
+++ b/src/model/k2v/causality.rs
@@ -0,0 +1,96 @@
+use std::collections::BTreeMap;
+use std::convert::TryInto;
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+
+/// Node IDs used in K2V are u64 integers that are the abbreviation
+/// of full Garage node IDs which are 256-bit UUIDs.
+pub type K2VNodeId = u64;
+
+pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
+ let mut tmp = [0u8; 8];
+ tmp.copy_from_slice(&node_id.as_slice()[..8]);
+ u64::from_be_bytes(tmp)
+}
+
+#[derive(PartialEq, Debug, Serialize, Deserialize)]
+pub struct CausalContext {
+ pub vector_clock: BTreeMap<K2VNodeId, u64>,
+}
+
+impl CausalContext {
+ /// Empty causality context
+ pub fn new_empty() -> Self {
+ Self {
+ vector_clock: BTreeMap::new(),
+ }
+ }
+ /// Make binary representation and encode in base64
+ pub fn serialize(&self) -> String {
+ let mut ints = Vec::with_capacity(2 * self.vector_clock.len());
+ for (node, time) in self.vector_clock.iter() {
+ ints.push(*node);
+ ints.push(*time);
+ }
+ let checksum = ints.iter().fold(0, |acc, v| acc ^ *v);
+
+ let mut bytes = u64::to_be_bytes(checksum).to_vec();
+ for i in ints {
+ bytes.extend(u64::to_be_bytes(i));
+ }
+
+ base64::encode_config(bytes, base64::URL_SAFE_NO_PAD)
+ }
+ /// Parse from base64-encoded binary representation
+ pub fn parse(s: &str) -> Result<Self, String> {
+ let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD)
+ .map_err(|e| format!("bad causality token base64: {}", e))?;
+ if bytes.len() % 16 != 8 || bytes.len() < 8 {
+ return Err("bad causality token length".into());
+ }
+
+ let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
+ let mut ret = CausalContext {
+ vector_clock: BTreeMap::new(),
+ };
+
+ for i in 0..(bytes.len() / 16) {
+ let node_id = u64::from_be_bytes(bytes[8 + i * 16..16 + i * 16].try_into().unwrap());
+ let time = u64::from_be_bytes(bytes[16 + i * 16..24 + i * 16].try_into().unwrap());
+ ret.vector_clock.insert(node_id, time);
+ }
+
+ let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
+
+ if check != checksum {
+ return Err("bad causality token checksum".into());
+ }
+
+ Ok(ret)
+ }
+ /// Check if this causal context contains newer items than another one
+ pub fn is_newer_than(&self, other: &Self) -> bool {
+ self.vector_clock
+ .iter()
+ .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_causality_token_serialization() {
+ let ct = CausalContext {
+ vector_clock: [(4, 42), (1928131023, 76), (0xefc0c1c47f9de433, 2)]
+ .iter()
+ .cloned()
+ .collect(),
+ };
+
+ assert_eq!(CausalContext::parse(&ct.serialize()).unwrap(), ct);
+ }
+}
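The token format is thus an 8-byte XOR checksum followed by one big-endian `(node, time)` pair of u64s per clock entry, base64url-encoded without padding. A small round-trip sketch in the style of the test module above; all names come from this file:

    #[test]
    fn causality_token_round_trip_sketch() {
        // Build a context, encode it to a token, decode it back.
        let mut cc = CausalContext::new_empty();
        cc.vector_clock.insert(0x1234567890abcdef, 3);

        let token = cc.serialize(); // checksum + (node, time) pairs, base64url
        let parsed = CausalContext::parse(&token).expect("valid causality token");
        assert_eq!(parsed, cc);

        // is_newer_than() is a strict vector-clock comparison on at least one node.
        let mut newer = CausalContext::new_empty();
        newer.vector_clock.insert(0x1234567890abcdef, 4);
        assert!(newer.is_newer_than(&cc));
        assert!(!cc.is_newer_than(&newer));
    }
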
diff --git a/src/model/k2v/counter_table.rs b/src/model/k2v/counter_table.rs
new file mode 100644
index 00000000..4856eb2b
--- /dev/null
+++ b/src/model/k2v/counter_table.rs
@@ -0,0 +1,20 @@
+use garage_util::data::*;
+
+use crate::index_counter::*;
+
+pub const ENTRIES: &str = "entries";
+pub const CONFLICTS: &str = "conflicts";
+pub const VALUES: &str = "values";
+pub const BYTES: &str = "bytes";
+
+#[derive(PartialEq, Clone)]
+pub struct K2VCounterTable;
+
+impl CounterSchema for K2VCounterTable {
+ const NAME: &'static str = "k2v_index_counter";
+
+ // Partition key = bucket id
+ type P = Uuid;
+ // Sort key = K2V item's partition key
+ type S = String;
+}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
new file mode 100644
index 00000000..8b7cc08a
--- /dev/null
+++ b/src/model/k2v/item_table.rs
@@ -0,0 +1,291 @@
+use serde::{Deserialize, Serialize};
+use std::collections::BTreeMap;
+use std::sync::Arc;
+
+use garage_util::data::*;
+
+use garage_table::crdt::*;
+use garage_table::*;
+
+use crate::index_counter::*;
+use crate::k2v::causality::*;
+use crate::k2v::counter_table::*;
+use crate::k2v::poll::*;
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct K2VItem {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
+
+ items: BTreeMap<K2VNodeId, DvvsEntry>,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)]
+pub struct K2VItemPartition {
+ pub bucket_id: Uuid,
+ pub partition_key: String,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+struct DvvsEntry {
+ t_discard: u64,
+ values: Vec<(u64, DvvsValue)>,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub enum DvvsValue {
+ Value(#[serde(with = "serde_bytes")] Vec<u8>),
+ Deleted,
+}
+
+impl K2VItem {
+ /// Creates a new K2VItem when no previous entry existed in the db
+ pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {
+ Self {
+ partition: K2VItemPartition {
+ bucket_id,
+ partition_key,
+ },
+ sort_key,
+ items: BTreeMap::new(),
+ }
+ }
+ /// Updates a K2VItem with a new value or a deletion event
+ pub fn update(
+ &mut self,
+ this_node: Uuid,
+ context: &Option<CausalContext>,
+ new_value: DvvsValue,
+ ) {
+ if let Some(context) = context {
+ for (node, t_discard) in context.vector_clock.iter() {
+ if let Some(e) = self.items.get_mut(node) {
+ e.t_discard = std::cmp::max(e.t_discard, *t_discard);
+ } else {
+ self.items.insert(
+ *node,
+ DvvsEntry {
+ t_discard: *t_discard,
+ values: vec![],
+ },
+ );
+ }
+ }
+ }
+
+ self.discard();
+
+ let node_id = make_node_id(this_node);
+ let e = self.items.entry(node_id).or_insert(DvvsEntry {
+ t_discard: 0,
+ values: vec![],
+ });
+ let t_prev = e.max_time();
+ e.values.push((t_prev + 1, new_value));
+ }
+
+ /// Extract the causality context of a K2V Item
+ pub fn causal_context(&self) -> CausalContext {
+ let mut cc = CausalContext::new_empty();
+ for (node, ent) in self.items.iter() {
+ cc.vector_clock.insert(*node, ent.max_time());
+ }
+ cc
+ }
+
+ /// Extract the list of values
+ pub fn values(&'_ self) -> Vec<&'_ DvvsValue> {
+ let mut ret = vec![];
+ for (_, ent) in self.items.iter() {
+ for (_, v) in ent.values.iter() {
+ if !ret.contains(&v) {
+ ret.push(v);
+ }
+ }
+ }
+ ret
+ }
+
+ fn discard(&mut self) {
+ for (_, ent) in self.items.iter_mut() {
+ ent.discard();
+ }
+ }
+
+ // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used)
+ fn stats(&self) -> (i64, i64, i64, i64) {
+ let values = self.values();
+
+ let n_entries = if self.is_tombstone() { 0 } else { 1 };
+ let n_conflicts = if values.len() > 1 { 1 } else { 0 };
+ let n_values = values
+ .iter()
+ .filter(|v| matches!(v, DvvsValue::Value(_)))
+ .count() as i64;
+ let n_bytes = values
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Deleted => 0,
+ DvvsValue::Value(v) => v.len() as i64,
+ })
+ .sum();
+
+ (n_entries, n_conflicts, n_values, n_bytes)
+ }
+}
+
+impl DvvsEntry {
+ fn max_time(&self) -> u64 {
+ self.values
+ .iter()
+ .fold(self.t_discard, |acc, (vts, _)| std::cmp::max(acc, *vts))
+ }
+
+ fn discard(&mut self) {
+ self.values = std::mem::take(&mut self.values)
+ .into_iter()
+ .filter(|(t, _)| *t > self.t_discard)
+ .collect::<Vec<_>>();
+ }
+}
+
+impl Crdt for K2VItem {
+ fn merge(&mut self, other: &Self) {
+ for (node, e2) in other.items.iter() {
+ if let Some(e) = self.items.get_mut(node) {
+ e.merge(e2);
+ } else {
+ self.items.insert(*node, e2.clone());
+ }
+ }
+ }
+}
+
+impl Crdt for DvvsEntry {
+ fn merge(&mut self, other: &Self) {
+ self.t_discard = std::cmp::max(self.t_discard, other.t_discard);
+ self.discard();
+
+ let t_max = self.max_time();
+ for (vt, vv) in other.values.iter() {
+ if *vt > t_max {
+ self.values.push((*vt, vv.clone()));
+ }
+ }
+ }
+}
+
+impl PartitionKey for K2VItemPartition {
+ fn hash(&self) -> Hash {
+ use blake2::{Blake2b, Digest};
+
+ let mut hasher = Blake2b::new();
+ hasher.update(self.bucket_id.as_slice());
+ hasher.update(self.partition_key.as_bytes());
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&hasher.finalize()[..32]);
+ hash.into()
+ }
+}
+
+impl Entry<K2VItemPartition, String> for K2VItem {
+ fn partition_key(&self) -> &K2VItemPartition {
+ &self.partition
+ }
+ fn sort_key(&self) -> &String {
+ &self.sort_key
+ }
+ fn is_tombstone(&self) -> bool {
+ self.values()
+ .iter()
+ .all(|v| matches!(v, DvvsValue::Deleted))
+ }
+}
+
+pub struct K2VItemTable {
+ pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>,
+ pub(crate) subscriptions: Arc<SubscriptionManager>,
+}
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct ItemFilter {
+ pub exclude_only_tombstones: bool,
+ pub conflicts_only: bool,
+}
+
+impl TableSchema for K2VItemTable {
+ const TABLE_NAME: &'static str = "k2v_item";
+
+ type P = K2VItemPartition;
+ type S = String;
+ type E = K2VItem;
+ type Filter = ItemFilter;
+
+ fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+ // 1. Count
+ let (old_entries, old_conflicts, old_values, old_bytes) = match old {
+ None => (0, 0, 0, 0),
+ Some(e) => e.stats(),
+ };
+ let (new_entries, new_conflicts, new_values, new_bytes) = match new {
+ None => (0, 0, 0, 0),
+ Some(e) => e.stats(),
+ };
+
+ let count_pk = old
+ .map(|e| e.partition.bucket_id)
+ .unwrap_or_else(|| new.unwrap().partition.bucket_id);
+ let count_sk = old
+ .map(|e| &e.partition.partition_key)
+ .unwrap_or_else(|| &new.unwrap().partition.partition_key);
+
+ if let Err(e) = self.counter_table.count(
+ &count_pk,
+ count_sk,
+ &[
+ (ENTRIES, new_entries - old_entries),
+ (CONFLICTS, new_conflicts - old_conflicts),
+ (VALUES, new_values - old_values),
+ (BYTES, new_bytes - old_bytes),
+ ],
+ ) {
+ error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);
+ }
+
+ // 2. Notify
+ if let Some(new_ent) = new {
+ self.subscriptions.notify(new_ent);
+ }
+ }
+
+ #[allow(clippy::nonminimal_bool)]
+ fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+ let v = entry.values();
+ !(filter.conflicts_only && v.len() < 2)
+ && !(filter.exclude_only_tombstones && entry.is_tombstone())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_dvvsentry_merge_simple() {
+ let e1 = DvvsEntry {
+ t_discard: 4,
+ values: vec![
+ (5, DvvsValue::Value(vec![15])),
+ (6, DvvsValue::Value(vec![16])),
+ ],
+ };
+ let e2 = DvvsEntry {
+ t_discard: 5,
+ values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)],
+ };
+
+ let mut e3 = e1.clone();
+ e3.merge(&e2);
+ assert_eq!(e2, e3);
+ }
+}
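The DVVS structure above means a write that carries no causal context adds a sibling value, while a write that echoes back the item's current causal context discards everything that context covers. A sketch in the same style as the test above; the bucket and node ids are placeholder values, everything else comes from this file:

    #[test]
    fn dvvs_update_sketch() {
        let bucket_id: Uuid = [0u8; 32].into(); // placeholder bucket id
        let node: Uuid = [0x11u8; 32].into();   // placeholder full node id

        let mut item = K2VItem::new(bucket_id, "partition".to_string(), "sort-key".to_string());

        // Two writes without a causal context: the second does not discard the
        // first, so the item holds two concurrent values (a conflict).
        item.update(node, &None, DvvsValue::Value(b"v1".to_vec()));
        item.update(node, &None, DvvsValue::Value(b"v2".to_vec()));
        assert_eq!(item.values().len(), 2);

        // A write carrying the current causal context discards both siblings.
        let ctx = item.causal_context();
        item.update(node, &Some(ctx), DvvsValue::Value(b"v3".to_vec()));
        assert_eq!(item.values().len(), 1);
    }
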
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
new file mode 100644
index 00000000..664172a6
--- /dev/null
+++ b/src/model/k2v/mod.rs
@@ -0,0 +1,7 @@
+pub mod causality;
+
+pub mod counter_table;
+pub mod item_table;
+
+pub mod poll;
+pub mod rpc;
diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs
new file mode 100644
index 00000000..93105207
--- /dev/null
+++ b/src/model/k2v/poll.rs
@@ -0,0 +1,50 @@
+use std::collections::HashMap;
+use std::sync::Mutex;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::broadcast;
+
+use crate::k2v::item_table::*;
+
+#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PollKey {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
+}
+
+#[derive(Default)]
+pub struct SubscriptionManager {
+ subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
+}
+
+impl SubscriptionManager {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
+ let mut subs = self.subscriptions.lock().unwrap();
+ if let Some(s) = subs.get(key) {
+ s.subscribe()
+ } else {
+ let (tx, rx) = broadcast::channel(8);
+ subs.insert(key.clone(), tx);
+ rx
+ }
+ }
+
+ pub fn notify(&self, item: &K2VItem) {
+ let key = PollKey {
+ partition: item.partition.clone(),
+ sort_key: item.sort_key.clone(),
+ };
+ let mut subs = self.subscriptions.lock().unwrap();
+ if let Some(s) = subs.get(&key) {
+ if s.send(item.clone()).is_err() {
+ // no more subscribers, remove channel from here
+ // (we will re-create it later if we need to subscribe again)
+ subs.remove(&key);
+ }
+ }
+ }
+}
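The subscription map is purely in-memory: `subscribe()` lazily creates a broadcast channel for the requested key, and `notify()` drops the channel again once the last receiver is gone. A hedged sketch of the poller side, assuming a tokio runtime and the imports of this file (`wait_for_update` is a hypothetical helper):

    // Hypothetical poller helper: blocks until notify() is called for the same
    // key (e.g. from K2VItemTable::updated), then returns the updated item.
    async fn wait_for_update(subs: &SubscriptionManager, key: &PollKey) -> Option<K2VItem> {
        let mut rx = subs.subscribe(key); // lazily creates the broadcast channel for this key
        rx.recv().await.ok()              // None if the channel is closed or the receiver lagged
    }
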
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
new file mode 100644
index 00000000..90101d0f
--- /dev/null
+++ b/src/model/k2v/rpc.rs
@@ -0,0 +1,343 @@
+//! Module that implements RPCs specific to K2V.
+//! This is necessary for insertions into the K2V store,
+//! as they have to be transmitted to one of the nodes responsible
+//! for storing the entry to be processed (the API entry
+//! node does not process the entry directly, as this would
+//! mean the vector clock gets much larger than needed).
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use serde::{Deserialize, Serialize};
+use tokio::select;
+
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_rpc::system::System;
+use garage_rpc::*;
+
+use garage_table::replication::{TableReplication, TableShardedReplication};
+use garage_table::table::TABLE_RPC_TIMEOUT;
+use garage_table::{PartitionKey, Table};
+
+use crate::k2v::causality::*;
+use crate::k2v::item_table::*;
+use crate::k2v::poll::*;
+
+/// RPC messages for K2V
+#[derive(Debug, Serialize, Deserialize)]
+enum K2VRpc {
+ Ok,
+ InsertItem(InsertedItem),
+ InsertManyItems(Vec<InsertedItem>),
+ PollItem {
+ key: PollKey,
+ causal_context: CausalContext,
+ timeout_msec: u64,
+ },
+ PollItemResponse(Option<K2VItem>),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct InsertedItem {
+ partition: K2VItemPartition,
+ sort_key: String,
+ causal_context: Option<CausalContext>,
+ value: DvvsValue,
+}
+
+impl Rpc for K2VRpc {
+ type Response = Result<K2VRpc, Error>;
+}
+
+/// RPC handler for K2V items: dispatches insertions and long-poll requests to the nodes responsible for storing each item
+pub struct K2VRpcHandler {
+ system: Arc<System>,
+ item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ endpoint: Arc<Endpoint<K2VRpc, Self>>,
+ subscriptions: Arc<SubscriptionManager>,
+}
+
+impl K2VRpcHandler {
+ pub fn new(
+ system: Arc<System>,
+ item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ subscriptions: Arc<SubscriptionManager>,
+ ) -> Arc<Self> {
+ let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
+
+ let rpc_handler = Arc::new(Self {
+ system,
+ item_table,
+ endpoint,
+ subscriptions,
+ });
+ rpc_handler.endpoint.set_handler(rpc_handler.clone());
+
+ rpc_handler
+ }
+
+ // ---- public interface ----
+
+ pub async fn insert(
+ &self,
+ bucket_id: Uuid,
+ partition_key: String,
+ sort_key: String,
+ causal_context: Option<CausalContext>,
+ value: DvvsValue,
+ ) -> Result<(), Error> {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key,
+ };
+ let mut who = self
+ .item_table
+ .data
+ .replication
+ .write_nodes(&partition.hash());
+ who.sort();
+
+ self.system
+ .rpc
+ .try_call_many(
+ &self.endpoint,
+ &who[..],
+ K2VRpc::InsertItem(InsertedItem {
+ partition,
+ sort_key,
+ causal_context,
+ value,
+ }),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(1)
+ .with_timeout(TABLE_RPC_TIMEOUT)
+ .interrupt_after_quorum(true),
+ )
+ .await?;
+
+ Ok(())
+ }
+
+ pub async fn insert_batch(
+ &self,
+ bucket_id: Uuid,
+ items: Vec<(String, String, Option<CausalContext>, DvvsValue)>,
+ ) -> Result<(), Error> {
+ let n_items = items.len();
+
+ let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
+
+ for (partition_key, sort_key, causal_context, value) in items {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key,
+ };
+ let mut who = self
+ .item_table
+ .data
+ .replication
+ .write_nodes(&partition.hash());
+ who.sort();
+
+ call_list.entry(who).or_default().push(InsertedItem {
+ partition,
+ sort_key,
+ causal_context,
+ value,
+ });
+ }
+
+ debug!(
+ "K2V insert_batch: {} requests to insert {} items",
+ call_list.len(),
+ n_items
+ );
+ let call_futures = call_list.into_iter().map(|(nodes, items)| async move {
+ let resp = self
+ .system
+ .rpc
+ .try_call_many(
+ &self.endpoint,
+ &nodes[..],
+ K2VRpc::InsertManyItems(items),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(1)
+ .with_timeout(TABLE_RPC_TIMEOUT)
+ .interrupt_after_quorum(true),
+ )
+ .await?;
+ Ok::<_, Error>((nodes, resp))
+ });
+
+ let mut resps = call_futures.collect::<FuturesUnordered<_>>();
+ while let Some(resp) = resps.next().await {
+ resp?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn poll(
+ &self,
+ bucket_id: Uuid,
+ partition_key: String,
+ sort_key: String,
+ causal_context: CausalContext,
+ timeout_msec: u64,
+ ) -> Result<Option<K2VItem>, Error> {
+ let poll_key = PollKey {
+ partition: K2VItemPartition {
+ bucket_id,
+ partition_key,
+ },
+ sort_key,
+ };
+ let nodes = self
+ .item_table
+ .data
+ .replication
+ .write_nodes(&poll_key.partition.hash());
+
+ let resps = self
+ .system
+ .rpc
+ .try_call_many(
+ &self.endpoint,
+ &nodes[..],
+ K2VRpc::PollItem {
+ key: poll_key,
+ causal_context,
+ timeout_msec,
+ },
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.item_table.data.replication.read_quorum())
+ .with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT),
+ )
+ .await?;
+
+ let mut resp: Option<K2VItem> = None;
+ for v in resps {
+ match v {
+ K2VRpc::PollItemResponse(Some(x)) => {
+ if let Some(y) = &mut resp {
+ y.merge(&x);
+ } else {
+ resp = Some(x);
+ }
+ }
+ K2VRpc::PollItemResponse(None) => {
+ return Ok(None);
+ }
+ v => return Err(Error::unexpected_rpc_message(v)),
+ }
+ }
+
+ Ok(resp)
+ }
+
+ // ---- internal handlers ----
+
+ async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
+ let new = self.local_insert(item)?;
+
+ // Propagate to rest of network
+ if let Some(updated) = new {
+ self.item_table.insert(&updated).await?;
+ }
+
+ Ok(K2VRpc::Ok)
+ }
+
+ async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
+ let mut updated_vec = vec![];
+
+ for item in items {
+ let new = self.local_insert(item)?;
+
+ if let Some(updated) = new {
+ updated_vec.push(updated);
+ }
+ }
+
+ // Propagate to rest of network
+ if !updated_vec.is_empty() {
+ self.item_table.insert_many(&updated_vec).await?;
+ }
+
+ Ok(K2VRpc::Ok)
+ }
+
+ fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
+ let tree_key = self
+ .item_table
+ .data
+ .tree_key(&item.partition, &item.sort_key);
+
+ self.item_table
+ .data
+ .update_entry_with(&tree_key[..], |ent| {
+ let mut ent = ent.unwrap_or_else(|| {
+ K2VItem::new(
+ item.partition.bucket_id,
+ item.partition.partition_key.clone(),
+ item.sort_key.clone(),
+ )
+ });
+ ent.update(self.system.id, &item.causal_context, item.value.clone());
+ ent
+ })
+ }
+
+ async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
+ let mut chan = self.subscriptions.subscribe(key);
+
+ let mut value = self
+ .item_table
+ .data
+ .read_entry(&key.partition, &key.sort_key)?
+ .map(|bytes| self.item_table.data.decode_entry(&bytes[..]))
+ .transpose()?
+ .unwrap_or_else(|| {
+ K2VItem::new(
+ key.partition.bucket_id,
+ key.partition.partition_key.clone(),
+ key.sort_key.clone(),
+ )
+ });
+
+ while !value.causal_context().is_newer_than(ct) {
+ value = chan.recv().await?;
+ }
+
+ Ok(value)
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<K2VRpc> for K2VRpcHandler {
+ async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
+ match message {
+ K2VRpc::InsertItem(item) => self.handle_insert(item).await,
+ K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await,
+ K2VRpc::PollItem {
+ key,
+ causal_context,
+ timeout_msec,
+ } => {
+ let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
+ select! {
+ ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
+ _ = delay => Ok(K2VRpc::PollItemResponse(None)),
+ }
+ }
+ m => Err(Error::unexpected_rpc_message(m)),
+ }
+ }
+}
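From the API layer's point of view, `insert()` forwards the write to one of the nodes that store the partition (quorum 1, interrupted after quorum), and `poll()` fans out to the write set and resolves once a stored item becomes causally newer than the supplied token, or returns `None` on timeout. A hedged usage sketch; the `api_example` function, the `Garage` handle and the literal values are assumptions, the method signatures come from this file:

    // Hypothetical API-side usage of the handler stored in garage.k2v.rpc.
    async fn api_example(garage: &Garage, bucket_id: Uuid) -> Result<(), Error> {
        // Blind write (no causal context): may create a sibling value.
        garage.k2v.rpc
            .insert(
                bucket_id,
                "partition".to_string(),
                "sort-key".to_string(),
                None, // or Some(CausalContext) taken from a previous read
                DvvsValue::Value(b"hello".to_vec()),
            )
            .await?;

        // Long-poll: Some(item) once something newer than `seen` is stored,
        // None if nothing changed within 10 seconds.
        let seen = CausalContext::new_empty();
        let _update = garage.k2v.rpc
            .poll(bucket_id, "partition".to_string(), "sort-key".to_string(), seen, 10_000)
            .await?;

        Ok(())
    }
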
diff --git a/src/model/lib.rs b/src/model/lib.rs
index 05a4cdc7..7c9d9270 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -3,12 +3,15 @@ extern crate tracing;
pub mod permission;
-pub mod block_ref_table;
+pub mod index_counter;
+
pub mod bucket_alias_table;
pub mod bucket_table;
pub mod key_table;
-pub mod object_table;
-pub mod version_table;
+
+#[cfg(feature = "k2v")]
+pub mod k2v;
+pub mod s3;
pub mod garage;
pub mod helper;
diff --git a/src/model/block_ref_table.rs b/src/model/s3/block_ref_table.rs
index b6945403..9b3991bf 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/s3/block_ref_table.rs
@@ -51,11 +51,11 @@ impl TableSchema for BlockRefTable {
type E = BlockRef;
type Filter = DeletedFilter;
- fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
#[allow(clippy::or_fun_call)]
- let block = &old.as_ref().or(new.as_ref()).unwrap().block;
- let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
- let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
+ let block = &old.or(new).unwrap().block;
+ let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false);
+ let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false);
if is_after && !was_before {
if let Err(e) = self.block_manager.block_incref(block) {
warn!("block_incref failed for block {:?}: {}", block, e);
diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs
new file mode 100644
index 00000000..4e94337d
--- /dev/null
+++ b/src/model/s3/mod.rs
@@ -0,0 +1,3 @@
+pub mod block_ref_table;
+pub mod object_table;
+pub mod version_table;
diff --git a/src/model/object_table.rs b/src/model/s3/object_table.rs
index da53878e..3d9a89f7 100644
--- a/src/model/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -9,7 +9,7 @@ use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-use crate::version_table::*;
+use crate::s3::version_table::*;
use garage_model_050::object_table as old;
@@ -232,8 +232,11 @@ impl TableSchema for ObjectTable {
type E = Object;
type Filter = ObjectFilter;
- fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
let version_table = self.version_table.clone();
+ let old = old.cloned();
+ let new = new.cloned();
+
self.background.spawn(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of old versions
diff --git a/src/model/version_table.rs b/src/model/s3/version_table.rs
index 839b1f4f..ad096772 100644
--- a/src/model/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -8,7 +8,7 @@ use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-use crate::block_ref_table::*;
+use crate::s3::block_ref_table::*;
use garage_model_050::version_table as old;
@@ -137,8 +137,11 @@ impl TableSchema for VersionTable {
type E = Version;
type Filter = DeletedFilter;
- fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
let block_ref_table = self.block_ref_table.clone();
+ let old = old.cloned();
+ let new = new.cloned();
+
self.background.spawn(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks