aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/data.rs28
-rw-r--r--src/table/merkle.rs4
-rw-r--r--src/table/queue.rs4
-rw-r--r--src/table/schema.rs6
4 files changed, 24 insertions, 18 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index e76836ca..bbfdf58b 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -34,7 +34,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_todo_notify: Notify,
pub(crate) insert_queue: db::Tree,
- pub(crate) insert_queue_notify: Notify,
+ pub(crate) insert_queue_notify: Arc<Notify>,
pub(crate) gc_todo: CountedTree,
@@ -80,7 +80,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
merkle_todo,
merkle_todo_notify: Notify::new(),
insert_queue,
- insert_queue_notify: Notify::new(),
+ insert_queue_notify: Arc::new(Notify::new()),
gc_todo,
metrics,
})
@@ -203,14 +203,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
) -> Result<Option<F::E>, Error> {
let tree_key = self.tree_key(partition_key, sort_key);
- let changed = self.store.db().transaction(|mut tx| {
+ let changed = self.store.db().transaction(|tx| {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
- let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
+ let new_entry = update_fn(tx, Some(old_entry.clone()))?;
(Some(old_entry), Some(old_bytes), new_entry)
}
- None => (None, None, update_fn(&mut tx, None)?),
+ None => (None, None, update_fn(tx, None)?),
};
// Changed can be true in two scenarios
@@ -233,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
tx.insert(&self.store, &tree_key, new_bytes)?;
self.instance
- .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
+ .updated(tx, old_entry.as_ref(), Some(&new_entry))?;
Ok(Some((new_entry, new_bytes_hash)))
} else {
@@ -270,14 +270,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let removed = self
.store
.db()
- .transaction(|mut tx| match tx.get(&self.store, k)? {
+ .transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if cur_v == v => {
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
- self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true)
}
_ => Ok(false),
@@ -298,14 +298,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let removed = self
.store
.db()
- .transaction(|mut tx| match tx.get(&self.store, k)? {
+ .transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
- self.instance.updated(&mut tx, Some(&old_entry), None)?;
+ self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true)
}
_ => Ok(false),
@@ -339,7 +339,9 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
tx.insert(&self.insert_queue, &tree_key, new_entry)?;
- self.insert_queue_notify.notify_one();
+
+ let notif = self.insert_queue_notify.clone();
+ tx.on_commit(move || notif.notify_one());
Ok(())
}
@@ -347,9 +349,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
// ---- Utility functions ----
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
- let mut ret = p.hash().to_vec();
- ret.extend(s.sort_key());
- ret
+ [p.hash().as_slice(), s.sort_key()].concat()
}
pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index e86d0251..4577f872 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -108,9 +108,9 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
self.data
.merkle_tree
.db()
- .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?;
+ .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
- let deleted = self.data.merkle_todo.db().transaction(|mut tx| {
+ let deleted = self.data.merkle_todo.db().transaction(|tx| {
let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
if remove {
tx.remove(&self.data.merkle_todo, k)?;
diff --git a/src/table/queue.rs b/src/table/queue.rs
index 0857209b..ffe0a4a7 100644
--- a/src/table/queue.rs
+++ b/src/table/queue.rs
@@ -12,7 +12,7 @@ use crate::replication::*;
use crate::schema::*;
use crate::table::*;
-const BATCH_SIZE: usize = 100;
+const BATCH_SIZE: usize = 1024;
pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
where
@@ -53,7 +53,7 @@ impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
self.0.insert_many(values).await?;
- self.0.data.insert_queue.db().transaction(|mut tx| {
+ self.0.data.insert_queue.db().transaction(|tx| {
for (k, v) in kv_pairs.iter() {
if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
if &v2 == v {
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 5cbf6c95..fc1a465e 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -6,6 +6,8 @@ use garage_util::migrate::Migrate;
use crate::crdt::Crdt;
+// =================================== PARTITION KEYS
+
/// Trait for field used to partition data
pub trait PartitionKey:
Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
@@ -29,6 +31,8 @@ impl PartitionKey for FixedBytes32 {
}
}
+// =================================== SORT KEYS
+
/// Trait for field used to sort data
pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
/// Get the key used to sort
@@ -47,6 +51,8 @@ impl SortKey for FixedBytes32 {
}
}
+// =================================== SCHEMA
+
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>:
Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static