aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-01-04 11:34:43 +0100
committerAlex Auvolat <alex@adnab.me>2023-01-04 11:34:43 +0100
commit570e5e5bbb7a3eac41350db9433e28ed289b97f4 (patch)
treea7fc299ba180098be5a3bef28a39256870ce697b /src/table
parent6e44369cbc810b8912ca0f7f5fd293e87f10c851 (diff)
parent4eb8ca3a528dae2848141f5cc3eb607eb7d40114 (diff)
downloadgarage-570e5e5bbb7a3eac41350db9433e28ed289b97f4.tar.gz
garage-570e5e5bbb7a3eac41350db9433e28ed289b97f4.zip
Merge branch 'main' into next
Diffstat (limited to 'src/table')
-rw-r--r--src/table/Cargo.toml10
-rw-r--r--src/table/data.rs118
-rw-r--r--src/table/gc.rs58
-rw-r--r--src/table/lib.rs8
-rw-r--r--src/table/merkle.rs61
-rw-r--r--src/table/metrics.rs38
-rw-r--r--src/table/queue.rs77
-rw-r--r--src/table/replication/parameters.rs2
-rw-r--r--src/table/schema.rs22
-rw-r--r--src/table/sync.rs87
-rw-r--r--src/table/table.rs58
-rw-r--r--src/table/util.rs6
12 files changed, 341 insertions, 204 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 38c6b41c..3911c945 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_table"
-version = "0.8.0"
+version = "0.8.1"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,20 +14,20 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.0", path = "../db" }
-garage_rpc = { version = "0.8.0", path = "../rpc" }
-garage_util = { version = "0.8.0", path = "../util" }
+garage_db = { version = "0.8.1", path = "../db" }
+garage_rpc = { version = "0.8.1", path = "../rpc" }
+garage_util = { version = "0.8.1", path = "../util" }
opentelemetry = "0.17"
async-trait = "0.1.7"
+arc-swap = "1.0"
bytes = "1.0"
hex = "0.4"
hexdump = "0.1"
tracing = "0.1.30"
rand = "0.8"
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/table/data.rs b/src/table/data.rs
index 3212e82b..5c792f1f 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -10,6 +10,7 @@ use garage_db::counted_tree_hack::CountedTree;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::migrate::Migrate;
use garage_rpc::system::System;
@@ -31,16 +32,16 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
+
+ pub(crate) insert_queue: db::Tree,
+ pub(crate) insert_queue_notify: Notify,
+
pub(crate) gc_todo: CountedTree,
pub(crate) metrics: TableMetrics,
}
-impl<F, R> TableData<F, R>
-where
- F: TableSchema,
- R: TableReplication,
-{
+impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME))
@@ -53,12 +54,22 @@ where
.open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
+ let insert_queue = db
+ .open_tree(&format!("{}:insert_queue", F::TABLE_NAME))
+ .expect("Unable to open insert queue DB tree");
+
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
- .expect("Unable to open DB tree");
+ .expect("Unable to open GC DB tree");
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
- let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
+ let metrics = TableMetrics::new(
+ F::TABLE_NAME,
+ store.clone(),
+ merkle_tree.clone(),
+ merkle_todo.clone(),
+ gc_todo.clone(),
+ );
Arc::new(Self {
system,
@@ -68,6 +79,8 @@ where
merkle_tree,
merkle_todo,
merkle_todo_notify: Notify::new(),
+ insert_queue,
+ insert_queue_notify: Notify::new(),
gc_todo,
metrics,
})
@@ -167,9 +180,8 @@ where
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
let update = self.decode_entry(update_bytes)?;
- let tree_key = self.tree_key(update.partition_key(), update.sort_key());
- self.update_entry_with(&tree_key[..], |ent| match ent {
+ self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
Some(mut ent) => {
ent.merge(&update);
ent
@@ -181,11 +193,14 @@ where
pub fn update_entry_with(
&self,
- tree_key: &[u8],
+ partition_key: &F::P,
+ sort_key: &F::S,
f: impl Fn(Option<F::E>) -> F::E,
) -> Result<Option<F::E>, Error> {
+ let tree_key = self.tree_key(partition_key, sort_key);
+
let changed = self.store.db().transaction(|mut tx| {
- let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
+ let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
let new_entry = f(Some(old_entry.clone()));
@@ -194,23 +209,24 @@ where
None => (None, None, f(None)),
};
- // Scenario 1: the value changed, so of course there is a change
- let value_changed = Some(&new_entry) != old_entry.as_ref();
-
+ // Changed can be true in two scenarios
+ // Scenario 1: the actual represented value changed,
+ // so of course the messagepack encoding changed as well
// Scenario 2: the value didn't change but due to a migration in the
- // data format, the messagepack encoding changed. In this case
- // we have to write the migrated value in the table and update
- // the associated Merkle tree entry.
- let new_bytes = rmp_to_vec_all_named(&new_entry)
+ // data format, the messagepack encoding changed. In this case,
+ // we also have to write the migrated value in the table and update
+ // the associated Merkle tree entry.
+ let new_bytes = new_entry
+ .encode()
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
- let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
+ let changed = Some(&new_bytes[..]) != old_bytes.as_deref();
drop(old_bytes);
- if value_changed || encoding_changed {
- let new_bytes_hash = blake2sum(&new_bytes[..]);
- tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
- tx.insert(&self.store, tree_key, new_bytes)?;
+ if changed {
+ let new_bytes_hash = blake2sum(&new_bytes);
+ tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?;
+ tx.insert(&self.store, &tree_key, new_bytes)?;
self.instance
.updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
@@ -236,7 +252,7 @@ where
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
let nodes = self.replication.write_nodes(&pk_hash);
if nodes.first() == Some(&self.system.id) {
- GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?;
+ GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
}
}
@@ -252,10 +268,11 @@ where
.db()
.transaction(|mut tx| match tx.get(&self.store, k)? {
Some(cur_v) if cur_v == v => {
+ let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
+
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
- let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
self.instance.updated(&mut tx, Some(&old_entry), None)?;
Ok(true)
}
@@ -279,10 +296,11 @@ where
.db()
.transaction(|mut tx| match tx.get(&self.store, k)? {
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
+ let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
+
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
- let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
self.instance.updated(&mut tx, Some(&old_entry), None)?;
Ok(true)
}
@@ -296,6 +314,32 @@ where
Ok(removed)
}
+ // ---- Insert queue functions ----
+
+ pub(crate) fn queue_insert(
+ &self,
+ tx: &mut db::Transaction,
+ ins: &F::E,
+ ) -> db::TxResult<(), Error> {
+ let tree_key = self.tree_key(ins.partition_key(), ins.sort_key());
+
+ let new_entry = match tx.get(&self.insert_queue, &tree_key)? {
+ Some(old_v) => {
+ let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
+ entry.merge(ins);
+ entry.encode()
+ }
+ None => ins.encode(),
+ };
+ let new_entry = new_entry
+ .map_err(Error::RmpEncode)
+ .map_err(db::TxError::Abort)?;
+ tx.insert(&self.insert_queue, &tree_key, new_entry)?;
+ self.insert_queue_notify.notify_one();
+
+ Ok(())
+ }
+
// ---- Utility functions ----
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
@@ -305,18 +349,18 @@ where
}
pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
- match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
- Ok(x) => Ok(x),
- Err(e) => match F::try_migrate(bytes) {
- Some(x) => Ok(x),
- None => {
- warn!("Unable to decode entry of {}: {}", F::TABLE_NAME, e);
- for line in hexdump::hexdump_iter(bytes) {
- debug!("{}", line);
- }
- Err(e.into())
+ match F::E::decode(bytes) {
+ Some(x) => Ok(x),
+ None => {
+ error!("Unable to decode entry of {}", F::TABLE_NAME);
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
}
- },
+ Err(Error::Message(format!(
+ "Unable to decode entry of {}",
+ F::TABLE_NAME
+ )))
+ }
}
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 83e7eeff..5b9124a7 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024;
// and the moment the garbage collection actually happens)
const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600);
-pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub(crate) struct TableGc<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
@@ -49,29 +49,26 @@ impl Rpc for GcRpc {
type Response = Result<GcRpc, Error>;
}
-impl<F, R> TableGc<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
+impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
+ pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME));
let gc = Arc::new(Self {
- system: system.clone(),
+ system,
data,
endpoint,
});
-
gc.endpoint.set_handler(gc.clone());
- system.background.spawn_worker(GcWorker::new(gc.clone()));
-
gc
}
+ pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ bg.spawn_worker(GcWorker::new(self.clone()));
+ }
+
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
@@ -276,11 +273,7 @@ where
}
#[async_trait]
-impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
match message {
GcRpc::Update(items) => {
@@ -298,20 +291,12 @@ where
}
}
-struct GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+struct GcWorker<F: TableSchema, R: TableReplication> {
gc: Arc<TableGc<F, R>>,
wait_delay: Duration,
}
-impl<F, R> GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> GcWorker<F, R> {
fn new(gc: Arc<TableGc<F, R>>) -> Self {
Self {
gc,
@@ -321,21 +306,15 @@ where
}
#[async_trait]
-impl<F, R> Worker for GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
fn name(&self) -> String {
format!("{} GC", F::TABLE_NAME)
}
- fn info(&self) -> Option<String> {
- let l = self.gc.data.gc_todo_len().unwrap_or(0);
- if l > 0 {
- Some(format!("{} items in queue", l))
- } else {
- None
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64),
+ ..Default::default()
}
}
@@ -349,10 +328,7 @@ where
}
}
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- if *must_exit.borrow() {
- return WorkerState::Done;
- }
+ async fn wait_for_work(&mut self) -> WorkerState {
tokio::time::sleep(self.wait_delay).await;
WorkerState::Busy
}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index b0153e9a..fdf114a6 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -4,16 +4,18 @@
#[macro_use]
extern crate tracing;
-mod metrics;
pub mod schema;
pub mod util;
pub mod data;
+pub mod replication;
+pub mod table;
+
mod gc;
mod merkle;
-pub mod replication;
+mod metrics;
+mod queue;
mod sync;
-pub mod table;
pub use schema::*;
pub use table::*;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index a5c29723..e86d0251 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -3,12 +3,14 @@ use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use tokio::select;
use tokio::sync::watch;
use garage_db as db;
use garage_util::background::*;
use garage_util::data::*;
+use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::Error;
use garage_rpc::ring::*;
@@ -64,22 +66,18 @@ pub enum MerkleNode {
Leaf(Vec<u8>, Hash),
}
-impl<F, R> MerkleUpdater<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> {
- let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
+impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
+ pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
+ let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]);
- let ret = Arc::new(Self {
+ Arc::new(Self {
data,
empty_node_hash,
- });
-
- background.spawn_worker(MerkleWorker(ret.clone()));
+ })
+ }
- ret
+ pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) {
+ background.spawn_worker(MerkleWorker(self.clone()));
}
fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
@@ -276,7 +274,7 @@ where
tx.remove(&self.data.merkle_tree, k.encode())?;
Ok(self.empty_node_hash)
} else {
- let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
+ let vby = nonversioned_encode(v).map_err(|e| db::TxError::Abort(e.into()))?;
let rethash = blake2sum(&vby[..]);
tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
Ok(rethash)
@@ -293,32 +291,27 @@ where
Ok(self.data.merkle_tree.len()?)
}
+ pub fn merkle_tree_fast_len(&self) -> Result<Option<usize>, Error> {
+ Ok(self.data.merkle_tree.fast_len()?)
+ }
+
pub fn todo_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_todo.len()?)
}
}
-struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static;
+struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>);
#[async_trait]
-impl<F, R> Worker for MerkleWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> {
fn name(&self) -> String {
- format!("{} Merkle tree updater", F::TABLE_NAME)
+ format!("{} Merkle", F::TABLE_NAME)
}
- fn info(&self) -> Option<String> {
- let l = self.0.todo_len().unwrap_or(0);
- if l > 0 {
- Some(format!("{} items in queue", l))
- } else {
- None
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.0.todo_len().unwrap_or(0) as u64),
+ ..Default::default()
}
}
@@ -337,11 +330,11 @@ where
.unwrap()
}
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- if *must_exit.borrow() {
- return WorkerState::Done;
+ async fn wait_for_work(&mut self) -> WorkerState {
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(60)) => (),
+ _ = self.0.data.merkle_todo_notify.notified() => (),
}
- tokio::time::sleep(Duration::from_secs(10)).await;
WorkerState::Busy
}
}
@@ -372,7 +365,7 @@ impl MerkleNode {
fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
match ent {
None => Ok(MerkleNode::Empty),
- Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
+ Some(v) => Ok(nonversioned_decode::<MerkleNode>(&v[..])?),
}
}
diff --git a/src/table/metrics.rs b/src/table/metrics.rs
index 3a1783e0..8318a84f 100644
--- a/src/table/metrics.rs
+++ b/src/table/metrics.rs
@@ -5,6 +5,8 @@ use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics
pub struct TableMetrics {
+ pub(crate) _table_size: ValueObserver<u64>,
+ pub(crate) _merkle_tree_size: ValueObserver<u64>,
pub(crate) _merkle_todo_len: ValueObserver<u64>,
pub(crate) _gc_todo_len: ValueObserver<u64>,
@@ -20,9 +22,43 @@ pub struct TableMetrics {
pub(crate) sync_items_received: Counter<u64>,
}
impl TableMetrics {
- pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: CountedTree) -> Self {
+ pub fn new(
+ table_name: &'static str,
+ store: db::Tree,
+ merkle_tree: db::Tree,
+ merkle_todo: db::Tree,
+ gc_todo: CountedTree,
+ ) -> Self {
let meter = global::meter(table_name);
TableMetrics {
+ _table_size: meter
+ .u64_value_observer(
+ "table.size",
+ move |observer| {
+ if let Ok(Some(v)) = store.fast_len() {
+ observer.observe(
+ v as u64,
+ &[KeyValue::new("table_name", table_name)],
+ );
+ }
+ },
+ )
+ .with_description("Number of items in table")
+ .init(),
+ _merkle_tree_size: meter
+ .u64_value_observer(
+ "table.merkle_tree_size",
+ move |observer| {
+ if let Ok(Some(v)) = merkle_tree.fast_len() {
+ observer.observe(
+ v as u64,
+ &[KeyValue::new("table_name", table_name)],
+ );
+ }
+ },
+ )
+ .with_description("Number of nodes in table's Merkle tree")
+ .init(),
_merkle_todo_len: meter
.u64_value_observer(
"table.merkle_updater_todo_queue_length",
diff --git a/src/table/queue.rs b/src/table/queue.rs
new file mode 100644
index 00000000..0857209b
--- /dev/null
+++ b/src/table/queue.rs
@@ -0,0 +1,77 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use tokio::select;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::error::Error;
+
+use crate::replication::*;
+use crate::schema::*;
+use crate::table::*;
+
+const BATCH_SIZE: usize = 100;
+
+pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
+where
+ F: TableSchema,
+ R: TableReplication;
+
+#[async_trait]
+impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
+ fn name(&self) -> String {
+ format!("{} queue", F::TABLE_NAME)
+ }
+
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64),
+ ..Default::default()
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ let mut kv_pairs = vec![];
+ let mut values = vec![];
+
+ for entry_kv in self.0.data.insert_queue.iter()? {
+ let (k, v) = entry_kv?;
+
+ values.push(self.0.data.decode_entry(&v)?);
+ kv_pairs.push((k, v));
+
+ if kv_pairs.len() > BATCH_SIZE {
+ break;
+ }
+ }
+
+ if kv_pairs.is_empty() {
+ return Ok(WorkerState::Idle);
+ }
+
+ self.0.insert_many(values).await?;
+
+ self.0.data.insert_queue.db().transaction(|mut tx| {
+ for (k, v) in kv_pairs.iter() {
+ if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
+ if &v2 == v {
+ tx.remove(&self.0.data.insert_queue, k)?;
+ }
+ }
+ }
+ Ok(())
+ })?;
+
+ Ok(WorkerState::Busy)
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(600)) => (),
+ _ = self.0.data.insert_queue_notify.notified() => (),
+ }
+ WorkerState::Busy
+ }
+}
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 3740d947..f00815a2 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -2,7 +2,7 @@ use garage_rpc::ring::*;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
-pub trait TableReplication: Send + Sync {
+pub trait TableReplication: Send + Sync + 'static {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
diff --git a/src/table/schema.rs b/src/table/schema.rs
index f37e98d8..5cbf6c95 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -2,11 +2,14 @@ use serde::{Deserialize, Serialize};
use garage_db as db;
use garage_util::data::*;
+use garage_util::migrate::Migrate;
use crate::crdt::Crdt;
/// Trait for field used to partition data
-pub trait PartitionKey {
+pub trait PartitionKey:
+ Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
+{
/// Get the key used to partition
fn hash(&self) -> Hash;
}
@@ -27,7 +30,7 @@ impl PartitionKey for FixedBytes32 {
}
/// Trait for field used to sort data
-pub trait SortKey {
+pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
/// Get the key used to sort
fn sort_key(&self) -> &[u8];
}
@@ -46,7 +49,7 @@ impl SortKey for FixedBytes32 {
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>:
- Crdt + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
+ Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static
{
/// Get the key used to partition
fn partition_key(&self) -> &P;
@@ -65,23 +68,16 @@ pub trait TableSchema: Send + Sync + 'static {
const TABLE_NAME: &'static str;
/// The partition key used in that table
- type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type P: PartitionKey;
/// The sort key used int that table
- type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type S: SortKey;
/// They type for an entry in that table
type E: Entry<Self::P, Self::S>;
/// The type for a filter that can be applied to select entries
/// (e.g. filter out deleted entries)
- type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
-
- // Action to take if not able to decode current version:
- // try loading from an older version
- /// Try migrating an entry from an older version
- fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
- None
- }
+ type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
/// Actions triggered by data changing in a table. If such actions
/// include updates to the local database that should be applied
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 9d79d856..92a353c6 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
@@ -13,7 +14,8 @@ use tokio::sync::{mpsc, watch};
use garage_util::background::*;
use garage_util::data::*;
-use garage_util::error::Error;
+use garage_util::encode::{debug_serialize, nonversioned_encode};
+use garage_util::error::{Error, OkOrMessage};
use garage_rpc::ring::*;
use garage_rpc::system::System;
@@ -27,12 +29,12 @@ use crate::*;
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
-pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- add_full_sync_tx: mpsc::UnboundedSender<()>,
+ add_full_sync_tx: ArcSwapOption<mpsc::UnboundedSender<()>>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@@ -60,12 +62,8 @@ struct TodoPartition {
retain: bool,
}
-impl<F, R> TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) fn launch(
+impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
+ pub(crate) fn new(
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
@@ -74,34 +72,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
- let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
-
let syncer = Arc::new(Self {
- system: system.clone(),
+ system,
data,
merkle,
- add_full_sync_tx,
+ add_full_sync_tx: ArcSwapOption::new(None),
endpoint,
});
-
syncer.endpoint.set_handler(syncer.clone());
- system.background.spawn_worker(SyncWorker {
- syncer: syncer.clone(),
- ring_recv: system.ring.clone(),
- ring: system.ring.borrow().clone(),
+ syncer
+ }
+
+ pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
+ self.add_full_sync_tx
+ .store(Some(Arc::new(add_full_sync_tx)));
+
+ bg.spawn_worker(SyncWorker {
+ syncer: self.clone(),
+ ring_recv: self.system.ring.clone(),
+ ring: self.system.ring.borrow().clone(),
add_full_sync_rx,
todo: vec![],
next_full_sync: Instant::now() + Duration::from_secs(20),
});
-
- syncer
}
- pub fn add_full_sync(&self) {
- if self.add_full_sync_tx.send(()).is_err() {
- error!("({}) Could not add full sync", F::TABLE_NAME);
- }
+ pub fn add_full_sync(&self) -> Result<(), Error> {
+ let tx = self.add_full_sync_tx.load();
+ let tx = tx
+ .as_ref()
+ .ok_or_message("table sync worker is not running")?;
+ tx.send(()).ok_or_message("send error")?;
+ Ok(())
}
// ----
@@ -295,7 +299,7 @@ where
);
return Ok(());
}
- let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
+ let root_ck_hash = hash_of_merkle_node(&root_ck)?;
// Check if they have the same root checksum
// If so, do nothing.
@@ -452,16 +456,12 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
#[async_trait]
-impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
- let hash = hash_of::<MerkleNode>(&root_ck)?;
+ let hash = hash_of_merkle_node(&root_ck)?;
Ok(SyncRpc::RootCkDifferent(hash != *h))
}
SyncRpc::GetNode(k) => {
@@ -490,7 +490,7 @@ where
// -------- Sync Worker ---------
-struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
+struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
ring_recv: watch::Receiver<Arc<Ring>>,
ring: Arc<Ring>,
@@ -499,7 +499,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
next_full_sync: Instant,
}
-impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
fn add_full_sync(&mut self) {
let system = &self.syncer.system;
let data = &self.syncer.data;
@@ -565,17 +565,15 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
}
#[async_trait]
-impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn name(&self) -> String {
format!("{} sync", F::TABLE_NAME)
}
- fn info(&self) -> Option<String> {
- let l = self.todo.len();
- if l > 0 {
- Some(format!("{} partitions remaining", l))
- } else {
- None
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.todo.len() as u64),
+ ..Default::default()
}
}
@@ -588,10 +586,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
}
}
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- if *must_exit.borrow() {
- return WorkerState::Done;
- }
+ async fn wait_for_work(&mut self) -> WorkerState {
select! {
s = self.add_full_sync_rx.recv() => {
if let Some(()) = s {
@@ -620,8 +615,8 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
// ---- UTIL ----
-fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
- Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
+fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
+ Ok(blake2sum(&nonversioned_encode(x)?[..]))
}
fn join_ordered<'a, K: Ord + Eq, V1, V2>(
diff --git a/src/table/table.rs b/src/table/table.rs
index 8a66c420..7ad79677 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -14,9 +14,11 @@ use opentelemetry::{
use garage_db as db;
+use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
+use garage_util::migrate::Migrate;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -25,16 +27,18 @@ use crate::crdt::Crdt;
use crate::data::*;
use crate::gc::*;
use crate::merkle::*;
+use crate::queue::InsertQueueWorker;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
use crate::util::*;
-pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub struct Table<F: TableSchema, R: TableReplication> {
pub system: Arc<System>,
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
+ gc: Arc<TableGc<F, R>>,
endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
}
@@ -61,11 +65,7 @@ impl<F: TableSchema> Rpc for TableRpc<F> {
type Response = Result<TableRpc<F>, Error>;
}
-impl<F, R> Table<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Table<F, R> {
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
@@ -75,15 +75,16 @@ where
let data = TableData::new(system.clone(), instance, replication, db);
- let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
+ let merkle_updater = MerkleUpdater::new(data.clone());
- let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone());
- TableGc::launch(system.clone(), data.clone());
+ let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
+ let gc = TableGc::new(system.clone(), data.clone());
let table = Arc::new(Self {
system,
data,
merkle_updater,
+ gc,
syncer,
endpoint,
});
@@ -93,6 +94,13 @@ where
table
}
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ self.merkle_updater.spawn_workers(bg);
+ self.syncer.spawn_workers(bg);
+ self.gc.spawn_workers(bg);
+ bg.spawn_worker(InsertQueueWorker(self.clone()));
+ }
+
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert", F::TABLE_NAME));
@@ -111,7 +119,7 @@ where
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
- let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
+ let e_enc = Arc::new(ByteBuf::from(e.encode()?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
self.system
@@ -128,6 +136,11 @@ where
Ok(())
}
+ /// Insert item locally
+ pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> {
+ self.data.queue_insert(tx, e)
+ }
+
pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
@@ -157,7 +170,7 @@ where
let entry = entry.borrow();
let hash = entry.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
- let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
+ let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
for node in who {
call_list.entry(node).or_default().push(e_enc.clone());
}
@@ -259,9 +272,11 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- self.system
- .background
- .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
+ tokio::spawn(async move {
+ if let Err(e) = self2.repair_on_read(&who[..], ent2).await {
+ warn!("Error doing repair on read: {}", e);
+ }
+ });
}
}
@@ -358,11 +373,12 @@ where
.into_iter()
.map(|k| ret.get(&k).unwrap().clone())
.collect::<Vec<_>>();
- self.system.background.spawn_cancellable(async move {
+ tokio::spawn(async move {
for v in to_repair {
- self2.repair_on_read(&who[..], v).await?;
+ if let Err(e) = self2.repair_on_read(&who[..], v).await {
+ warn!("Error doing repair on read: {}", e);
+ }
}
- Ok(())
});
}
@@ -393,7 +409,7 @@ where
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
- let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
+ let what_enc = Arc::new(ByteBuf::from(what.encode()?));
self.system
.rpc
.try_call_many(
@@ -408,11 +424,7 @@ where
}
#[async_trait]
-impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
async fn handle(
self: &Arc<Self>,
msg: &TableRpc<F>,
diff --git a/src/table/util.rs b/src/table/util.rs
index 20595a94..0b10cf3f 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -49,3 +49,9 @@ impl EnumerationOrder {
}
}
}
+
+impl Default for EnumerationOrder {
+ fn default() -> Self {
+ EnumerationOrder::Forward
+ }
+}