diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/data.rs | 28 | ||||
-rw-r--r-- | src/table/merkle.rs | 4 | ||||
-rw-r--r-- | src/table/queue.rs | 4 | ||||
-rw-r--r-- | src/table/schema.rs | 6 |
4 files changed, 24 insertions, 18 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index e76836ca..bbfdf58b 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -34,7 +34,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub(crate) merkle_todo_notify: Notify, pub(crate) insert_queue: db::Tree, - pub(crate) insert_queue_notify: Notify, + pub(crate) insert_queue_notify: Arc<Notify>, pub(crate) gc_todo: CountedTree, @@ -80,7 +80,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { merkle_todo, merkle_todo_notify: Notify::new(), insert_queue, - insert_queue_notify: Notify::new(), + insert_queue_notify: Arc::new(Notify::new()), gc_todo, metrics, }) @@ -203,14 +203,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { ) -> Result<Option<F::E>, Error> { let tree_key = self.tree_key(partition_key, sort_key); - let changed = self.store.db().transaction(|mut tx| { + let changed = self.store.db().transaction(|tx| { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { Some(old_bytes) => { let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; - let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?; + let new_entry = update_fn(tx, Some(old_entry.clone()))?; (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, update_fn(&mut tx, None)?), + None => (None, None, update_fn(tx, None)?), }; // Changed can be true in two scenarios @@ -233,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { tx.insert(&self.store, &tree_key, new_bytes)?; self.instance - .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?; + .updated(tx, old_entry.as_ref(), Some(&new_entry))?; Ok(Some((new_entry, new_bytes_hash))) } else { @@ -270,14 +270,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { let removed = self .store .db() - .transaction(|mut tx| match tx.get(&self.store, k)? { + .transaction(|tx| match tx.get(&self.store, k)? { Some(cur_v) if cur_v == v => { let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; tx.remove(&self.store, k)?; tx.insert(&self.merkle_todo, k, vec![])?; - self.instance.updated(&mut tx, Some(&old_entry), None)?; + self.instance.updated(tx, Some(&old_entry), None)?; Ok(true) } _ => Ok(false), @@ -298,14 +298,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { let removed = self .store .db() - .transaction(|mut tx| match tx.get(&self.store, k)? { + .transaction(|tx| match tx.get(&self.store, k)? { Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; tx.remove(&self.store, k)?; tx.insert(&self.merkle_todo, k, vec![])?; - self.instance.updated(&mut tx, Some(&old_entry), None)?; + self.instance.updated(tx, Some(&old_entry), None)?; Ok(true) } _ => Ok(false), @@ -339,7 +339,9 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; tx.insert(&self.insert_queue, &tree_key, new_entry)?; - self.insert_queue_notify.notify_one(); + + let notif = self.insert_queue_notify.clone(); + tx.on_commit(move || notif.notify_one()); Ok(()) } @@ -347,9 +349,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { // ---- Utility functions ---- pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { - let mut ret = p.hash().to_vec(); - ret.extend(s.sort_key()); - ret + [p.hash().as_slice(), s.sort_key()].concat() } pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { diff --git a/src/table/merkle.rs b/src/table/merkle.rs index e86d0251..4577f872 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -108,9 +108,9 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> { self.data .merkle_tree .db() - .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?; + .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; - let deleted = self.data.merkle_todo.db().transaction(|mut tx| { + let deleted = self.data.merkle_todo.db().transaction(|tx| { let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by); if remove { tx.remove(&self.data.merkle_todo, k)?; diff --git a/src/table/queue.rs b/src/table/queue.rs index 0857209b..ffe0a4a7 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -12,7 +12,7 @@ use crate::replication::*; use crate::schema::*; use crate::table::*; -const BATCH_SIZE: usize = 100; +const BATCH_SIZE: usize = 1024; pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>) where @@ -53,7 +53,7 @@ impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> { self.0.insert_many(values).await?; - self.0.data.insert_queue.db().transaction(|mut tx| { + self.0.data.insert_queue.db().transaction(|tx| { for (k, v) in kv_pairs.iter() { if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? { if &v2 == v { diff --git a/src/table/schema.rs b/src/table/schema.rs index 5cbf6c95..fc1a465e 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -6,6 +6,8 @@ use garage_util::migrate::Migrate; use crate::crdt::Crdt; +// =================================== PARTITION KEYS + /// Trait for field used to partition data pub trait PartitionKey: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static @@ -29,6 +31,8 @@ impl PartitionKey for FixedBytes32 { } } +// =================================== SORT KEYS + /// Trait for field used to sort data pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { /// Get the key used to sort @@ -47,6 +51,8 @@ impl SortKey for FixedBytes32 { } } +// =================================== SCHEMA + /// Trait for an entry in a table. It must be sortable and partitionnable. pub trait Entry<P: PartitionKey, S: SortKey>: Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static |