aboutsummaryrefslogtreecommitdiff
path: root/src/model/index_counter.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-01-03 15:28:24 +0000
committerAlex <alex@adnab.me>2023-01-03 15:28:24 +0000
commit73ed9c74039448c69ebe382e361acf3ecbfef70b (patch)
tree7fb21a559e53557d5dea5efd2b7dafe9f9751367 /src/model/index_counter.rs
parent582b0761790b7958a3ba10c4b549b466997d2dcd (diff)
parent1d5bdc17a46648eb3494ff629d0d360d0217c1e2 (diff)
downloadgarage-73ed9c74039448c69ebe382e361acf3ecbfef70b.tar.gz
garage-73ed9c74039448c69ebe382e361acf3ecbfef70b.zip
Merge pull request 'Refactor how things are migrated' (#461) from format-migration into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/461
Diffstat (limited to 'src/model/index_counter.rs')
-rw-r--r--src/model/index_counter.rs80
1 files changed, 48 insertions, 32 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 6303ea3e..35d6596d 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -12,6 +12,7 @@ use garage_rpc::system::System;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::migrate::Migrate;
use garage_util::time::*;
use garage_table::crdt::*;
@@ -29,14 +30,44 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
fn counts(&self) -> Vec<(&'static str, i64)>;
}
-/// A counter entry in the global table
-#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
-pub struct CounterEntry<T: CountedItem> {
- pub pk: T::CP,
- pub sk: T::CS,
- pub values: BTreeMap<String, CounterValue>,
+mod v08 {
+ use super::CountedItem;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
+
+ // ---- Global part (the table everyone queries) ----
+
+ /// A counter entry in the global table
+ #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+ pub struct CounterEntry<T: CountedItem> {
+ pub pk: T::CP,
+ pub sk: T::CS,
+ pub values: BTreeMap<String, CounterValue>,
+ }
+
+ /// A counter entry in the global table
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct CounterValue {
+ pub node_values: BTreeMap<Uuid, (u64, i64)>,
+ }
+
+ impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {}
+
+ // ---- Local part (the counter we maintain transactionnaly on each node) ----
+
+ #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+ pub(super) struct LocalCounterEntry<T: CountedItem> {
+ pub(super) pk: T::CP,
+ pub(super) sk: T::CS,
+ pub(super) values: BTreeMap<String, (u64, i64)>,
+ }
+
+ impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {}
}
+pub use v08::*;
+
impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
fn partition_key(&self) -> &T::CP {
&self.pk
@@ -78,12 +109,6 @@ impl<T: CountedItem> CounterEntry<T> {
}
}
-/// A counter entry in the global table
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct CounterValue {
- pub node_values: BTreeMap<Uuid, (u64, i64)>,
-}
-
impl<T: CountedItem> Crdt for CounterEntry<T> {
fn merge(&mut self, other: &Self) {
for (name, e2) in other.values.iter() {
@@ -195,11 +220,9 @@ impl<T: CountedItem> IndexCounter<T> {
let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
- Some(old_bytes) => {
- rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
- .map_err(Error::RmpDecode)
- .map_err(db::TxError::Abort)?
- }
+ Some(old_bytes) => LocalCounterEntry::<T>::decode(&old_bytes)
+ .ok_or_message("Cannot decode local counter entry")
+ .map_err(db::TxError::Abort)?,
None => LocalCounterEntry {
pk: pk.clone(),
sk: sk.clone(),
@@ -214,7 +237,8 @@ impl<T: CountedItem> IndexCounter<T> {
ent.1 += *inc;
}
- let new_entry_bytes = rmp_to_vec_all_named(&entry)
+ let new_entry_bytes = entry
+ .encode()
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
@@ -255,15 +279,15 @@ impl<T: CountedItem> IndexCounter<T> {
info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
for (local_counter_k, local_counter) in batch {
- let mut local_counter =
- rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
+ let mut local_counter = LocalCounterEntry::<T>::decode(&local_counter)
+ .ok_or_message("Cannot decode local counter entry")?;
for (_, tv) in local_counter.values.iter_mut() {
tv.0 = std::cmp::max(tv.0 + 1, now);
tv.1 = 0;
}
- let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ let local_counter_bytes = local_counter.encode()?;
self.local_counter
.insert(&local_counter_k, &local_counter_bytes)?;
@@ -311,9 +335,8 @@ impl<T: CountedItem> IndexCounter<T> {
let local_counter_key = self.table.data.tree_key(pk, sk);
let mut local_counter = match self.local_counter.get(&local_counter_key)? {
Some(old_bytes) => {
- let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
- &old_bytes,
- )?;
+ let ent = LocalCounterEntry::<T>::decode(&old_bytes)
+ .ok_or_message("Cannot decode local counter entry")?;
assert!(ent.pk == *pk);
assert!(ent.sk == *sk);
ent
@@ -330,7 +353,7 @@ impl<T: CountedItem> IndexCounter<T> {
tv.1 += v;
}
- let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ let local_counter_bytes = local_counter.encode()?;
self.local_counter
.insert(&local_counter_key, local_counter_bytes)?;
@@ -350,13 +373,6 @@ impl<T: CountedItem> IndexCounter<T> {
// ----
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-struct LocalCounterEntry<T: CountedItem> {
- pk: T::CP,
- sk: T::CS,
- values: BTreeMap<String, (u64, i64)>,
-}
-
impl<T: CountedItem> LocalCounterEntry<T> {
fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
CounterEntry {