aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-18 19:27:02 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-18 19:27:02 +0100
commit4348bde180887f5185ca6da6024476e8e8fb2fe6 (patch)
tree8a35f0e4229d15665af7673eebcaa1b3d75a73cb /src
parent5b659b28ce6bef15072d2fc93f777aa8ff73b2d8 (diff)
parent4eb16e886388f35d2bdee52b16922421004cf132 (diff)
downloadgarage-4348bde180887f5185ca6da6024476e8e8fb2fe6.tar.gz
garage-4348bde180887f5185ca6da6024476e8e8fb2fe6.zip
Merge branch 'dev-0.2'
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml21
-rw-r--r--src/api/api_server.rs5
-rw-r--r--src/api/error.rs8
-rw-r--r--src/api/s3_copy.rs106
-rw-r--r--src/api/s3_delete.rs11
-rw-r--r--src/api/s3_get.rs12
-rw-r--r--src/api/s3_list.rs8
-rw-r--r--src/api/s3_put.rs207
-rw-r--r--src/api/signature.rs36
-rw-r--r--src/garage/Cargo.toml13
-rw-r--r--src/garage/admin_rpc.rs189
-rw-r--r--src/garage/cli.rs552
-rw-r--r--src/garage/main.rs512
-rw-r--r--src/garage/repair.rs83
-rw-r--r--src/garage/server.rs63
-rw-r--r--src/model/Cargo.toml15
-rw-r--r--src/model/block.rs241
-rw-r--r--src/model/block_ref_table.rs20
-rw-r--r--src/model/bucket_table.rs42
-rw-r--r--src/model/garage.rs31
-rw-r--r--src/model/key_table.rs65
-rw-r--r--src/model/object_table.rs97
-rw-r--r--src/model/version_table.rs107
-rw-r--r--src/rpc/Cargo.toml21
-rw-r--r--src/rpc/membership.rs49
-rw-r--r--src/rpc/ring.rs40
-rw-r--r--src/rpc/rpc_client.rs8
-rw-r--r--src/rpc/rpc_server.rs14
-rw-r--r--src/table/Cargo.toml11
-rw-r--r--src/table/crdt.rs327
-rw-r--r--src/table/crdt/bool.rs34
-rw-r--r--src/table/crdt/crdt.rs73
-rw-r--r--src/table/crdt/lww.rs114
-rw-r--r--src/table/crdt/lww_map.rs145
-rw-r--r--src/table/crdt/map.rs83
-rw-r--r--src/table/crdt/mod.rs22
-rw-r--r--src/table/data.rs254
-rw-r--r--src/table/gc.rs248
-rw-r--r--src/table/lib.rs8
-rw-r--r--src/table/merkle.rs454
-rw-r--r--src/table/replication/fullcopy.rs51
-rw-r--r--src/table/replication/mod.rs6
-rw-r--r--src/table/replication/parameters.rs21
-rw-r--r--src/table/replication/sharded.rs (renamed from src/table/table_sharded.rs)31
-rw-r--r--src/table/schema.rs10
-rw-r--r--src/table/sync.rs614
-rw-r--r--src/table/table.rs229
-rw-r--r--src/table/table_fullcopy.rs59
-rw-r--r--src/table/table_sync.rs891
-rw-r--r--src/util/Cargo.toml21
-rw-r--r--src/util/background.rs185
-rw-r--r--src/util/data.rs20
-rw-r--r--src/util/error.rs8
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/time.rs16
-rw-r--r--src/web/Cargo.toml5
56 files changed, 3723 insertions, 2794 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 45388eff..bce9946e 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -17,26 +17,25 @@ garage_util = { version = "0.1.1", path = "../util" }
garage_table = { version = "0.1.1", path = "../table" }
garage_model = { version = "0.1.1", path = "../model" }
-err-derive = "0.2.3"
-bytes = "0.4"
-hex = "0.3"
+err-derive = "0.3"
+bytes = "1.0"
+hex = "0.4"
base64 = "0.13"
log = "0.4"
chrono = "0.4"
-md-5 = "0.9.1"
-sha2 = "0.8"
-hmac = "0.7"
-crypto-mac = "0.7"
-rand = "0.7"
+md-5 = "0.9"
+sha2 = "0.9"
+hmac = "0.10"
+crypto-mac = "0.10"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
http = "0.2"
-hyper = "^0.13.6"
+hyper = "0.14"
url = "2.1"
httpdate = "0.3"
percent-encoding = "2.1.0"
-roxmltree = "0.11"
+roxmltree = "0.14"
http-range = "0.1"
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index c6b1d483..bc98686d 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -137,7 +137,10 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
)));
}
let source_key = source_key.ok_or_bad_request("No source key specified")?;
- Ok(handle_copy(garage, &bucket, &key, &source_bucket, &source_key).await?)
+ Ok(
+ handle_copy(garage, &req, &bucket, &key, &source_bucket, &source_key)
+ .await?,
+ )
} else {
// PutObject query
Ok(handle_put(garage, req, &bucket, &key, content_sha256).await?)
diff --git a/src/api/error.rs b/src/api/error.rs
index a1681fc3..42a7ab10 100644
--- a/src/api/error.rs
+++ b/src/api/error.rs
@@ -33,7 +33,7 @@ pub enum Error {
InvalidBase64(#[error(source)] base64::DecodeError),
#[error(display = "Invalid XML: {}", _0)]
- InvalidXML(#[error(source)] roxmltree::Error),
+ InvalidXML(String),
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
@@ -45,6 +45,12 @@ pub enum Error {
BadRequest(String),
}
+impl From<roxmltree::Error> for Error {
+ fn from(err: roxmltree::Error) -> Self {
+ Self::InvalidXML(format!("{}", err))
+ }
+}
+
impl Error {
pub fn http_status_code(&self) -> StatusCode {
match self {
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index b6ec48b0..187fe347 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -1,11 +1,11 @@
use std::fmt::Write;
use std::sync::Arc;
-use chrono::{SecondsFormat, Utc};
-use hyper::{Body, Response};
+use hyper::{Body, Request, Response};
use garage_table::*;
use garage_util::data::*;
+use garage_util::time::*;
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
@@ -13,9 +13,11 @@ use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::error::*;
+use crate::s3_put::get_headers;
pub async fn handle_copy(
garage: Arc<Garage>,
+ req: &Request<Body>,
dest_bucket: &str,
dest_key: &str,
source_bucket: &str,
@@ -41,17 +43,37 @@ pub async fn handle_copy(
};
let new_uuid = gen_uuid();
- let dest_object_version = ObjectVersion {
- uuid: new_uuid,
- timestamp: now_msec(),
- state: ObjectVersionState::Complete(source_last_state.clone()),
- };
+ let new_timestamp = now_msec();
- match &source_last_state {
+ // Implement x-amz-metadata-directive: REPLACE
+ let old_meta = match source_last_state {
ObjectVersionData::DeleteMarker => {
return Err(Error::NotFound);
}
- ObjectVersionData::Inline(_meta, _bytes) => {
+ ObjectVersionData::Inline(meta, _bytes) => meta,
+ ObjectVersionData::FirstBlock(meta, _fbh) => meta,
+ };
+ let new_meta = match req.headers().get("x-amz-metadata-directive") {
+ Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
+ headers: get_headers(req)?,
+ size: old_meta.size,
+ etag: old_meta.etag.clone(),
+ },
+ _ => old_meta.clone(),
+ };
+
+ // Save object copy
+ match source_last_state {
+ ObjectVersionData::DeleteMarker => unreachable!(),
+ ObjectVersionData::Inline(_meta, bytes) => {
+ let dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::Inline(
+ new_meta,
+ bytes.clone(),
+ )),
+ };
let dest_object = Object::new(
dest_bucket.to_string(),
dest_key.to_string(),
@@ -59,44 +81,84 @@ pub async fn handle_copy(
);
garage.object_table.insert(&dest_object).await?;
}
- ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
+ ObjectVersionData::FirstBlock(_meta, first_block_hash) => {
+ // Get block list from source version
let source_version = garage
.version_table
.get(&source_last_v.uuid, &EmptyKey)
.await?;
let source_version = source_version.ok_or(Error::NotFound)?;
- let dest_version = Version::new(
- new_uuid,
+ // Write an "uploading" marker in Object table
+ // This holds a reference to the object in the Version table
+ // so that it won't be deleted, e.g. by repair_versions.
+ let tmp_dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Uploading(new_meta.headers.clone()),
+ };
+ let tmp_dest_object = Object::new(
dest_bucket.to_string(),
dest_key.to_string(),
- false,
- source_version.blocks().to_vec(),
+ vec![tmp_dest_object_version],
);
- let dest_object = Object::new(
+ garage.object_table.insert(&tmp_dest_object).await?;
+
+ // Write version in the version table. Even with empty block list,
+ // this means that the BlockRef entries linked to this version cannot be
+ // marked as deleted (they are marked as deleted only if the Version
+ // doesn't exist or is marked as deleted).
+ let mut dest_version = Version::new(
+ new_uuid,
dest_bucket.to_string(),
dest_key.to_string(),
- vec![dest_object_version],
+ false,
);
+ garage.version_table.insert(&dest_version).await?;
+
+ // Fill in block list for version and insert block refs
+ for (bk, bv) in source_version.blocks.items().iter() {
+ dest_version.blocks.put(*bk, *bv);
+ }
let dest_block_refs = dest_version
- .blocks()
+ .blocks
+ .items()
.iter()
.map(|b| BlockRef {
- block: b.hash,
+ block: b.1.hash,
version: new_uuid,
- deleted: false,
+ deleted: false.into(),
})
.collect::<Vec<_>>();
futures::try_join!(
- garage.object_table.insert(&dest_object),
garage.version_table.insert(&dest_version),
garage.block_ref_table.insert_many(&dest_block_refs[..]),
)?;
+
+ // Insert final object
+ // We do this last because otherwise there is a race condition in the case where
+ // the copy call has the same source and destination (this happens, rclone does
+ // it to update the modification timestamp for instance). If we did this concurrently
+ // with the stuff before, the block's reference counts could be decremented before
+ // they are incremented again for the new version, leading to data being deleted.
+ let dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
+ new_meta,
+ *first_block_hash,
+ )),
+ };
+ let dest_object = Object::new(
+ dest_bucket.to_string(),
+ dest_key.to_string(),
+ vec![dest_object_version],
+ );
+ garage.object_table.insert(&dest_object).await?;
}
}
- let now = Utc::now();
- let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true);
+ let last_modified = msec_to_rfc3339(new_timestamp);
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
writeln!(&mut xml, r#"<CopyObjectResult>"#).unwrap();
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index 4b6a2b18..6abbfc48 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use hyper::{Body, Request, Response};
use garage_util::data::*;
+use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
@@ -29,16 +30,16 @@ async fn handle_delete_internal(
_ => true,
});
- let mut must_delete = None;
+ let mut version_to_delete = None;
let mut timestamp = now_msec();
for v in interesting_versions {
- if v.timestamp + 1 > timestamp || must_delete.is_none() {
- must_delete = Some(v.uuid);
+ if v.timestamp + 1 > timestamp || version_to_delete.is_none() {
+ version_to_delete = Some(v.uuid);
}
timestamp = std::cmp::max(timestamp, v.timestamp + 1);
}
- let deleted_version = must_delete.ok_or(Error::NotFound)?;
+ let deleted_version = version_to_delete.ok_or(Error::NotFound)?;
let version_uuid = gen_uuid();
@@ -47,7 +48,7 @@ async fn handle_delete_internal(
key.into(),
vec![ObjectVersion {
uuid: version_uuid,
- timestamp: now_msec(),
+ timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 68e7c66a..22a55b55 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -146,9 +146,10 @@ pub async fn handle_get(
let version = version.ok_or(Error::NotFound)?;
let mut blocks = version
- .blocks()
+ .blocks
+ .items()
.iter()
- .map(|vb| (vb.hash, None))
+ .map(|(_, vb)| (vb.hash, None))
.collect::<Vec<_>>();
blocks[0].1 = Some(first_block);
@@ -219,11 +220,12 @@ pub async fn handle_get_range(
// file (whereas block.offset designates the offset of the block WITHIN THE PART
// block.part_number, which is not the same in the case of a multipart upload)
let mut blocks = Vec::with_capacity(std::cmp::min(
- version.blocks().len(),
- 4 + ((end - begin) / std::cmp::max(version.blocks()[0].size as u64, 1024)) as usize,
+ version.blocks.len(),
+ 4 + ((end - begin) / std::cmp::max(version.blocks.items()[0].1.size as u64, 1024))
+ as usize,
));
let mut true_offset = 0;
- for b in version.blocks().iter() {
+ for (_, b) in version.blocks.items().iter() {
if true_offset >= end {
break;
}
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index 98d774db..4d6c32bc 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -2,10 +2,10 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt::Write;
use std::sync::Arc;
-use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc};
use hyper::{Body, Response};
use garage_util::error::Error as GarageError;
+use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
@@ -42,7 +42,7 @@ pub fn parse_list_objects_query(
Ok(ListObjectsQuery {
is_v2: params.get("list-type").map(|x| x == "2").unwrap_or(false),
bucket: bucket.to_string(),
- delimiter: params.get("delimiter").cloned(),
+ delimiter: params.get("delimiter").filter(|x| !x.is_empty()).cloned(),
max_keys: params
.get("max-keys")
.map(|x| {
@@ -247,9 +247,7 @@ pub async fn handle_list(
}
for (key, info) in result_keys.iter() {
- let last_modif = NaiveDateTime::from_timestamp(info.last_modified as i64 / 1000, 0);
- let last_modif = DateTime::<Utc>::from_utc(last_modif, Utc);
- let last_modif = last_modif.to_rfc3339_opts(SecondsFormat::Millis, true);
+ let last_modif = msec_to_rfc3339(info.last_modified);
writeln!(&mut xml, "\t<Contents>").unwrap();
writeln!(
&mut xml,
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index ec599a05..c4e3b818 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -5,11 +5,12 @@ use std::sync::Arc;
use futures::stream::*;
use hyper::{Body, Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
-use sha2::{Digest as Sha256Digest, Sha256};
+use sha2::Sha256;
use garage_table::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
+use garage_util::time::*;
use garage_model::block::INLINE_THRESHOLD;
use garage_model::block_ref_table::*;
@@ -52,14 +53,14 @@ pub async fn handle_put(
if first_block.len() < INLINE_THRESHOLD {
let mut md5sum = Md5::new();
md5sum.update(&first_block[..]);
- let md5sum_arr = md5sum.finalize();
- let md5sum_hex = hex::encode(md5sum_arr);
+ let data_md5sum = md5sum.finalize();
+ let data_md5sum_hex = hex::encode(data_md5sum);
- let sha256sum_hash = sha256sum(&first_block[..]);
+ let data_sha256sum = sha256sum(&first_block[..]);
ensure_checksum_matches(
- md5sum_arr.as_slice(),
- sha256sum_hash,
+ data_md5sum.as_slice(),
+ data_sha256sum,
content_md5.as_deref(),
content_sha256,
)?;
@@ -71,7 +72,7 @@ pub async fn handle_put(
ObjectVersionMeta {
headers,
size: first_block.len() as u64,
- etag: md5sum_hex.clone(),
+ etag: data_md5sum_hex.clone(),
},
first_block,
)),
@@ -80,41 +81,45 @@ pub async fn handle_put(
let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
- return Ok(put_response(version_uuid, md5sum_hex));
+ return Ok(put_response(version_uuid, data_md5sum_hex));
}
// Write version identifier in object table so that we have a trace
// that we are uploading something
let mut object_version = ObjectVersion {
uuid: version_uuid,
- timestamp: now_msec(),
+ timestamp: version_timestamp,
state: ObjectVersionState::Uploading(headers.clone()),
};
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
- let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
- let first_block_hash = sha256sum(&first_block[..]);
+ // Write this entry now, even with empty block list,
+ // to prevent block_ref entries from being deleted (they can be deleted
+ // if they reference a version that isn't found in the version table)
+ let version = Version::new(version_uuid, bucket.into(), key.into(), false);
+ garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
+ let first_block_hash = blake2sum(&first_block[..]);
let tx_result = read_and_put_blocks(
&garage,
- version,
+ &version,
1,
first_block,
first_block_hash,
&mut chunker,
)
.await
- .and_then(|(total_size, md5sum_arr, sha256sum)| {
+ .and_then(|(total_size, data_md5sum, data_sha256sum)| {
ensure_checksum_matches(
- md5sum_arr.as_slice(),
- sha256sum,
+ data_md5sum.as_slice(),
+ data_sha256sum,
content_md5.as_deref(),
content_sha256,
)
- .map(|()| (total_size, md5sum_arr))
+ .map(|()| (total_size, data_md5sum))
});
// If something went wrong, clean up
@@ -148,13 +153,13 @@ pub async fn handle_put(
/// Validate MD5 sum against content-md5 header
/// and sha256sum against signed content-sha256
fn ensure_checksum_matches(
- md5sum: &[u8],
- sha256sum: garage_util::data::FixedBytes32,
+ data_md5sum: &[u8],
+ data_sha256sum: garage_util::data::FixedBytes32,
content_md5: Option<&str>,
content_sha256: Option<garage_util::data::FixedBytes32>,
) -> Result<(), Error> {
if let Some(expected_sha256) = content_sha256 {
- if expected_sha256 != sha256sum {
+ if expected_sha256 != data_sha256sum {
return Err(Error::BadRequest(format!(
"Unable to validate x-amz-content-sha256"
)));
@@ -163,7 +168,7 @@ fn ensure_checksum_matches(
}
}
if let Some(expected_md5) = content_md5 {
- if expected_md5.trim_matches('"') != base64::encode(md5sum) {
+ if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
} else {
trace!("Successfully validated content-md5");
@@ -173,8 +178,8 @@ fn ensure_checksum_matches(
}
async fn read_and_put_blocks(
- garage: &Arc<Garage>,
- version: Version,
+ garage: &Garage,
+ version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
@@ -183,11 +188,11 @@ async fn read_and_put_blocks(
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
md5hasher.update(&first_block[..]);
- sha256hasher.input(&first_block[..]);
+ sha256hasher.update(&first_block[..]);
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
- garage.clone(),
+ &garage,
&version,
part_number,
0,
@@ -203,11 +208,11 @@ async fn read_and_put_blocks(
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
- sha256hasher.input(&block[..]);
- let block_hash = sha256sum(&block[..]);
+ sha256hasher.update(&block[..]);
+ let block_hash = blake2sum(&block[..]);
let block_len = block.len();
put_curr_version_block = put_block_meta(
- garage.clone(),
+ &garage,
&version,
part_number,
next_offset as u64,
@@ -222,39 +227,35 @@ async fn read_and_put_blocks(
}
let total_size = next_offset as u64;
- let md5sum_arr = md5hasher.finalize();
+ let data_md5sum = md5hasher.finalize();
- let sha256sum_arr = sha256hasher.result();
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&sha256sum_arr[..]);
- let sha256sum_arr = Hash::from(hash);
+ let data_sha256sum = sha256hasher.finalize();
+ let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
- Ok((total_size, md5sum_arr, sha256sum_arr))
+ Ok((total_size, data_md5sum, data_sha256sum))
}
async fn put_block_meta(
- garage: Arc<Garage>,
+ garage: &Garage,
version: &Version,
part_number: u64,
offset: u64,
hash: Hash,
size: u64,
) -> Result<(), GarageError> {
- // TODO: don't clone, restart from empty block list ??
let mut version = version.clone();
- version
- .add_block(VersionBlock {
+ version.blocks.put(
+ VersionBlockKey {
part_number,
offset,
- hash,
- size,
- })
- .unwrap();
+ },
+ VersionBlock { hash, size },
+ );
let block_ref = BlockRef {
block: hash,
version: version.uuid,
- deleted: false,
+ deleted: false.into(),
};
futures::try_join!(
@@ -319,6 +320,7 @@ pub async fn handle_create_multipart_upload(
let version_uuid = gen_uuid();
let headers = get_headers(req)?;
+ // Create object in object table
let object_version = ObjectVersion {
uuid: version_uuid,
timestamp: now_msec(),
@@ -327,6 +329,14 @@ pub async fn handle_create_multipart_upload(
let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?;
+ // Insert empty version so that block_ref entries refer to something
+ // (they are inserted concurrently with blocks in the version table, so
+ // there is the possibility that they are inserted before the version table
+ // entry is created, in which case it is allowed to delete them, e.g. in repair_*)
+ let version = Version::new(version_uuid, bucket.into(), key.into(), false);
+ garage.version_table.insert(&version).await?;
+
+ // Send success response
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
writeln!(
@@ -389,11 +399,11 @@ pub async fn handle_put_part(
}
// Copy block to store
- let version = Version::new(version_uuid, bucket, key, false, vec![]);
- let first_block_hash = sha256sum(&first_block[..]);
- let (_, md5sum_arr, sha256sum) = read_and_put_blocks(
+ let version = Version::new(version_uuid, bucket, key, false);
+ let first_block_hash = blake2sum(&first_block[..]);
+ let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
&garage,
- version,
+ &version,
part_number,
first_block,
first_block_hash,
@@ -401,15 +411,24 @@ pub async fn handle_put_part(
)
.await?;
+ // Verify that checksums match
ensure_checksum_matches(
- md5sum_arr.as_slice(),
- sha256sum,
+ data_md5sum.as_slice(),
+ data_sha256sum,
content_md5.as_deref(),
content_sha256,
)?;
+ // Store part etag in version
+ let data_md5sum_hex = hex::encode(data_md5sum);
+ let mut version = version;
+ version
+ .parts_etags
+ .put(part_number, data_md5sum_hex.clone());
+ garage.version_table.insert(&version).await?;
+
let response = Response::builder()
- .header("ETag", format!("\"{}\"", hex::encode(md5sum_arr)))
+ .header("ETag", format!("\"{}\"", data_md5sum_hex))
.body(Body::from(vec![]))
.unwrap();
Ok(response)
@@ -444,17 +463,15 @@ pub async fn handle_complete_multipart_upload(
)?;
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
- let object_version = object
+ let mut object_version = object
.versions()
.iter()
- .find(|v| v.uuid == version_uuid && v.is_uploading());
- let mut object_version = match object_version {
- None => return Err(Error::NotFound),
- Some(x) => x.clone(),
- };
+ .find(|v| v.uuid == version_uuid && v.is_uploading())
+ .cloned()
+ .ok_or(Error::BadRequest(format!("Version not found")))?;
let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
- if version.blocks().len() == 0 {
+ if version.blocks.len() == 0 {
return Err(Error::BadRequest(format!("No data was uploaded")));
}
@@ -464,53 +481,50 @@ pub async fn handle_complete_multipart_upload(
};
// Check that the list of parts they gave us corresponds to the parts we have here
- // TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere...
- let mut parts = version
- .blocks()
+ debug!("Expected parts from request: {:?}", body_list_of_parts);
+ debug!("Parts stored in version: {:?}", version.parts_etags.items());
+ let parts = version
+ .parts_etags
+ .items()
.iter()
- .map(|x| x.part_number)
- .collect::<Vec<_>>();
- parts.dedup();
+ .map(|pair| (&pair.0, &pair.1));
let same_parts = body_list_of_parts
.iter()
- .map(|x| &x.part_number)
- .eq(parts.iter());
+ .map(|x| (&x.part_number, &x.etag))
+ .eq(parts);
if !same_parts {
return Err(Error::BadRequest(format!("We don't have the same parts")));
}
- // ETag calculation: we produce ETags that have the same form as
- // those of S3 multipart uploads, but we don't use their actual
- // calculation for the first part (we use random bytes). This
- // shouldn't impact compatibility as the S3 docs specify that
- // the ETag is an opaque value in case of a multipart upload.
- // See also: https://teppen.io/2018/06/23/aws_s3_etags/
- let num_parts = version.blocks().last().unwrap().part_number
- - version.blocks().first().unwrap().part_number
+ // Calculate etag of final object
+ // To understand how etags are calculated, read more here:
+ // https://teppen.io/2018/06/23/aws_s3_etags/
+ let num_parts = version.blocks.items().last().unwrap().0.part_number
+ - version.blocks.items().first().unwrap().0.part_number
+ 1;
- let etag = format!(
- "{}-{}",
- hex::encode(&rand::random::<[u8; 16]>()[..]),
- num_parts
- );
+ let mut etag_md5_hasher = Md5::new();
+ for (_, etag) in version.parts_etags.items().iter() {
+ etag_md5_hasher.update(etag.as_bytes());
+ }
+ let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts);
- let total_size = version
- .blocks()
- .iter()
- .map(|x| x.size)
- .fold(0, |x, y| x + y);
+ // Calculate total size of final object
+ let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
+
+ // Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
headers,
size: total_size,
- etag: etag,
+ etag,
},
- version.blocks()[0].hash,
+ version.blocks.items()[0].1.hash,
));
let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
+ // Send response saying ok we're done
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
writeln!(
@@ -570,17 +584,19 @@ fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
.to_string())
}
-fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
+pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(req)?;
- let other_headers = vec![
+ let mut other = BTreeMap::new();
+
+ // Preserve standard headers
+ let standard_header = vec![
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
- let mut other = BTreeMap::new();
- for h in other_headers.iter() {
+ for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
match v.to_str() {
Ok(v_str) => {
@@ -592,6 +608,21 @@ fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
}
}
}
+
+ // Preserve x-amz-meta- headers
+ for (k, v) in req.headers().iter() {
+ if k.as_str().starts_with("x-amz-meta-") {
+ match v.to_str() {
+ Ok(v_str) => {
+ other.insert(k.to_string(), v_str.to_string());
+ }
+ Err(e) => {
+ warn!("Discarding header {}, error in .to_str(): {}", k, e);
+ }
+ }
+ }
+ }
+
Ok(ObjectVersionHeaders {
content_type,
other,
diff --git a/src/api/signature.rs b/src/api/signature.rs
index d7fbd3f7..6dc69afa 100644
--- a/src/api/signature.rs
+++ b/src/api/signature.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
-use hmac::{Hmac, Mac};
+use hmac::{Hmac, Mac, NewMac};
use hyper::{Body, Method, Request};
use sha2::{Digest, Sha256};
@@ -91,8 +91,8 @@ pub async fn check_signature(
"s3",
)
.ok_or_internal_error("Unable to build signing HMAC")?;
- hmac.input(string_to_sign.as_bytes());
- let signature = hex::encode(hmac.result().code());
+ hmac.update(string_to_sign.as_bytes());
+ let signature = hex::encode(hmac.finalize().into_bytes());
if authorization.signature != signature {
trace!("Canonical request: ``{}``", canonical_request);
@@ -106,12 +106,10 @@ pub async fn check_signature(
} else {
let bytes = hex::decode(authorization.content_sha256)
.ok_or_bad_request("Invalid content sha256 hash")?;
- let mut hash = [0u8; 32];
- if bytes.len() != 32 {
- return Err(Error::BadRequest(format!("Invalid content sha256 hash")));
- }
- hash.copy_from_slice(&bytes[..]);
- Some(Hash::from(hash))
+ Some(
+ Hash::try_from(&bytes[..])
+ .ok_or(Error::BadRequest(format!("Invalid content sha256 hash")))?,
+ )
};
Ok((key, content_sha256))
@@ -220,12 +218,12 @@ fn parse_credential(cred: &str) -> Result<(String, String), Error> {
fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_req: &str) -> String {
let mut hasher = Sha256::default();
- hasher.input(canonical_req.as_bytes());
+ hasher.update(canonical_req.as_bytes());
[
"AWS4-HMAC-SHA256",
&datetime.format(LONG_DATETIME).to_string(),
scope_string,
- &hex::encode(hasher.result().as_slice()),
+ &hex::encode(hasher.finalize().as_slice()),
]
.join("\n")
}
@@ -238,14 +236,14 @@ fn signing_hmac(
) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
let secret = String::from("AWS4") + secret_key;
let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
- date_hmac.input(datetime.format(SHORT_DATE).to_string().as_bytes());
- let mut region_hmac = HmacSha256::new_varkey(&date_hmac.result().code())?;
- region_hmac.input(region.as_bytes());
- let mut service_hmac = HmacSha256::new_varkey(&region_hmac.result().code())?;
- service_hmac.input(service.as_bytes());
- let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.result().code())?;
- signing_hmac.input(b"aws4_request");
- let hmac = HmacSha256::new_varkey(&signing_hmac.result().code())?;
+ date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
+ let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
+ region_hmac.update(region.as_bytes());
+ let mut service_hmac = HmacSha256::new_varkey(&region_hmac.finalize().into_bytes())?;
+ service_hmac.update(service.as_bytes());
+ let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
+ signing_hmac.update(b"aws4_request");
+ let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
Ok(hmac)
}
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 03bc472d..c1817bf2 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -21,21 +21,20 @@ garage_model = { version = "0.1.1", path = "../model" }
garage_api = { version = "0.1.1", path = "../api" }
garage_web = { version = "0.1.1", path = "../web" }
-bytes = "0.4"
-rand = "0.7"
-hex = "0.3"
-sha2 = "0.8"
+bytes = "1.0"
+rand = "0.8"
+hex = "0.4"
log = "0.4"
pretty_env_logger = "0.4"
+git-version = "0.3.4"
sled = "0.34"
-old_sled = { package = "sled", version = "0.31" }
structopt = { version = "0.3", default-features = false }
toml = "0.5"
-rmp-serde = "0.14.3"
+rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index e1981e3a..df00fcaf 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+use std::fmt::Write;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
@@ -5,6 +7,7 @@ use serde::{Deserialize, Serialize};
use garage_util::error::Error;
use garage_table::crdt::CRDT;
+use garage_table::replication::*;
use garage_table::*;
use garage_rpc::rpc_client::*;
@@ -14,6 +17,7 @@ use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::*;
+use crate::cli::*;
use crate::repair::Repair;
use crate::*;
@@ -25,6 +29,7 @@ pub enum AdminRPC {
BucketOperation(BucketOperation),
KeyOperation(KeyOperation),
LaunchRepair(RepairOpt),
+ Stats(StatsOpt),
// Replies
Ok(String),
@@ -55,6 +60,7 @@ impl AdminRpcHandler {
AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
AdminRPC::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await,
+ AdminRPC::Stats(opt) => self2.handle_stats(opt).await,
_ => Err(Error::BadRPC(format!("Invalid RPC"))),
}
}
@@ -116,7 +122,7 @@ impl AdminRpcHandler {
for (key_id, _, _) in bucket.authorized_keys() {
if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? {
if !key.deleted.get() {
- self.update_key_bucket(key, &bucket.name, false, false)
+ self.update_key_bucket(&key, &bucket.name, false, false)
.await?;
}
} else {
@@ -128,31 +134,31 @@ impl AdminRpcHandler {
Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name)))
}
BucketOperation::Allow(query) => {
- let key = self.get_existing_key(&query.key_id).await?;
+ let key = self.get_existing_key(&query.key_pattern).await?;
let bucket = self.get_existing_bucket(&query.bucket).await?;
let allow_read = query.read || key.allow_read(&query.bucket);
let allow_write = query.write || key.allow_write(&query.bucket);
- self.update_key_bucket(key, &query.bucket, allow_read, allow_write)
+ self.update_key_bucket(&key, &query.bucket, allow_read, allow_write)
.await?;
- self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write)
+ self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write)
.await?;
Ok(AdminRPC::Ok(format!(
"New permissions for {} on {}: read {}, write {}.",
- &query.key_id, &query.bucket, allow_read, allow_write
+ &key.key_id, &query.bucket, allow_read, allow_write
)))
}
BucketOperation::Deny(query) => {
- let key = self.get_existing_key(&query.key_id).await?;
+ let key = self.get_existing_key(&query.key_pattern).await?;
let bucket = self.get_existing_bucket(&query.bucket).await?;
let allow_read = !query.read && key.allow_read(&query.bucket);
let allow_write = !query.write && key.allow_write(&query.bucket);
- self.update_key_bucket(key, &query.bucket, allow_read, allow_write)
+ self.update_key_bucket(&key, &query.bucket, allow_read, allow_write)
.await?;
- self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write)
+ self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write)
.await?;
Ok(AdminRPC::Ok(format!(
"New permissions for {} on {}: read {}, write {}.",
- &query.key_id, &query.bucket, allow_read, allow_write
+ &key.key_id, &query.bucket, allow_read, allow_write
)))
}
BucketOperation::Website(query) => {
@@ -187,7 +193,12 @@ impl AdminRpcHandler {
let key_ids = self
.garage
.key_table
- .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ )
.await?
.iter()
.map(|k| (k.key_id.to_string(), k.name.get().clone()))
@@ -195,7 +206,7 @@ impl AdminRpcHandler {
Ok(AdminRPC::KeyList(key_ids))
}
KeyOperation::Info(query) => {
- let key = self.get_existing_key(&query.key_id).await?;
+ let key = self.get_existing_key(&query.key_pattern).await?;
Ok(AdminRPC::KeyInfo(key))
}
KeyOperation::New(query) => {
@@ -204,13 +215,13 @@ impl AdminRpcHandler {
Ok(AdminRPC::KeyInfo(key))
}
KeyOperation::Rename(query) => {
- let mut key = self.get_existing_key(&query.key_id).await?;
+ let mut key = self.get_existing_key(&query.key_pattern).await?;
key.name.update(query.new_name);
self.garage.key_table.insert(&key).await?;
Ok(AdminRPC::KeyInfo(key))
}
KeyOperation::Delete(query) => {
- let key = self.get_existing_key(&query.key_id).await?;
+ let key = self.get_existing_key(&query.key_pattern).await?;
if !query.yes {
return Err(Error::BadRPC(format!(
"Add --yes flag to really perform this operation"
@@ -227,13 +238,24 @@ impl AdminRpcHandler {
return Err(Error::Message(format!("Bucket not found: {}", ab_name)));
}
}
- let del_key = Key::delete(key.key_id);
+ let del_key = Key::delete(key.key_id.to_string());
self.garage.key_table.insert(&del_key).await?;
Ok(AdminRPC::Ok(format!(
"Key {} was deleted successfully.",
- query.key_id
+ key.key_id
)))
}
+ KeyOperation::Import(query) => {
+ let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id)
+ .await?;
+ if prev_key.is_some() {
+ return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
+ }
+ let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
+ self.garage.key_table.insert(&imported_key).await?;
+ Ok(AdminRPC::KeyInfo(imported_key))
+
+ }
}
}
@@ -250,14 +272,28 @@ impl AdminRpcHandler {
))))
}
- async fn get_existing_key(&self, id: &String) -> Result<Key, Error> {
- self.garage
+ async fn get_existing_key(&self, pattern: &str) -> Result<Key, Error> {
+ let candidates = self
+ .garage
.key_table
- .get(&EmptyKey, id)
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Matches(pattern.to_string())),
+ 10,
+ )
.await?
+ .into_iter()
.filter(|k| !k.deleted.get())
- .map(Ok)
- .unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id))))
+ .collect::<Vec<_>>();
+ if candidates.len() != 1 {
+ Err(Error::Message(format!(
+ "{} matching keys",
+ candidates.len()
+ )))
+ } else {
+ Ok(candidates.into_iter().next().unwrap())
+ }
}
/// Update **bucket table** to inform of the new linked key
@@ -290,11 +326,12 @@ impl AdminRpcHandler {
/// Update **key table** to inform of the new linked bucket
async fn update_key_bucket(
&self,
- mut key: Key,
+ key: &Key,
bucket: &String,
allow_read: bool,
allow_write: bool,
) -> Result<(), Error> {
+ let mut key = key.clone();
let old_map = key.authorized_buckets.take_and_clear();
key.authorized_buckets.merge(&old_map.update_mutator(
bucket.clone(),
@@ -350,12 +387,118 @@ impl AdminRpcHandler {
.background
.spawn_worker("Repair worker".into(), move |must_exit| async move {
repair.repair_worker(opt, must_exit).await
- })
- .await;
+ });
Ok(AdminRPC::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
)))
}
}
+
+    /// Answer a `Stats` admin request.
+    ///
+    /// When `opt.all_nodes` is set, every member of the current ring configuration
+    /// is asked over RPC for its own statistics and the answers are concatenated;
+    /// otherwise only the statistics of the local node are returned.
+    async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRPC, Error> {
+        // Local-only request: nothing to fan out.
+        if !opt.all_nodes {
+            let local_report = self.gather_stats_local(opt)?;
+            return Ok(AdminRPC::Ok(local_report));
+        }
+
+        let ring = self.garage.system.ring.borrow().clone();
+        let mut report = String::new();
+
+        for node_id in ring.config.members.keys() {
+            // The forwarded request has `all_nodes` cleared so that the
+            // receiving node answers for itself only.
+            let mut node_opt = opt.clone();
+            node_opt.all_nodes = false;
+
+            writeln!(&mut report, "\n======================").unwrap();
+            writeln!(&mut report, "Stats for node {:?}:", node_id).unwrap();
+
+            let answer = self
+                .rpc_client
+                .call(*node_id, AdminRPC::Stats(node_opt), ADMIN_RPC_TIMEOUT)
+                .await;
+            // A failing node is reported inline instead of aborting the whole report.
+            match answer {
+                Ok(AdminRPC::Ok(text)) => writeln!(&mut report, "{}", text).unwrap(),
+                Ok(other) => writeln!(&mut report, "Bad answer: {:?}", other).unwrap(),
+                Err(err) => writeln!(&mut report, "Error: {}", err).unwrap(),
+            }
+        }
+
+        Ok(AdminRPC::Ok(report))
+    }
+
+    /// Build the textual statistics report for the local node.
+    ///
+    /// The report contains, in order: the git version of the build, how many
+    /// ring entries list each node, the statistics of each metadata table, and
+    /// the block manager statistics. The block count is only included when
+    /// `opt.detailed` is set.
+    fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
+        let mut ret = String::new();
+        writeln!(
+            &mut ret,
+            "\nGarage version: {}",
+            git_version::git_version!()
+        )
+        .unwrap();
+
+        // Gather ring statistics
+        let ring = self.garage.system.ring.borrow().clone();
+        let mut ring_nodes = HashMap::new();
+        for r in ring.ring.iter() {
+            for n in r.nodes.iter() {
+                // Entry API: a single hash lookup per occurrence; a node seen
+                // for the first time starts at zero before being incremented.
+                *ring_nodes.entry(*n).or_insert(0usize) += 1;
+            }
+        }
+        writeln!(&mut ret, "\nRing nodes & partition count:").unwrap();
+        for (n, c) in ring_nodes.iter() {
+            writeln!(&mut ret, " {:?} {}", n, c).unwrap();
+        }
+
+        self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?;
+        self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?;
+        self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?;
+        self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?;
+        self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?;
+
+        writeln!(&mut ret, "\nBlock manager stats:").unwrap();
+        if opt.detailed {
+            writeln!(
+                &mut ret,
+                " number of blocks: {}",
+                self.garage.block_manager.rc_len()
+            )
+            .unwrap();
+        }
+        writeln!(
+            &mut ret,
+            " resync queue length: {}",
+            self.garage.block_manager.resync_queue_len()
+        )
+        .unwrap();
+
+        Ok(ret)
+    }
+
+    /// Append the statistics of table `t` to the report buffer `to`.
+    ///
+    /// The item count and Merkle tree size are only written when
+    /// `opt.detailed` is set; the two todo queue lengths are always written.
+    fn gather_table_stats<F, R>(
+        &self,
+        to: &mut String,
+        t: &Arc<Table<F, R>>,
+        opt: &StatsOpt,
+    ) -> Result<(), Error>
+    where
+        F: TableSchema + 'static,
+        R: TableReplication + 'static,
+    {
+        let data = &t.data;
+        let merkle = &t.merkle_updater;
+
+        writeln!(to, "\nTable stats for {}", data.name).unwrap();
+
+        if opt.detailed {
+            let item_count = data.store.len();
+            writeln!(to, " number of items: {}", item_count).unwrap();
+
+            let tree_len = merkle.merkle_tree_len();
+            writeln!(to, " Merkle tree size: {}", tree_len).unwrap();
+        }
+
+        let merkle_todo = merkle.todo_len();
+        writeln!(to, " Merkle updater todo queue length: {}", merkle_todo).unwrap();
+
+        let gc_todo = data.gc_todo_len();
+        writeln!(to, " GC todo queue length: {}", gc_todo).unwrap();
+
+        Ok(())
+    }
}
diff --git a/src/garage/cli.rs b/src/garage/cli.rs
new file mode 100644
index 00000000..21bafebd
--- /dev/null
+++ b/src/garage/cli.rs
@@ -0,0 +1,552 @@
+use std::collections::HashSet;
+use std::net::SocketAddr;
+use std::path::PathBuf;
+
+use serde::{Deserialize, Serialize};
+use structopt::StructOpt;
+
+use garage_util::error::Error;
+use garage_util::time::*;
+
+use garage_rpc::membership::*;
+use garage_rpc::ring::*;
+use garage_rpc::rpc_client::*;
+
+use garage_model::bucket_table::*;
+use garage_model::key_table::*;
+
+use crate::admin_rpc::*;
+
+#[derive(StructOpt, Debug)]
+pub enum Command {
+ /// Run Garage server
+ #[structopt(name = "server")]
+ Server(ServerOpt),
+
+ /// Get network status
+ #[structopt(name = "status")]
+ Status,
+
+ /// Garage node operations
+ #[structopt(name = "node")]
+ Node(NodeOperation),
+
+ /// Bucket operations
+ #[structopt(name = "bucket")]
+ Bucket(BucketOperation),
+
+ /// Key operations
+ #[structopt(name = "key")]
+ Key(KeyOperation),
+
+ /// Start repair of node data
+ #[structopt(name = "repair")]
+ Repair(RepairOpt),
+
+ /// Gather node statistics
+ #[structopt(name = "stats")]
+ Stats(StatsOpt),
+}
+
+#[derive(StructOpt, Debug)]
+pub struct ServerOpt {
+ /// Configuration file
+ #[structopt(short = "c", long = "config", default_value = "./config.toml")]
+ pub config_file: PathBuf,
+}
+
+#[derive(StructOpt, Debug)]
+pub enum NodeOperation {
+ /// Configure Garage node
+ #[structopt(name = "configure")]
+ Configure(ConfigureNodeOpt),
+
+ /// Remove Garage node from cluster
+ #[structopt(name = "remove")]
+ Remove(RemoveNodeOpt),
+}
+
+#[derive(StructOpt, Debug)]
+pub struct ConfigureNodeOpt {
+    /// Node to configure (prefix of hexadecimal node id)
+    node_id: String,
+
+    /// Location (datacenter) of the node
+    #[structopt(short = "d", long = "datacenter")]
+    datacenter: Option<String>,
+
+    /// Capacity (in relative terms, use 1 to represent your smallest server)
+    #[structopt(short = "c", long = "capacity")]
+    capacity: Option<u32>,
+
+    /// Optional node tag
+    #[structopt(short = "t", long = "tag")]
+    tag: Option<String>,
+}
+
+#[derive(StructOpt, Debug)]
+pub struct RemoveNodeOpt {
+    /// Node to remove (prefix of hexadecimal node id)
+    node_id: String,
+
+    /// If this flag is not given, the node won't be removed
+    #[structopt(long = "yes")]
+    yes: bool,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub enum BucketOperation {
+    /// List buckets
+    #[structopt(name = "list")]
+    List,
+
+    /// Get bucket info
+    #[structopt(name = "info")]
+    Info(BucketOpt),
+
+    /// Create bucket
+    #[structopt(name = "create")]
+    Create(BucketOpt),
+
+    /// Delete bucket
+    #[structopt(name = "delete")]
+    Delete(DeleteBucketOpt),
+
+    /// Allow key to read or write to bucket
+    #[structopt(name = "allow")]
+    Allow(PermBucketOpt),
+
+    /// Deny key to read or write to bucket
+    #[structopt(name = "deny")]
+    Deny(PermBucketOpt),
+
+    /// Expose as website or not
+    #[structopt(name = "website")]
+    Website(WebsiteOpt),
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct WebsiteOpt {
+ /// Create
+ #[structopt(long = "allow")]
+ pub allow: bool,
+
+ /// Delete
+ #[structopt(long = "deny")]
+ pub deny: bool,
+
+ /// Bucket name
+ pub bucket: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct BucketOpt {
+ /// Bucket name
+ pub name: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct DeleteBucketOpt {
+ /// Bucket name
+ pub name: String,
+
+ /// If this flag is not given, the bucket won't be deleted
+ #[structopt(long = "yes")]
+ pub yes: bool,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct PermBucketOpt {
+ /// Access key name or ID
+ #[structopt(long = "key")]
+ pub key_pattern: String,
+
+ /// Allow/deny read operations
+ #[structopt(long = "read")]
+ pub read: bool,
+
+ /// Allow/deny write operations
+ #[structopt(long = "write")]
+ pub write: bool,
+
+ /// Bucket name
+ pub bucket: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub enum KeyOperation {
+ /// List keys
+ #[structopt(name = "list")]
+ List,
+
+ /// Get key info
+ #[structopt(name = "info")]
+ Info(KeyOpt),
+
+ /// Create new key
+ #[structopt(name = "new")]
+ New(KeyNewOpt),
+
+ /// Rename key
+ #[structopt(name = "rename")]
+ Rename(KeyRenameOpt),
+
+ /// Delete key
+ #[structopt(name = "delete")]
+ Delete(KeyDeleteOpt),
+
+ /// Import key
+ #[structopt(name = "import")]
+ Import(KeyImportOpt),
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct KeyOpt {
+ /// ID or name of the key
+ pub key_pattern: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct KeyNewOpt {
+ /// Name of the key
+ #[structopt(long = "name", default_value = "Unnamed key")]
+ pub name: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct KeyRenameOpt {
+ /// ID or name of the key
+ pub key_pattern: String,
+
+ /// New name of the key
+ pub new_name: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct KeyDeleteOpt {
+ /// ID or name of the key
+ pub key_pattern: String,
+
+ /// Confirm deletion
+ #[structopt(long = "yes")]
+ pub yes: bool,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct KeyImportOpt {
+ /// Access key ID
+ pub key_id: String,
+
+ /// Secret access key
+ pub secret_key: String,
+
+ /// Key name
+ #[structopt(short = "n", default_value = "Imported key")]
+ pub name: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct RepairOpt {
+ /// Launch repair operation on all nodes
+ #[structopt(short = "a", long = "all-nodes")]
+ pub all_nodes: bool,
+
+ /// Confirm the launch of the repair operation
+ #[structopt(long = "yes")]
+ pub yes: bool,
+
+ #[structopt(subcommand)]
+ pub what: Option<RepairWhat>,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum RepairWhat {
+ /// Only do a full sync of metadata tables
+ #[structopt(name = "tables")]
+ Tables,
+ /// Only repair (resync/rebalance) the set of stored blocks
+ #[structopt(name = "blocks")]
+ Blocks,
+ /// Only redo the propagation of object deletions to the version table (slow)
+ #[structopt(name = "versions")]
+ Versions,
+ /// Only redo the propagation of version deletions to the block ref table (extremely slow)
+ #[structopt(name = "block_refs")]
+ BlockRefs,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct StatsOpt {
+ /// Gather statistics from all nodes
+ #[structopt(short = "a", long = "all-nodes")]
+ pub all_nodes: bool,
+
+ /// Gather detailed statistics (this can be long)
+ #[structopt(short = "d", long = "detailed")]
+ pub detailed: bool,
+}
+
+/// Dispatch a client-side command to the function that implements it.
+///
+/// `status` and `node` commands go through the membership RPC client; `bucket`,
+/// `key`, `repair` and `stats` commands are wrapped in an [`AdminRPC`] message
+/// and sent through the admin RPC client. All calls target `rpc_host`.
+///
+/// # Panics
+///
+/// Panics when given `Command::Server`, which this function does not handle.
+pub async fn cli_cmd(
+    cmd: Command,
+    membership_rpc_cli: RpcAddrClient<Message>,
+    admin_rpc_cli: RpcAddrClient<AdminRPC>,
+    rpc_host: SocketAddr,
+) -> Result<(), Error> {
+    match cmd {
+        Command::Status => cmd_status(membership_rpc_cli, rpc_host).await,
+        Command::Node(NodeOperation::Configure(configure_opt)) => {
+            cmd_configure(membership_rpc_cli, rpc_host, configure_opt).await
+        }
+        Command::Node(NodeOperation::Remove(remove_opt)) => {
+            cmd_remove(membership_rpc_cli, rpc_host, remove_opt).await
+        }
+        Command::Bucket(bo) => {
+            cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::BucketOperation(bo)).await
+        }
+        Command::Key(ko) => cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::KeyOperation(ko)).await,
+        Command::Repair(ro) => cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::LaunchRepair(ro)).await,
+        Command::Stats(so) => cmd_admin(admin_rpc_cli, rpc_host, AdminRPC::Stats(so)).await,
+        // Named explicitly (no `_` catch-all) so that adding a variant to
+        // `Command` produces a compile error here instead of a runtime panic.
+        Command::Server(_) => {
+            unreachable!("Command::Server must be handled by the caller, not by cli_cmd")
+        }
+    }
+}
+
+/// Print the cluster status as seen by the node at `rpc_host`.
+///
+/// Pulls the node status list and the network configuration over the membership
+/// RPC, prints the nodes that are up, then (if any) the nodes that are down or
+/// that are configured but absent from the status list.
+///
+/// # Errors
+///
+/// Returns an error if an RPC call fails or if the node answers with an
+/// unexpected message type.
+pub async fn cmd_status(
+    rpc_cli: RpcAddrClient<Message>,
+    rpc_host: SocketAddr,
+) -> Result<(), Error> {
+    let status = match rpc_cli
+        .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
+        .await??
+    {
+        Message::AdvertiseNodesUp(nodes) => nodes,
+        resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+    };
+    let config = match rpc_cli
+        .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
+        .await??
+    {
+        Message::AdvertiseConfig(cfg) => cfg,
+        resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+    };
+
+    println!("Healthy nodes:");
+    for adv in status.iter().filter(|x| x.is_up) {
+        if let Some(cfg) = config.members.get(&adv.id) {
+            println!(
+                "{:?}\t{}\t{}\t[{}]\t{}\t{}",
+                adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.capacity
+            );
+        } else {
+            println!(
+                "{:?}\t{}\t{}\tUNCONFIGURED/REMOVED",
+                adv.id, adv.state_info.hostname, adv.addr
+            );
+        }
+    }
+
+    // Set of node ids present in the status list, reused below for every
+    // membership test instead of rescanning `status` each time.
+    let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>();
+    let failure_case_1 = status.iter().any(|x| !x.is_up);
+    let failure_case_2 = config
+        .members
+        .iter()
+        .any(|(id, _)| !status_keys.contains(id));
+    if failure_case_1 || failure_case_2 {
+        println!("\nFailed nodes:");
+        for adv in status.iter().filter(|x| !x.is_up) {
+            if let Some(cfg) = config.members.get(&adv.id) {
+                println!(
+                    "{:?}\t{}\t{}\t[{}]\t{}\t{}\tlast seen: {}s ago",
+                    adv.id,
+                    adv.state_info.hostname,
+                    adv.addr,
+                    cfg.tag,
+                    cfg.datacenter,
+                    cfg.capacity,
+                    // Saturating: `last_seen` comes from another node and may be
+                    // ahead of the local clock; a plain subtraction would underflow.
+                    now_msec().saturating_sub(adv.last_seen) / 1000,
+                );
+            }
+        }
+        for (id, cfg) in config.members.iter() {
+            if !status_keys.contains(id) {
+                println!(
+                    "{:?}\t{}\t{}\t{}\tnever seen",
+                    id, cfg.tag, cfg.datacenter, cfg.capacity
+                );
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Create or update the network configuration entry of one node.
+///
+/// The node is selected by `args.node_id`, a prefix of its hex-encoded id,
+/// matched against the status list pulled from `rpc_host`; exactly one node
+/// must match. For a node that has no configuration entry yet, both the
+/// datacenter and the capacity must be given. For an already configured node,
+/// omitted values keep their previous setting. The updated configuration is
+/// then advertised back to `rpc_host` with its version incremented.
+///
+/// # Errors
+///
+/// Returns an error if an RPC call fails or answers with an unexpected message,
+/// if the prefix does not match exactly one node, or if a new node is
+/// configured without a datacenter or a capacity.
+pub async fn cmd_configure(
+    rpc_cli: RpcAddrClient<Message>,
+    rpc_host: SocketAddr,
+    args: ConfigureNodeOpt,
+) -> Result<(), Error> {
+    let status = match rpc_cli
+        .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
+        .await??
+    {
+        Message::AdvertiseNodesUp(nodes) => nodes,
+        resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+    };
+
+    let mut candidates = vec![];
+    for adv in status.iter() {
+        if hex::encode(&adv.id).starts_with(&args.node_id) {
+            candidates.push(adv.id);
+        }
+    }
+    if candidates.len() != 1 {
+        return Err(Error::Message(format!(
+            "{} matching nodes",
+            candidates.len()
+        )));
+    }
+
+    let mut config = match rpc_cli
+        .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
+        .await??
+    {
+        Message::AdvertiseConfig(cfg) => cfg,
+        resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+    };
+
+    let new_entry = match config.members.get(&candidates[0]) {
+        // New node: missing mandatory flags are a user error, reported through
+        // the normal error path rather than by panicking.
+        None => NetworkConfigEntry {
+            datacenter: args.datacenter.ok_or_else(|| {
+                Error::Message("Please specify a datacenter with the -d flag".to_string())
+            })?,
+            capacity: args.capacity.ok_or_else(|| {
+                Error::Message("Please specify a capacity with the -c flag".to_string())
+            })?,
+            tag: args.tag.unwrap_or_default(),
+        },
+        // Known node: fall back to the previous values, cloning them only
+        // when they are actually needed.
+        Some(old) => NetworkConfigEntry {
+            datacenter: args
+                .datacenter
+                .unwrap_or_else(|| old.datacenter.to_string()),
+            capacity: args.capacity.unwrap_or(old.capacity),
+            tag: args.tag.unwrap_or_else(|| old.tag.to_string()),
+        },
+    };
+
+    config.members.insert(candidates[0].clone(), new_entry);
+    config.version += 1;
+
+    rpc_cli
+        .call(
+            &rpc_host,
+            &Message::AdvertiseConfig(config),
+            ADMIN_RPC_TIMEOUT,
+        )
+        .await??;
+    Ok(())
+}
+
+/// Remove one node from the network configuration.
+///
+/// The node is selected by `args.node_id`, a prefix of its hex-encoded id,
+/// matched against the members of the configuration pulled from `rpc_host`.
+/// Nothing is changed unless exactly one member matches and `args.yes` is set.
+/// The updated configuration is advertised back with its version incremented.
+pub async fn cmd_remove(
+    rpc_cli: RpcAddrClient<Message>,
+    rpc_host: SocketAddr,
+    args: RemoveNodeOpt,
+) -> Result<(), Error> {
+    let pulled = rpc_cli
+        .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
+        .await??;
+    let mut config = match pulled {
+        Message::AdvertiseConfig(cfg) => cfg,
+        resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+    };
+
+    // Every configured member whose hex id starts with the requested prefix.
+    let candidates = config
+        .members
+        .keys()
+        .filter(|key| hex::encode(*key).starts_with(&args.node_id))
+        .copied()
+        .collect::<Vec<_>>();
+    if candidates.len() != 1 {
+        return Err(Error::Message(format!(
+            "{} matching nodes",
+            candidates.len()
+        )));
+    }
+    let target = candidates[0];
+
+    // Destructive operation: require explicit confirmation.
+    if !args.yes {
+        return Err(Error::Message(format!(
+            "Add the flag --yes to really remove {:?} from the cluster",
+            target
+        )));
+    }
+
+    config.members.remove(&target);
+    config.version += 1;
+
+    let advertisement = Message::AdvertiseConfig(config);
+    rpc_cli
+        .call(&rpc_host, &advertisement, ADMIN_RPC_TIMEOUT)
+        .await??;
+    Ok(())
+}
+
+/// Send an admin RPC to `rpc_host` and print the reply to standard output.
+///
+/// A reply variant that is not one of the printable answers is logged as an
+/// error; the function still returns `Ok(())` in that case.
+pub async fn cmd_admin(
+    rpc_cli: RpcAddrClient<AdminRPC>,
+    rpc_host: SocketAddr,
+    args: AdminRPC,
+) -> Result<(), Error> {
+    let reply = rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await??;
+
+    match reply {
+        AdminRPC::Ok(msg) => println!("{}", msg),
+        AdminRPC::BucketList(names) => {
+            println!("List of buckets:");
+            names.into_iter().for_each(|name| println!("{}", name));
+        }
+        AdminRPC::BucketInfo(bucket) => print_bucket_info(&bucket),
+        AdminRPC::KeyList(entries) => {
+            println!("List of keys:");
+            for (id, name) in entries {
+                println!("{}\t{}", id, name);
+            }
+        }
+        AdminRPC::KeyInfo(key) => print_key_info(&key),
+        other => error!("Unexpected response: {:?}", other),
+    }
+
+    Ok(())
+}
+
+/// Print the name, id and secret of `key`, followed by either a deletion
+/// notice or the list of buckets it is authorized on with their permissions.
+fn print_key_info(key: &Key) {
+    println!("Key name: {}", key.name.get());
+    println!("Key ID: {}", key.key_id);
+    println!("Secret key: {}", key.secret_key);
+
+    // A deleted key has no permissions worth listing.
+    if key.deleted.get() {
+        println!("Key is deleted.");
+        return;
+    }
+
+    println!("Authorized buckets:");
+    for (bucket_name, _, perm) in key.authorized_buckets.items().iter() {
+        println!(
+            "- {} R:{} W:{}",
+            bucket_name, perm.allow_read, perm.allow_write
+        );
+    }
+}
+
+/// Print the name of `bucket`, then either a deletion notice or its
+/// authorized keys (with their permissions) and its website access flag.
+fn print_bucket_info(bucket: &Bucket) {
+    println!("Bucket name: {}", bucket.name);
+
+    let state = bucket.state.get();
+    match state {
+        BucketState::Present(params) => {
+            println!("Authorized keys:");
+            for (key_id, _, perm) in params.authorized_keys.items().iter() {
+                println!("- {} R:{} W:{}", key_id, perm.allow_read, perm.allow_write);
+            }
+            println!("Website access: {}", params.website.get());
+        }
+        BucketState::Deleted => println!("Bucket is deleted."),
+    }
+}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 8757a1bb..6c86d0fb 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -4,289 +4,67 @@
extern crate log;
mod admin_rpc;
+mod cli;
mod repair;
mod server;
-use std::collections::HashSet;
use std::net::SocketAddr;
-use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
-use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::config::TlsConfig;
-use garage_util::data::*;
use garage_util::error::Error;
use garage_rpc::membership::*;
-use garage_rpc::ring::*;
use garage_rpc::rpc_client::*;
use admin_rpc::*;
+use cli::*;
#[derive(StructOpt, Debug)]
#[structopt(name = "garage")]
pub struct Opt {
/// RPC connect to this host to execute client operations
#[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")]
- rpc_host: SocketAddr,
+ pub rpc_host: SocketAddr,
#[structopt(long = "ca-cert")]
- ca_cert: Option<String>,
+ pub ca_cert: Option<String>,
#[structopt(long = "client-cert")]
- client_cert: Option<String>,
+ pub client_cert: Option<String>,
#[structopt(long = "client-key")]
- client_key: Option<String>,
+ pub client_key: Option<String>,
#[structopt(subcommand)]
cmd: Command,
}
-#[derive(StructOpt, Debug)]
-pub enum Command {
- /// Run Garage server
- #[structopt(name = "server")]
- Server(ServerOpt),
-
- /// Get network status
- #[structopt(name = "status")]
- Status,
-
- /// Garage node operations
- #[structopt(name = "node")]
- Node(NodeOperation),
-
- /// Bucket operations
- #[structopt(name = "bucket")]
- Bucket(BucketOperation),
-
- /// Key operations
- #[structopt(name = "key")]
- Key(KeyOperation),
-
- /// Start repair of node data
- #[structopt(name = "repair")]
- Repair(RepairOpt),
-}
-
-#[derive(StructOpt, Debug)]
-pub struct ServerOpt {
- /// Configuration file
- #[structopt(short = "c", long = "config", default_value = "./config.toml")]
- config_file: PathBuf,
-}
-
-#[derive(StructOpt, Debug)]
-pub enum NodeOperation {
- /// Configure Garage node
- #[structopt(name = "configure")]
- Configure(ConfigureNodeOpt),
-
- /// Remove Garage node from cluster
- #[structopt(name = "remove")]
- Remove(RemoveNodeOpt),
-}
-
-#[derive(StructOpt, Debug)]
-pub struct ConfigureNodeOpt {
- /// Node to configure (prefix of hexadecimal node id)
- node_id: String,
-
- /// Location (datacenter) of the node
- #[structopt(short = "d", long = "datacenter")]
- datacenter: Option<String>,
-
- /// Capacity (in relative terms, use 1 to represent your smallest server)
- #[structopt(short = "c", long = "capacity")]
- capacity: Option<u32>,
-
- /// Optionnal node tag
- #[structopt(short = "t", long = "tag")]
- tag: Option<String>,
-}
-
-#[derive(StructOpt, Debug)]
-pub struct RemoveNodeOpt {
- /// Node to configure (prefix of hexadecimal node id)
- node_id: String,
-
- /// If this flag is not given, the node won't be removed
- #[structopt(long = "yes")]
- yes: bool,
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub enum BucketOperation {
- /// List buckets
- #[structopt(name = "list")]
- List,
-
- /// Get bucket info
- #[structopt(name = "info")]
- Info(BucketOpt),
-
- /// Create bucket
- #[structopt(name = "create")]
- Create(BucketOpt),
-
- /// Delete bucket
- #[structopt(name = "delete")]
- Delete(DeleteBucketOpt),
-
- /// Allow key to read or write to bucket
- #[structopt(name = "allow")]
- Allow(PermBucketOpt),
-
- /// Allow key to read or write to bucket
- #[structopt(name = "deny")]
- Deny(PermBucketOpt),
-
- /// Expose as website or not
- #[structopt(name = "website")]
- Website(WebsiteOpt),
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub struct WebsiteOpt {
- /// Create
- #[structopt(long = "allow")]
- pub allow: bool,
-
- /// Delete
- #[structopt(long = "deny")]
- pub deny: bool,
-
- /// Bucket name
- pub bucket: String,
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub struct BucketOpt {
- /// Bucket name
- pub name: String,
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub struct DeleteBucketOpt {
- /// Bucket name
- pub name: String,
-
- /// If this flag is not given, the bucket won't be deleted
- #[structopt(long = "yes")]
- pub yes: bool,
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub struct PermBucketOpt {
- /// Access key ID
- #[structopt(long = "key")]
- pub key_id: String,
-
- /// Allow/deny read operations
- #[structopt(long = "read")]
- pub read: bool,
-
- /// Allow/deny write operations
- #[structopt(long = "write")]
- pub write: bool,
-
- /// Bucket name
- pub bucket: String,
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub enum KeyOperation {
- /// List keys
- #[structopt(name = "list")]
- List,
-
- /// Get key info
- #[structopt(name = "info")]
- Info(KeyOpt),
-
- /// Create new key
- #[structopt(name = "new")]
- New(KeyNewOpt),
-
- /// Rename key
- #[structopt(name = "rename")]
- Rename(KeyRenameOpt),
-
- /// Delete key
- #[structopt(name = "delete")]
- Delete(KeyDeleteOpt),
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub struct KeyOpt {
- /// ID of the key
- key_id: String,
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub struct KeyNewOpt {
- /// Name of the key
- #[structopt(long = "name", default_value = "Unnamed key")]
- name: String,
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub struct KeyRenameOpt {
- /// ID of the key
- key_id: String,
-
- /// New name of the key
- new_name: String,
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub struct KeyDeleteOpt {
- /// ID of the key
- key_id: String,
-
- /// Confirm deletion
- #[structopt(long = "yes")]
- yes: bool,
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
-pub struct RepairOpt {
- /// Launch repair operation on all nodes
- #[structopt(short = "a", long = "all-nodes")]
- pub all_nodes: bool,
-
- /// Confirm the launch of the repair operation
- #[structopt(long = "yes")]
- pub yes: bool,
-
- #[structopt(subcommand)]
- pub what: Option<RepairWhat>,
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
-pub enum RepairWhat {
- /// Only do a full sync of metadata tables
- #[structopt(name = "tables")]
- Tables,
- /// Only repair (resync/rebalance) the set of stored blocks
- #[structopt(name = "blocks")]
- Blocks,
- /// Only redo the propagation of object deletions to the version table (slow)
- #[structopt(name = "versions")]
- Versions,
- /// Only redo the propagation of version deletions to the block ref table (extremely slow)
- #[structopt(name = "block_refs")]
- BlockRefs,
-}
-
#[tokio::main]
async fn main() {
pretty_env_logger::init();
let opt = Opt::from_args();
+ let res = if let Command::Server(server_opt) = opt.cmd {
+ // Abort on panic (same behavior as in Go)
+ std::panic::set_hook(Box::new(|panic_info| {
+ error!("{}", panic_info.to_string());
+ std::process::abort();
+ }));
+
+ server::run_server(server_opt.config_file).await
+ } else {
+ cli_command(opt).await
+ };
+
+ if let Err(e) = res {
+ error!("{}", e);
+ }
+}
+
+async fn cli_command(opt: Opt) -> Result<(), Error> {
let tls_config = match (opt.ca_cert, opt.client_cert, opt.client_key) {
(Some(ca_cert), Some(client_cert), Some(client_key)) => Some(TlsConfig {
ca_cert,
@@ -306,245 +84,5 @@ async fn main() {
RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string());
let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string());
- let resp = match opt.cmd {
- Command::Server(server_opt) => {
- // Abort on panic (same behavior as in Go)
- std::panic::set_hook(Box::new(|panic_info| {
- error!("{}", panic_info.to_string());
- std::process::abort();
- }));
-
- server::run_server(server_opt.config_file).await
- }
- Command::Status => cmd_status(membership_rpc_cli, opt.rpc_host).await,
- Command::Node(NodeOperation::Configure(configure_opt)) => {
- cmd_configure(membership_rpc_cli, opt.rpc_host, configure_opt).await
- }
- Command::Node(NodeOperation::Remove(remove_opt)) => {
- cmd_remove(membership_rpc_cli, opt.rpc_host, remove_opt).await
- }
- Command::Bucket(bo) => {
- cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await
- }
- Command::Key(bo) => {
- cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::KeyOperation(bo)).await
- }
- Command::Repair(ro) => {
- cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::LaunchRepair(ro)).await
- }
- };
-
- if let Err(e) = resp {
- error!("Error: {}", e);
- }
-}
-
-async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> {
- let status = match rpc_cli
- .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await??
- {
- Message::AdvertiseNodesUp(nodes) => nodes,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
- let config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
- {
- Message::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- println!("Healthy nodes:");
- for adv in status.iter().filter(|x| x.is_up) {
- if let Some(cfg) = config.members.get(&adv.id) {
- println!(
- "{:?}\t{}\t{}\t[{}]\t{}\t{}",
- adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.capacity
- );
- } else {
- println!(
- "{:?}\t{}\t{}\tUNCONFIGURED/REMOVED",
- adv.id, adv.state_info.hostname, adv.addr
- );
- }
- }
-
- let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>();
- let failure_case_1 = status.iter().any(|x| !x.is_up);
- let failure_case_2 = config
- .members
- .iter()
- .any(|(id, _)| !status_keys.contains(id));
- if failure_case_1 || failure_case_2 {
- println!("\nFailed nodes:");
- for adv in status.iter().filter(|x| !x.is_up) {
- if let Some(cfg) = config.members.get(&adv.id) {
- println!(
- "{:?}\t{}\t{}\t[{}]\t{}\t{}\tlast seen: {}s ago",
- adv.id,
- adv.state_info.hostname,
- adv.addr,
- cfg.tag,
- cfg.datacenter,
- cfg.capacity,
- (now_msec() - adv.last_seen) / 1000,
- );
- }
- }
- for (id, cfg) in config.members.iter() {
- if !status.iter().any(|x| x.id == *id) {
- println!(
- "{:?}\t{}\t{}\t{}\tnever seen",
- id, cfg.tag, cfg.datacenter, cfg.capacity
- );
- }
- }
- }
-
- Ok(())
-}
-
-async fn cmd_configure(
- rpc_cli: RpcAddrClient<Message>,
- rpc_host: SocketAddr,
- args: ConfigureNodeOpt,
-) -> Result<(), Error> {
- let status = match rpc_cli
- .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await??
- {
- Message::AdvertiseNodesUp(nodes) => nodes,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- let mut candidates = vec![];
- for adv in status.iter() {
- if hex::encode(&adv.id).starts_with(&args.node_id) {
- candidates.push(adv.id);
- }
- }
- if candidates.len() != 1 {
- return Err(Error::Message(format!(
- "{} matching nodes",
- candidates.len()
- )));
- }
-
- let mut config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
- {
- Message::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- let new_entry = match config.members.get(&candidates[0]) {
- None => NetworkConfigEntry {
- datacenter: args
- .datacenter
- .expect("Please specifiy a datacenter with the -d flag"),
- capacity: args
- .capacity
- .expect("Please specifiy a capacity with the -c flag"),
- tag: args.tag.unwrap_or("".to_string()),
- },
- Some(old) => NetworkConfigEntry {
- datacenter: args.datacenter.unwrap_or(old.datacenter.to_string()),
- capacity: args.capacity.unwrap_or(old.capacity),
- tag: args.tag.unwrap_or(old.tag.to_string()),
- },
- };
-
- config.members.insert(candidates[0].clone(), new_entry);
- config.version += 1;
-
- rpc_cli
- .call(
- &rpc_host,
- &Message::AdvertiseConfig(config),
- ADMIN_RPC_TIMEOUT,
- )
- .await??;
- Ok(())
-}
-
-async fn cmd_remove(
- rpc_cli: RpcAddrClient<Message>,
- rpc_host: SocketAddr,
- args: RemoveNodeOpt,
-) -> Result<(), Error> {
- let mut config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
- {
- Message::AdvertiseConfig(cfg) => cfg,
- resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
- };
-
- let mut candidates = vec![];
- for (key, _) in config.members.iter() {
- if hex::encode(key).starts_with(&args.node_id) {
- candidates.push(*key);
- }
- }
- if candidates.len() != 1 {
- return Err(Error::Message(format!(
- "{} matching nodes",
- candidates.len()
- )));
- }
-
- if !args.yes {
- return Err(Error::Message(format!(
- "Add the flag --yes to really remove {:?} from the cluster",
- candidates[0]
- )));
- }
-
- config.members.remove(&candidates[0]);
- config.version += 1;
-
- rpc_cli
- .call(
- &rpc_host,
- &Message::AdvertiseConfig(config),
- ADMIN_RPC_TIMEOUT,
- )
- .await??;
- Ok(())
-}
-
-async fn cmd_admin(
- rpc_cli: RpcAddrClient<AdminRPC>,
- rpc_host: SocketAddr,
- args: AdminRPC,
-) -> Result<(), Error> {
- match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? {
- AdminRPC::Ok(msg) => {
- println!("{}", msg);
- }
- AdminRPC::BucketList(bl) => {
- println!("List of buckets:");
- for bucket in bl {
- println!("{}", bucket);
- }
- }
- AdminRPC::BucketInfo(bucket) => {
- println!("{:?}", bucket);
- }
- AdminRPC::KeyList(kl) => {
- println!("List of keys:");
- for key in kl {
- println!("{}\t{}", key.0, key.1);
- }
- }
- AdminRPC::KeyInfo(key) => {
- println!("{:?}", key);
- }
- r => {
- error!("Unexpected response: {:?}", r);
- }
- }
- Ok(())
+ cli_cmd(opt.cmd, membership_rpc_cli, admin_rpc_cli, opt.rpc_host).await
}
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 297ae9cd..8200f1f0 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -16,7 +16,13 @@ pub struct Repair {
}
impl Repair {
- pub async fn repair_worker(
+ pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
+ if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
+ warn!("Repair worker failed with error: {}", e);
+ }
+ }
+
+ async fn repair_worker_aux(
&self,
opt: RepairOpt,
must_exit: watch::Receiver<bool>,
@@ -25,41 +31,11 @@ impl Repair {
if todo(RepairWhat::Tables) {
info!("Launching a full sync of tables");
- self.garage
- .bucket_table
- .syncer
- .load_full()
- .unwrap()
- .add_full_scan()
- .await;
- self.garage
- .object_table
- .syncer
- .load_full()
- .unwrap()
- .add_full_scan()
- .await;
- self.garage
- .version_table
- .syncer
- .load_full()
- .unwrap()
- .add_full_scan()
- .await;
- self.garage
- .block_ref_table
- .syncer
- .load_full()
- .unwrap()
- .add_full_scan()
- .await;
- self.garage
- .key_table
- .syncer
- .load_full()
- .unwrap()
- .add_full_scan()
- .await;
+ self.garage.bucket_table.syncer.add_full_sync();
+ self.garage.object_table.syncer.add_full_sync();
+ self.garage.version_table.syncer.add_full_sync();
+ self.garage.block_ref_table.syncer.add_full_sync();
+ self.garage.key_table.syncer.add_full_sync();
}
// TODO: wait for full sync to finish before proceeding to the rest?
@@ -93,11 +69,13 @@ impl Repair {
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
- while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? {
+ while let Some((item_key, item_bytes)) =
+ self.garage.version_table.data.store.get_gt(&pos)?
+ {
pos = item_key.to_vec();
let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
- if version.deleted {
+ if version.deleted.get() {
continue;
}
let object = self
@@ -110,13 +88,7 @@ impl Repair {
.versions()
.iter()
.any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted),
- None => {
- warn!(
- "Repair versions: object for version {:?} not found, skipping.",
- version
- );
- continue;
- }
+ None => false,
};
if !version_exists {
info!("Repair versions: marking version as deleted: {:?}", version);
@@ -127,7 +99,6 @@ impl Repair {
version.bucket,
version.key,
true,
- vec![],
))
.await?;
}
@@ -142,11 +113,13 @@ impl Repair {
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
- while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? {
+ while let Some((item_key, item_bytes)) =
+ self.garage.block_ref_table.data.store.get_gt(&pos)?
+ {
pos = item_key.to_vec();
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
- if block_ref.deleted {
+ if block_ref.deleted.get() {
continue;
}
let version = self
@@ -154,16 +127,8 @@ impl Repair {
.version_table
.get(&block_ref.version, &EmptyKey)
.await?;
- let ref_exists = match version {
- Some(v) => !v.deleted,
- None => {
- warn!(
- "Block ref repair: version for block ref {:?} not found, skipping.",
- block_ref
- );
- continue;
- }
- };
+ // The version might not exist if it has been GC'ed
+ let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false);
if !ref_exists {
info!(
"Repair block ref: marking block_ref as deleted: {:?}",
@@ -174,7 +139,7 @@ impl Repair {
.insert(&BlockRef {
block: block_ref.block,
version: block_ref.version,
- deleted: true,
+ deleted: true.into(),
})
.await?;
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 2e109f8b..c45a69b8 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -21,13 +21,13 @@ async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error>
.await
.expect("failed to install CTRL+C signal handler");
info!("Received CTRL+C, shutting down.");
- send_cancel.broadcast(true)?;
+ send_cancel.send(true)?;
Ok(())
}
async fn wait_from(mut chan: watch::Receiver<bool>) -> () {
- while let Some(exit_now) = chan.recv().await {
- if exit_now {
+ while !*chan.borrow() {
+ if chan.changed().await.is_err() {
return;
}
}
@@ -40,37 +40,22 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
db_path.push("db");
- let db = match sled::open(&db_path) {
- Ok(db) => db,
- Err(e) => {
- warn!("Old DB could not be openned ({}), attempting migration.", e);
- let old = old_sled::open(&db_path).expect("Unable to open old DB for migration");
- let mut new_path = config.metadata_dir.clone();
- new_path.push("db2");
- let new = sled::open(&new_path).expect("Unable to open new DB for migration");
- new.import(old.export());
- if old.checksum().expect("unable to compute old db checksum")
- != new.checksum().expect("unable to compute new db checksum")
- {
- panic!("db checksums don't match after migration");
- }
- drop(new);
- drop(old);
- std::fs::remove_dir_all(&db_path).expect("Cannot remove old DB folder");
- std::fs::rename(new_path, &db_path)
- .expect("Cannot move new DB folder to correct place");
- sled::open(db_path).expect("Unable to open new DB after migration")
- }
- };
+ let db = sled::open(&db_path).expect("Unable to open sled DB");
info!("Initialize RPC server...");
let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone());
info!("Initializing background runner...");
let (send_cancel, watch_cancel) = watch::channel(false);
- let background = BackgroundRunner::new(16, watch_cancel.clone());
+ let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
- let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await;
+ info!("Initializing Garage main data store...");
+ let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
+ let bootstrap = garage.system.clone().bootstrap(
+ &config.bootstrap_peers[..],
+ config.consul_host,
+ config.consul_service_name,
+ );
info!("Crate admin RPC handler...");
AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server);
@@ -78,21 +63,13 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing RPC and API servers...");
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
- let web_server = web_server::run_web_server(garage.clone(), wait_from(watch_cancel.clone()));
+ let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone()));
futures::try_join!(
- garage
- .system
- .clone()
- .bootstrap(
- &garage.config.bootstrap_peers[..],
- garage.config.consul_host.clone(),
- garage.config.consul_service_name.clone()
- )
- .map(|rv| {
- info!("Bootstrap done");
- Ok(rv)
- }),
+ bootstrap.map(|rv| {
+ info!("Bootstrap done");
+ Ok(rv)
+ }),
run_rpc_server.map(|rv| {
info!("RPC server exited");
rv
@@ -105,9 +82,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Web server exited");
rv
}),
- background.run().map(|rv| {
- info!("Background runner exited");
- Ok(rv)
+ await_background_done.map(|rv| {
+ info!("Background runner exited: {:?}", rv);
+ Ok(())
}),
shutdown_signal(send_cancel),
)?;
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 48b75d24..98656ea9 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -16,23 +16,18 @@ path = "lib.rs"
garage_util = { version = "0.1.1", path = "../util" }
garage_rpc = { version = "0.1.1", path = "../rpc" }
garage_table = { version = "0.1.1", path = "../table" }
-model010 = { package = "garage_model_010b", version = "0.0.1" }
-bytes = "0.4"
-rand = "0.7"
-hex = "0.3"
-sha2 = "0.8"
-arc-swap = "0.4"
+rand = "0.8"
+hex = "0.4"
+arc-swap = "1.0"
log = "0.4"
sled = "0.34"
-rmp-serde = "0.14.3"
+rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
-async-trait = "0.1.30"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
-
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/model/block.rs b/src/model/block.rs
index 56c85c6a..0d9af38f 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -5,22 +5,20 @@ use std::time::Duration;
use arc_swap::ArcSwapOption;
use futures::future::*;
use futures::select;
-use futures::stream::*;
use serde::{Deserialize, Serialize};
use tokio::fs;
-use tokio::prelude::*;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
-use garage_util::data;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_util::time::*;
use garage_rpc::membership::System;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
-use garage_table::table_sharded::TableShardedReplication;
-use garage_table::TableReplication;
+use garage_table::replication::{sharded::TableShardedReplication, TableReplication};
use crate::block_ref_table::*;
@@ -28,7 +26,10 @@ use crate::garage::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
+pub const BACKGROUND_WORKERS: u64 = 1;
+
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
+const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
@@ -56,14 +57,14 @@ pub struct BlockManager {
pub data_dir: PathBuf,
pub data_dir_lock: Mutex<()>,
- pub rc: sled::Tree,
+ rc: sled::Tree,
- pub resync_queue: sled::Tree,
- pub resync_notify: Notify,
+ resync_queue: sled::Tree,
+ resync_notify: Notify,
- pub system: Arc<System>,
+ system: Arc<System>,
rpc_client: Arc<RpcClient<Message>>,
- pub garage: ArcSwapOption<Garage>,
+ pub(crate) garage: ArcSwapOption<Garage>,
}
impl BlockManager {
@@ -77,7 +78,6 @@ impl BlockManager {
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
- rc.set_merge_operator(rc_merge);
let resync_queue = db
.open_tree("block_local_resync_queue")
@@ -127,18 +127,16 @@ impl BlockManager {
}
}
- pub async fn spawn_background_worker(self: Arc<Self>) {
+ pub fn spawn_background_worker(self: Arc<Self>) {
// Launch BACKGROUND_WORKERS workers for the background resync loop, each after a staggered start delay
- for i in 0..2usize {
+ for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone();
let background = self.system.background.clone();
tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(10)).await;
- background
- .spawn_worker(format!("block resync worker {}", i), move |must_exit| {
- bm2.resync_loop(must_exit)
- })
- .await;
+ tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await;
+ background.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
+ bm2.resync_loop(must_exit)
+ });
});
}
}
@@ -168,7 +166,7 @@ impl BlockManager {
Ok(f) => f,
Err(e) => {
// Not found but maybe we should have had it ??
- self.put_to_resync(hash, 0)?;
+ self.put_to_resync(hash, Duration::from_millis(0))?;
return Err(Into::into(e));
}
};
@@ -176,11 +174,16 @@ impl BlockManager {
f.read_to_end(&mut data).await?;
drop(f);
- if data::sha256sum(&data[..]) != *hash {
+ // The block's content must hash back to the hash it was requested under.
+ if blake2sum(&data[..]) != *hash {
let _lock = self.data_dir_lock.lock().await;
- warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
- fs::remove_file(path).await?;
- self.put_to_resync(&hash, 0)?;
+ warn!(
+ "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
+ hash
+ );
+ // Keep the bad data on disk under a different name instead of deleting it.
+ // set_extension() inserts the separating dot itself, so the extension is
+ // passed without one (".corrupted" would produce a "..corrupted" suffix).
+ let mut path2 = path.clone();
+ path2.set_extension("corrupted");
+ fs::rename(path, path2).await?;
+ // Queue the block for an immediate resync.
+ self.put_to_resync(&hash, Duration::from_millis(0))?;
return Err(Error::CorruptData(*hash));
}
@@ -191,7 +194,7 @@ impl BlockManager {
let needed = self
.rc
.get(hash.as_ref())?
- .map(|x| u64_from_bytes(x.as_ref()) > 0)
+ .map(|x| u64_from_be_bytes(x) > 0)
.unwrap_or(false);
if needed {
let path = self.block_path(hash);
@@ -215,84 +218,95 @@ impl BlockManager {
}
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
- let old_rc = self.rc.get(&hash)?;
- self.rc.merge(&hash, vec![1])?;
- if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
- self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?;
+ let old_rc = self.rc.fetch_and_update(&hash, |old| {
+ let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
+ Some(u64::to_be_bytes(old_v + 1).to_vec())
+ })?;
+ let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0);
+ if old_rc == 0 {
+ self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?;
}
Ok(())
}
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
- let new_rc = self.rc.merge(&hash, vec![0])?;
- if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
- self.put_to_resync(&hash, 0)?;
+ let new_rc = self.rc.update_and_fetch(&hash, |old| {
+ let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
+ if old_v > 1 {
+ Some(u64::to_be_bytes(old_v - 1).to_vec())
+ } else {
+ None
+ }
+ })?;
+ if new_rc.is_none() {
+ self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?;
}
Ok(())
}
- fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> {
- let when = now_msec() + delay_millis;
+ fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
+ let when = now_msec() + delay.as_millis() as u64;
trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
self.resync_queue.insert(key, hash.as_ref())?;
- self.resync_notify.notify();
+ self.resync_notify.notify_waiters();
Ok(())
}
- async fn resync_loop(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
- let mut n_failures = 0usize;
+ async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
- if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
- let time_msec = u64_from_bytes(&time_bytes[0..8]);
- let now = now_msec();
- if now >= time_msec {
- let mut hash = [0u8; 32];
- hash.copy_from_slice(hash_bytes.as_ref());
- let hash = Hash::from(hash);
-
- if let Err(e) = self.resync_iter(&hash).await {
- warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
- self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?;
- n_failures += 1;
- if n_failures >= 10 {
- warn!("Too many resync failures, throttling.");
- tokio::time::delay_for(Duration::from_secs(1)).await;
- }
- } else {
- n_failures = 0;
- }
- } else {
- self.resync_queue.insert(time_bytes, hash_bytes)?;
- let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
- select! {
- _ = delay.fuse() => (),
- _ = self.resync_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
- }
+ if let Err(e) = self.resync_iter(&mut must_exit).await {
+ warn!("Error in block resync loop: {}", e);
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
+ }
+ }
+ }
+
+ async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
+ if let Some(first_item) = self.resync_queue.iter().next() {
+ let (time_bytes, hash_bytes) = first_item?;
+ let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
+ let now = now_msec();
+ if now >= time_msec {
+ let hash = Hash::try_from(&hash_bytes[..]).unwrap();
+ let res = self.resync_block(&hash).await;
+ if let Err(e) = &res {
+ warn!("Error when resyncing {:?}: {}", hash, e);
+ self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
+ }
+ self.resync_queue.remove(&time_bytes)?;
+ res?; // propagate error to delay main loop
} else {
+ let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
select! {
+ _ = delay.fuse() => (),
_ = self.resync_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
}
+ } else {
+ select! {
+ _ = self.resync_notify.notified().fuse() => (),
+ _ = must_exit.changed().fuse() => (),
+ }
}
Ok(())
}
- async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> {
+ async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
+ let lock = self.data_dir_lock.lock().await;
+
let path = self.block_path(hash);
let exists = fs::metadata(&path).await.is_ok();
let needed = self
.rc
.get(hash.as_ref())?
- .map(|x| u64_from_bytes(x.as_ref()) > 0)
+ .map(|x| u64_from_be_bytes(x) > 0)
.unwrap_or(false);
if exists != needed {
@@ -305,9 +319,10 @@ impl BlockManager {
if exists && !needed {
trace!("Offloading block {:?}", hash);
- let ring = self.system.ring.borrow().clone();
-
- let mut who = self.replication.replication_nodes(&hash, &ring);
+ let mut who = self.replication.write_nodes(&hash);
+ // Refuse to offload when fewer nodes than the write quorum are available.
+ if who.len() < self.replication.write_quorum() {
+ return Err(Error::Message(
+ "Not trying to offload block because we don't have a quorum of nodes to write to"
+ .to_string(),
+ ));
+ }
who.retain(|id| *id != self.system.id);
let msg = Arc::new(Message::NeedBlockQuery(*hash));
@@ -340,17 +355,17 @@ impl BlockManager {
need_nodes.len()
);
- let put_block_message = Arc::new(self.read_block(hash).await?);
- let put_resps = join_all(need_nodes.iter().map(|to| {
- self.rpc_client
- .call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT)
- }))
- .await;
- for resp in put_resps {
- resp?;
- }
+ let put_block_message = self.read_block(hash).await?;
+ self.rpc_client
+ .try_call_many(
+ &need_nodes[..],
+ put_block_message,
+ RequestStrategy::with_quorum(need_nodes.len())
+ .with_timeout(BLOCK_RW_TIMEOUT),
+ )
+ .await?;
}
- trace!(
+ info!(
"Deleting block {:?}, offload finished ({} / {})",
hash,
need_nodes.len(),
@@ -358,10 +373,11 @@ impl BlockManager {
);
fs::remove_file(path).await?;
- self.resync_queue.remove(&hash)?;
}
if needed && !exists {
+ drop(lock);
+
// TODO find a way to not do this if they are sending it to us
// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
// between the RC being incremented and this part being called.
@@ -373,7 +389,7 @@ impl BlockManager {
}
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
- let who = self.replication.read_nodes(&hash, &self.system);
+ let who = self.replication.read_nodes(&hash);
let resps = self
.rpc_client
.try_call_many(
@@ -397,12 +413,12 @@ impl BlockManager {
}
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
- let who = self.replication.write_nodes(&hash, &self.system);
+ let who = self.replication.write_nodes(&hash);
self.rpc_client
.try_call_many(
&who[..],
Message::PutBlock(PutBlockMessage { hash, data }),
- RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
+ RequestStrategy::with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
@@ -414,15 +430,15 @@ impl BlockManager {
let garage = self.garage.load_full().unwrap();
let mut last_hash = None;
let mut i = 0usize;
- for entry in garage.block_ref_table.store.iter() {
+ for entry in garage.block_ref_table.data.store.iter() {
let (_k, v_bytes) = entry?;
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
if Some(&block_ref.block) == last_hash.as_ref() {
continue;
}
- if !block_ref.deleted {
+ if !block_ref.deleted.get() {
last_hash = Some(block_ref.block);
- self.put_to_resync(&block_ref.block, 0)?;
+ self.put_to_resync(&block_ref.block, Duration::from_secs(0))?;
}
i += 1;
if i & 0xFF == 0 && *must_exit.borrow() {
@@ -447,8 +463,12 @@ impl BlockManager {
// so that we can offload them if necessary and then delete them locally.
async move {
let mut ls_data_dir = fs::read_dir(path).await?;
- while let Some(data_dir_ent) = ls_data_dir.next().await {
- let data_dir_ent = data_dir_ent?;
+ loop {
+ let data_dir_ent = ls_data_dir.next_entry().await?;
+ let data_dir_ent = match data_dir_ent {
+ Some(x) => x,
+ None => break,
+ };
let name = data_dir_ent.file_name();
let name = match name.into_string() {
Ok(x) => x,
@@ -466,7 +486,7 @@ impl BlockManager {
};
let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_bytes[..]);
- self.put_to_resync(&hash.into(), 0)?;
+ self.put_to_resync(&hash.into(), Duration::from_secs(0))?;
}
if *must_exit.borrow() {
@@ -477,32 +497,19 @@ impl BlockManager {
}
.boxed()
}
+
+ pub fn resync_queue_len(&self) -> usize {
+ self.resync_queue.len()
+ }
+
+ pub fn rc_len(&self) -> usize {
+ self.rc.len()
+ }
}
-fn u64_from_bytes(bytes: &[u8]) -> u64 {
- assert!(bytes.len() == 8);
+fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
+ assert!(bytes.as_ref().len() == 8);
let mut x8 = [0u8; 8];
- x8.copy_from_slice(bytes);
+ x8.copy_from_slice(bytes.as_ref());
u64::from_be_bytes(x8)
}
-
-fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
- let old = old.map(u64_from_bytes).unwrap_or(0);
- assert!(new.len() == 1);
- let new = match new[0] {
- 0 => {
- if old > 0 {
- old - 1
- } else {
- 0
- }
- }
- 1 => old + 1,
- _ => unreachable!(),
- };
- if new == 0 {
- None
- } else {
- Some(u64::to_be_bytes(new).to_vec())
- }
-}
diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs
index 9ab67737..e4372717 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/block_ref_table.rs
@@ -1,9 +1,9 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
-use garage_util::background::*;
use garage_util::data::*;
+use garage_table::crdt::CRDT;
use garage_table::*;
use crate::block::*;
@@ -17,7 +17,7 @@ pub struct BlockRef {
pub version: UUID,
// Keep track of deleted status
- pub deleted: bool,
+ pub deleted: crdt::Bool,
}
impl Entry<Hash, UUID> for BlockRef {
@@ -27,16 +27,18 @@ impl Entry<Hash, UUID> for BlockRef {
fn sort_key(&self) -> &UUID {
&self.version
}
+ fn is_tombstone(&self) -> bool {
+ self.deleted.get()
+ }
+}
+impl CRDT for BlockRef {
fn merge(&mut self, other: &Self) {
- if other.deleted {
- self.deleted = true;
- }
+ self.deleted.merge(&other.deleted);
}
}
pub struct BlockRefTable {
- pub background: Arc<BackgroundRunner>,
pub block_manager: Arc<BlockManager>,
}
@@ -48,8 +50,8 @@ impl TableSchema for BlockRefTable {
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block = &old.as_ref().or(new.as_ref()).unwrap().block;
- let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
- let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
+ let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
+ let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
if is_after && !was_before {
if let Err(e) = self.block_manager.block_incref(block) {
warn!("block_incref failed for block {:?}: {}", block, e);
@@ -63,6 +65,6 @@ impl TableSchema for BlockRefTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.deleted.get())
}
}
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 2878aa38..6330dced 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -5,11 +5,6 @@ use garage_table::*;
use crate::key_table::PermissionSet;
-// We import the same file but in its version 0.1.0.
-// We can then access v0.1.0 data structures.
-// We use them to perform migrations.
-use model010::bucket_table as prev;
-
/// A bucket is a collection of objects
///
/// Its parameters are not directly accessible as:
@@ -89,7 +84,9 @@ impl Entry<EmptyKey, String> for Bucket {
fn sort_key(&self) -> &String {
&self.name
}
+}
+impl CRDT for Bucket {
fn merge(&mut self, other: &Self) {
self.state.merge(&other.state);
}
@@ -106,39 +103,4 @@ impl TableSchema for BucketTable {
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.is_deleted())
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) {
- Ok(x) => x,
- Err(_) => return None,
- };
- if old.deleted {
- Some(Bucket {
- name: old.name,
- state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted),
- })
- } else {
- let mut keys = crdt::LWWMap::new();
- for ak in old.authorized_keys() {
- keys.merge(&crdt::LWWMap::migrate_from_raw_item(
- ak.key_id.clone(),
- ak.timestamp,
- PermissionSet {
- allow_read: ak.allow_read,
- allow_write: ak.allow_write,
- },
- ));
- }
-
- let params = BucketParams {
- authorized_keys: keys,
- website: crdt::LWW::new(false),
- };
-
- Some(Bucket {
- name: old.name,
- state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(params)),
- })
- }
- }
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 467d0aec..5f7a67c9 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -7,8 +7,8 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::RpcHttpClient;
use garage_rpc::rpc_server::RpcServer;
-use garage_table::table_fullcopy::*;
-use garage_table::table_sharded::*;
+use garage_table::replication::fullcopy::*;
+use garage_table::replication::sharded::*;
use garage_table::*;
use crate::block::*;
@@ -35,7 +35,7 @@ pub struct Garage {
}
impl Garage {
- pub async fn new(
+ pub fn new(
config: Config,
db: sled::Db,
background: Arc<BackgroundRunner>,
@@ -54,18 +54,23 @@ impl Garage {
);
let data_rep_param = TableShardedReplication {
+ system: system.clone(),
replication_factor: config.data_replication_factor,
write_quorum: (config.data_replication_factor + 1) / 2,
read_quorum: 1,
};
let meta_rep_param = TableShardedReplication {
+ system: system.clone(),
replication_factor: config.meta_replication_factor,
write_quorum: (config.meta_replication_factor + 1) / 2,
read_quorum: (config.meta_replication_factor + 1) / 2,
};
- let control_rep_param = TableFullReplication::new(config.control_write_max_faults);
+ let control_rep_param = TableFullReplication {
+ system: system.clone(),
+ max_faults: config.control_write_max_faults,
+ };
info!("Initialize block manager...");
let block_manager = BlockManager::new(
@@ -79,7 +84,6 @@ impl Garage {
info!("Initialize block_ref_table...");
let block_ref_table = Table::new(
BlockRefTable {
- background: background.clone(),
block_manager: block_manager.clone(),
},
data_rep_param.clone(),
@@ -87,8 +91,7 @@ impl Garage {
&db,
"block_ref".to_string(),
rpc_server,
- )
- .await;
+ );
info!("Initialize version_table...");
let version_table = Table::new(
@@ -101,8 +104,7 @@ impl Garage {
&db,
"version".to_string(),
rpc_server,
- )
- .await;
+ );
info!("Initialize object_table...");
let object_table = Table::new(
@@ -115,8 +117,7 @@ impl Garage {
&db,
"object".to_string(),
rpc_server,
- )
- .await;
+ );
info!("Initialize bucket_table...");
let bucket_table = Table::new(
@@ -126,8 +127,7 @@ impl Garage {
&db,
"bucket".to_string(),
rpc_server,
- )
- .await;
+ );
info!("Initialize key_table_table...");
let key_table = Table::new(
@@ -137,8 +137,7 @@ impl Garage {
&db,
"key".to_string(),
rpc_server,
- )
- .await;
+ );
info!("Initialize Garage...");
let garage = Arc::new(Self {
@@ -156,7 +155,7 @@ impl Garage {
info!("Start block manager background thread...");
garage.block_manager.garage.swap(Some(garage.clone()));
- garage.block_manager.clone().spawn_background_worker().await;
+ garage.block_manager.clone().spawn_background_worker();
garage
}
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 5942df75..fcca3835 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -1,10 +1,8 @@
use serde::{Deserialize, Serialize};
-use garage_table::crdt::CRDT;
+use garage_table::crdt::*;
use garage_table::*;
-use model010::key_table as prev;
-
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
// Primary key
@@ -36,6 +34,15 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
+ pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
+ Self {
+ key_id: key_id.to_string(),
+ secret_key: secret_key.to_string(),
+ name: crdt::LWW::new(name.to_string()),
+ deleted: crdt::Bool::new(false),
+ authorized_buckets: crdt::LWWMap::new(),
+ }
+ }
pub fn delete(key_id: String) -> Self {
Self {
key_id,
@@ -66,6 +73,10 @@ pub struct PermissionSet {
pub allow_write: bool,
}
+impl AutoCRDT for PermissionSet {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
impl Entry<EmptyKey, String> for Key {
fn partition_key(&self) -> &EmptyKey {
&EmptyKey
@@ -73,55 +84,43 @@ impl Entry<EmptyKey, String> for Key {
fn sort_key(&self) -> &String {
&self.key_id
}
+}
+impl CRDT for Key {
fn merge(&mut self, other: &Self) {
self.name.merge(&other.name);
self.deleted.merge(&other.deleted);
if self.deleted.get() {
self.authorized_buckets.clear();
- return;
+ } else {
+ self.authorized_buckets.merge(&other.authorized_buckets);
}
-
- self.authorized_buckets.merge(&other.authorized_buckets);
}
}
pub struct KeyTable;
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum KeyFilter {
+ Deleted(DeletedFilter),
+ Matches(String),
+}
+
impl TableSchema for KeyTable {
type P = EmptyKey;
type S = String;
type E = Key;
- type Filter = DeletedFilter;
+ type Filter = KeyFilter;
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted.get())
- }
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) {
- Ok(x) => x,
- Err(_) => return None,
- };
- let mut new = Self::E {
- key_id: old.key_id.clone(),
- secret_key: old.secret_key.clone(),
- name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()),
- deleted: crdt::Bool::new(old.deleted),
- authorized_buckets: crdt::LWWMap::new(),
- };
- for ab in old.authorized_buckets() {
- let it = crdt::LWWMap::migrate_from_raw_item(
- ab.bucket.clone(),
- ab.timestamp,
- PermissionSet {
- allow_read: ab.allow_read,
- allow_write: ab.allow_write,
- },
- );
- new.authorized_buckets.merge(&it);
+ match filter {
+ KeyFilter::Deleted(df) => df.apply(entry.deleted.get()),
+ KeyFilter::Matches(pat) => {
+ let pat = pat.to_lowercase();
+ entry.key_id.to_lowercase().starts_with(&pat)
+ || entry.name.get().to_lowercase() == pat
+ }
}
- Some(new)
}
}
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 16cce72c..62606df4 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -5,13 +5,12 @@ use std::sync::Arc;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
-use garage_table::table_sharded::*;
+use garage_table::crdt::*;
+use garage_table::replication::sharded::*;
use garage_table::*;
use crate::version_table::*;
-use model010::object_table as prev;
-
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
// Primary key
@@ -70,7 +69,7 @@ pub enum ObjectVersionState {
Aborted,
}
-impl ObjectVersionState {
+impl CRDT for ObjectVersionState {
fn merge(&mut self, other: &Self) {
use ObjectVersionState::*;
match other {
@@ -91,37 +90,30 @@ impl ObjectVersionState {
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
DeleteMarker,
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
FirstBlock(ObjectVersionMeta, Hash),
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+impl AutoCRDT for ObjectVersionData {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
pub headers: ObjectVersionHeaders,
pub size: u64,
pub etag: String,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
pub content_type: String,
pub other: BTreeMap<String, String>,
}
-impl ObjectVersionData {
- fn merge(&mut self, b: &Self) {
- if *self != *b {
- warn!(
- "Inconsistent object version data: {:?} (local) vs {:?} (remote)",
- self, b
- );
- }
- }
-}
-
impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid)
@@ -154,8 +146,14 @@ impl Entry<String, String> for Object {
fn sort_key(&self) -> &String {
&self.key
}
+ fn is_tombstone(&self) -> bool {
+ self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
+ }
+}
+impl CRDT for Object {
fn merge(&mut self, other: &Self) {
+ // Merge versions from other into here
for other_v in other.versions.iter() {
match self
.versions
@@ -169,6 +167,9 @@ impl Entry<String, String> for Object {
}
}
}
+
+ // Remove versions which are obsolete, i.e. those that come
+ // before the last version which .is_complete().
let last_complete = self
.versions
.iter()
@@ -212,13 +213,8 @@ impl TableSchema for ObjectTable {
}
};
if newly_deleted {
- let deleted_version = Version::new(
- v.uuid,
- old_v.bucket.clone(),
- old_v.key.clone(),
- true,
- vec![],
- );
+ let deleted_version =
+ Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true);
version_table.insert(&deleted_version).await?;
}
}
@@ -231,55 +227,4 @@ impl TableSchema for ObjectTable {
let deleted = !entry.versions.iter().any(|v| v.is_data());
filter.apply(deleted)
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old = match rmp_serde::decode::from_read_ref::<_, prev::Object>(bytes) {
- Ok(x) => x,
- Err(_) => return None,
- };
- let new_v = old
- .versions()
- .iter()
- .map(migrate_version)
- .collect::<Vec<_>>();
- let new = Object::new(old.bucket.clone(), old.key.clone(), new_v);
- Some(new)
- }
-}
-
-fn migrate_version(old: &prev::ObjectVersion) -> ObjectVersion {
- let headers = ObjectVersionHeaders {
- content_type: old.mime_type.clone(),
- other: BTreeMap::new(),
- };
- let meta = ObjectVersionMeta {
- headers: headers.clone(),
- size: old.size,
- etag: "".to_string(),
- };
- let state = match old.state {
- prev::ObjectVersionState::Uploading => ObjectVersionState::Uploading(headers),
- prev::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
- prev::ObjectVersionState::Complete => match &old.data {
- prev::ObjectVersionData::Uploading => ObjectVersionState::Uploading(headers),
- prev::ObjectVersionData::DeleteMarker => {
- ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
- }
- prev::ObjectVersionData::Inline(x) => {
- ObjectVersionState::Complete(ObjectVersionData::Inline(meta, x.clone()))
- }
- prev::ObjectVersionData::FirstBlock(h) => {
- let mut hash = [0u8; 32];
- hash.copy_from_slice(h.as_ref());
- ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, Hash::from(hash)))
- }
- },
- };
- let mut uuid = [0u8; 32];
- uuid.copy_from_slice(old.uuid.as_ref());
- ObjectVersion {
- uuid: UUID::from(uuid),
- timestamp: old.timestamp,
- state,
- }
}
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index cf9fbe98..841fbfea 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -4,7 +4,8 @@ use std::sync::Arc;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
-use garage_table::table_sharded::*;
+use garage_table::crdt::*;
+use garage_table::replication::sharded::*;
use garage_table::*;
use crate::block_ref_table::*;
@@ -15,8 +16,11 @@ pub struct Version {
pub uuid: UUID,
// Actual data: the blocks for this version
- pub deleted: bool,
- blocks: Vec<VersionBlock>,
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ pub deleted: crdt::Bool,
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
@@ -25,56 +29,46 @@ pub struct Version {
}
impl Version {
- pub fn new(
- uuid: UUID,
- bucket: String,
- key: String,
- deleted: bool,
- blocks: Vec<VersionBlock>,
- ) -> Self {
- let mut ret = Self {
+ pub fn new(uuid: UUID, bucket: String, key: String, deleted: bool) -> Self {
+ Self {
uuid,
- deleted,
- blocks: vec![],
+ deleted: deleted.into(),
+ blocks: crdt::Map::new(),
+ parts_etags: crdt::Map::new(),
bucket,
key,
- };
- for b in blocks {
- ret.add_block(b)
- .expect("Twice the same VersionBlock in Version constructor");
}
- ret
}
- /// Adds a block if it wasn't already present
- pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> {
- match self
- .blocks
- .binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key()))
- {
- Err(i) => {
- self.blocks.insert(i, new);
- Ok(())
- }
- Ok(_) => Err(()),
- }
+}
+
+#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct VersionBlockKey {
+ pub part_number: u64,
+ pub offset: u64,
+}
+
+impl Ord for VersionBlockKey {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.part_number
+ .cmp(&other.part_number)
+ .then(self.offset.cmp(&other.offset))
}
- pub fn blocks(&self) -> &[VersionBlock] {
- &self.blocks[..]
+}
+
+impl PartialOrd for VersionBlockKey {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
- pub part_number: u64,
- pub offset: u64,
pub hash: Hash,
pub size: u64,
}
-impl VersionBlock {
- fn cmp_key(&self) -> (u64, u64) {
- (self.part_number, self.offset)
- }
+impl AutoCRDT for VersionBlock {
+ const WARN_IF_DIFFERENT: bool = true;
}
impl Entry<Hash, EmptyKey> for Version {
@@ -84,23 +78,21 @@ impl Entry<Hash, EmptyKey> for Version {
fn sort_key(&self) -> &EmptyKey {
&EmptyKey
}
+ fn is_tombstone(&self) -> bool {
+ self.deleted.get()
+ }
+}
+impl CRDT for Version {
fn merge(&mut self, other: &Self) {
- if other.deleted {
- self.deleted = true;
+ self.deleted.merge(&other.deleted);
+
+ if self.deleted.get() {
self.blocks.clear();
- } else if !self.deleted {
- for bi in other.blocks.iter() {
- match self
- .blocks
- .binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key()))
- {
- Ok(_) => (),
- Err(pos) => {
- self.blocks.insert(pos, bi.clone());
- }
- }
- }
+ self.parts_etags.clear();
+ } else {
+ self.blocks.merge(&other.blocks);
+ self.parts_etags.merge(&other.parts_etags);
}
}
}
@@ -121,14 +113,15 @@ impl TableSchema for VersionTable {
self.background.spawn(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
- if new_v.deleted && !old_v.deleted {
+ if new_v.deleted.get() && !old_v.deleted.get() {
let deleted_block_refs = old_v
.blocks
+ .items()
.iter()
- .map(|vb| BlockRef {
+ .map(|(_k, vb)| BlockRef {
block: vb.hash,
version: old_v.uuid,
- deleted: true,
+ deleted: true.into(),
})
.collect::<Vec<_>>();
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
@@ -139,6 +132,6 @@ impl TableSchema for VersionTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.deleted.get())
}
}
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 48f05755..fbe826a8 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -15,27 +15,26 @@ path = "lib.rs"
[dependencies]
garage_util = { version = "0.1.1", path = "../util" }
-bytes = "0.4"
-rand = "0.7"
-hex = "0.3"
-sha2 = "0.8"
-arc-swap = "0.4"
+bytes = "1.0"
+hex = "0.4"
+arc-swap = "1.0"
gethostname = "0.2"
log = "0.4"
-rmp-serde = "0.14.3"
+rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+tokio-stream = { version = "0.1", features = ["net"] }
http = "0.2"
-hyper = "0.13"
-rustls = "0.17"
-tokio-rustls = "0.13"
-hyper-rustls = { version = "0.20", default-features = false }
+hyper = { version = "0.14", features = ["full"] }
+rustls = "0.19"
+tokio-rustls = "0.22"
+hyper-rustls = { version = "0.22", default-features = false }
webpki = "0.21"
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 44d7122a..4e9822fa 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -11,13 +11,14 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
-use tokio::prelude::*;
+use tokio::io::AsyncWriteExt;
use tokio::sync::watch;
use tokio::sync::Mutex;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_util::time::*;
use crate::consul::get_consul_nodes;
use crate::ring::*;
@@ -315,23 +316,17 @@ impl System {
self.clone().ping_nodes(bootstrap_peers).await;
let self2 = self.clone();
- self.clone()
- .background
+ self.background
.spawn_worker(format!("ping loop"), |stop_signal| {
- self2.ping_loop(stop_signal).map(Ok)
- })
- .await;
+ self2.ping_loop(stop_signal)
+ });
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
- self.clone()
- .background
+ self.background
.spawn_worker(format!("Consul loop"), |stop_signal| {
- self2
- .consul_loop(stop_signal, consul_host, consul_service_name)
- .map(Ok)
- })
- .await;
+ self2.consul_loop(stop_signal, consul_host, consul_service_name)
+ });
}
}
@@ -399,7 +394,7 @@ impl System {
if has_changes {
status.recalculate_hash();
}
- if let Err(e) = update_locked.0.broadcast(Arc::new(status)) {
+ if let Err(e) = update_locked.0.send(Arc::new(status)) {
error!("In ping_nodes: could not save status update ({})", e);
}
drop(update_locked);
@@ -425,7 +420,7 @@ impl System {
let status_hash = status.hash;
let config_version = self.ring.borrow().config.version;
- update_locked.0.broadcast(Arc::new(status))?;
+ update_locked.0.send(Arc::new(status))?;
drop(update_locked);
if is_new || status_hash != ping.status_hash {
@@ -507,7 +502,7 @@ impl System {
if has_changed {
status.recalculate_hash();
}
- update_lock.0.broadcast(Arc::new(status))?;
+ update_lock.0.send(Arc::new(status))?;
drop(update_lock);
if to_ping.len() > 0 {
@@ -527,7 +522,7 @@ impl System {
if adv.version > ring.config.version {
let ring = Ring::new(adv.clone());
- update_lock.1.broadcast(Arc::new(ring))?;
+ update_lock.1.send(Arc::new(ring))?;
drop(update_lock);
self.background.spawn_cancellable(
@@ -543,7 +538,7 @@ impl System {
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
- let restart_at = tokio::time::delay_for(PING_INTERVAL);
+ let restart_at = tokio::time::sleep(PING_INTERVAL);
let status = self.status.borrow().clone();
let ping_addrs = status
@@ -557,10 +552,9 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
+ _ = stop_signal.changed().fuse() => {
+ if *stop_signal.borrow() {
+ return;
}
}
}
@@ -573,8 +567,8 @@ impl System {
consul_host: String,
consul_service_name: String,
) {
- loop {
- let restart_at = tokio::time::delay_for(CONSUL_INTERVAL);
+ while !*stop_signal.borrow() {
+ let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => {
@@ -588,12 +582,7 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
- }
- }
+ _ = stop_signal.changed().fuse() => (),
}
}
}
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 85caafeb..2e997523 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -5,6 +5,11 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
+// A partition number is encoded on 16 bits,
+// i.e. we have up to 2**16 partitions.
+// (in practice we have exactly 2**PARTITION_BITS partitions)
+pub type Partition = u16;
+
// TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump
// it up to 10.
@@ -161,29 +166,48 @@ impl Ring {
})
.collect::<Vec<_>>();
- eprintln!("RING: --");
- for e in ring.iter() {
- eprintln!("{:?}", e);
- }
- eprintln!("END --");
+ // eprintln!("RING: --");
+ // for e in ring.iter() {
+ // eprintln!("{:?}", e);
+ // }
+ // eprintln!("END --");
Self { config, ring }
}
+ pub fn partition_of(&self, from: &Hash) -> Partition {
+ let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
+ top >> (16 - PARTITION_BITS)
+ }
+
+ pub fn partitions(&self) -> Vec<(Partition, Hash)> {
+ let mut ret = vec![];
+
+ for (i, entry) in self.ring.iter().enumerate() {
+ ret.push((i as u16, entry.location));
+ }
+ if ret.len() > 0 {
+ assert_eq!(ret[0].1, [0u8; 32].into());
+ }
+
+ ret
+ }
+
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
if self.ring.len() != 1 << PARTITION_BITS {
- warn!("Ring not yet ready, read/writes will be lost");
+ warn!("Ring not yet ready, read/writes will be lost!");
return vec![];
}
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
-
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
+ assert_eq!(partition_idx, self.partition_of(from) as usize);
+
let partition = &self.ring[partition_idx];
let partition_top =
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
- assert!(partition_top & PARTITION_MASK_U16 == top & PARTITION_MASK_U16);
+ assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
assert!(n <= partition.nodes.len());
partition.nodes[..n].iter().cloned().collect::<Vec<_>>()
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index 70384391..eb4f6620 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -7,7 +7,6 @@ use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
-use bytes::IntoBuf;
use futures::future::Future;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
@@ -197,11 +196,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
if !strategy.rs_interrupt_after_quorum {
let wait_finished_fut = tokio::spawn(async move {
resp_stream.collect::<Vec<_>>().await;
- Ok(())
});
- self.background.spawn(wait_finished_fut.map(|x| {
- x.unwrap_or_else(|e| Err(Error::Message(format!("Await failed: {}", e))))
- }));
+ self.background.spawn(wait_finished_fut.map(|_| Ok(())));
}
Ok(results)
@@ -336,7 +332,7 @@ impl RpcHttpClient {
let body = hyper::body::to_bytes(resp.into_body()).await?;
drop(slot);
- match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf())? {
+ match rmp_serde::decode::from_read::<_, Result<M, String>>(&body[..])? {
Err(e) => Ok(Err(Error::RemoteError(e, status))),
Ok(x) => Ok(Ok(x)),
}
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 1c6bc8d2..0d82d796 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -4,7 +4,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
-use bytes::IntoBuf;
use futures::future::Future;
use futures_util::future::*;
use futures_util::stream::*;
@@ -15,6 +14,7 @@ use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
+use tokio_stream::wrappers::TcpListenerStream;
use garage_util::config::TlsConfig;
use garage_util::data::*;
@@ -47,11 +47,15 @@ where
{
let begin_time = Instant::now();
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
- let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?;
+ let msg = rmp_serde::decode::from_read::<_, M>(&whole_body[..])?;
trace!(
"Request message: {}",
- serde_json::to_string(&msg).unwrap_or("<json error>".into())
+ serde_json::to_string(&msg)
+ .unwrap_or("<json error>".into())
+ .chars()
+ .take(100)
+ .collect::<String>()
);
match handler(msg, sockaddr).await {
@@ -171,8 +175,8 @@ impl RpcServer {
config.set_single_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?;
let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config)));
- let mut listener = TcpListener::bind(&self.bind_addr).await?;
- let incoming = listener.incoming().filter_map(|socket| async {
+ let listener = TcpListener::bind(&self.bind_addr).await?;
+ let incoming = TcpListenerStream::new(listener).filter_map(|socket| async {
match socket {
Ok(stream) => match tls_acceptor.clone().accept(stream).await {
Ok(x) => Some(Ok::<_, hyper::Error>(x)),
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 6485a542..f9d98dec 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -16,21 +16,18 @@ path = "lib.rs"
garage_util = { version = "0.1.1", path = "../util" }
garage_rpc = { version = "0.1.1", path = "../rpc" }
-bytes = "0.4"
-rand = "0.7"
-hex = "0.3"
-arc-swap = "0.4"
+bytes = "1.0"
+rand = "0.8"
log = "0.4"
hexdump = "0.1"
sled = "0.34"
-rmp-serde = "0.14.3"
+rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
-async-trait = "0.1.30"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/table/crdt.rs b/src/table/crdt.rs
deleted file mode 100644
index 4cba10ce..00000000
--- a/src/table/crdt.rs
+++ /dev/null
@@ -1,327 +0,0 @@
-//! This package provides a simple implementation of conflict-free replicated data types (CRDTs)
-//!
-//! CRDTs are a type of data structures that do not require coordination. In other words, we can
-//! edit them in parallel, we will always find a way to merge it.
-//!
-//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the
-//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now,
-//! it is easy to merge their counters, order does not count: we always get 4.
-//!
-//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
-
-use serde::{Deserialize, Serialize};
-
-use garage_util::data::*;
-
-/// Definition of a CRDT - all CRDT Rust types implement this.
-///
-/// A CRDT is defined as a merge operator that respects a certain set of axioms.
-///
-/// In particular, the merge operator must be commutative, associative,
-/// idempotent, and monotonic.
-/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator,
-/// the following axioms must apply:
-///
-/// ```text
-/// a ⊔ b = b ⊔ a (commutativity)
-/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity)
-/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence)
-/// ```
-///
-/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
-/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`,
-/// as this would imply a cycle in the partial order.
-pub trait CRDT {
- /// Merge the two datastructures according to the CRDT rules.
- /// `self` is modified to contain the merged CRDT value. `other` is not modified.
- ///
- /// # Arguments
- ///
- /// * `other` - the other CRDT we wish to merge with
- fn merge(&mut self, other: &Self);
-}
-
-/// All types that implement `Ord` (a total order) also implement a trivial CRDT
-/// defined by the merge rule: `a ⊔ b = max(a, b)`.
-impl<T> CRDT for T
-where
- T: Ord + Clone,
-{
- fn merge(&mut self, other: &Self) {
- if other > self {
- *self = other.clone();
- }
- }
-}
-
-// ---- LWW Register ----
-
-/// Last Write Win (LWW)
-///
-/// An LWW CRDT associates a timestamp with a value, in order to implement a
-/// time-based reconciliation rule: the most recent write wins.
-/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
-/// with the same timestamp but different values.
-///
-/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
-/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
-/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types
-/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value.
-/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
-/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
-/// but different inner values, as the rule to keep the maximum value isn't generally the desired
-/// semantics.)
-///
-/// As multiple computers clocks are always desynchronized,
-/// when operations are close enough, it is equivalent to
-/// take one copy and drop the other one.
-///
-/// Given that clocks are not too desynchronized, this assumption
-/// is enough for most cases, as there is few chance that two humans
-/// coordonate themself faster than the time difference between two NTP servers.
-///
-/// As a more concret example, let's suppose you want to upload a file
-/// with the same key (path) in the same bucket at the very same time.
-/// For each request, the file will be timestamped by the receiving server
-/// and may differ from what you observed with your atomic clock!
-///
-/// This scheme is used by AWS S3 or Soundcloud and often without knowing
-/// in entreprise when reconciliating databases with ad-hoc scripts.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWW<T> {
- ts: u64,
- v: T,
-}
-
-impl<T> LWW<T>
-where
- T: CRDT,
-{
- /// Creates a new CRDT
- ///
- /// CRDT's internal timestamp is set with current node's clock.
- pub fn new(value: T) -> Self {
- Self {
- ts: now_msec(),
- v: value,
- }
- }
-
- /// Build a new CRDT from a previous non-compatible one
- ///
- /// Compared to new, the CRDT's timestamp is not set to now
- /// but must be set to the previous, non-compatible, CRDT's timestamp.
- pub fn migrate_from_raw(ts: u64, value: T) -> Self {
- Self { ts, v: value }
- }
-
- /// Update the LWW CRDT while keeping some causal ordering.
- ///
- /// The timestamp of the LWW CRDT is updated to be the current node's clock
- /// at time of update, or the previous timestamp + 1 if that's bigger,
- /// so that the new timestamp is always strictly larger than the previous one.
- /// This ensures that merging the update with the old value will result in keeping
- /// the updated value.
- pub fn update(&mut self, new_value: T) {
- self.ts = std::cmp::max(self.ts + 1, now_msec());
- self.v = new_value;
- }
-
- /// Get the CRDT value
- pub fn get(&self) -> &T {
- &self.v
- }
-
- /// Get a mutable reference to the CRDT's value
- ///
- /// This is usefull to mutate the inside value without changing the LWW timestamp.
- /// When such mutation is done, the merge between two LWW values is done using the inner
- /// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large
- /// data type, such as a map, and we only want to change a single item in the map.
- /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
- /// This delta consists in a LWW with the same timestamp, and the map
- /// inside only contains the updated value.
- /// The advantage of such a delta is that it is much smaller than the whole map.
- ///
- /// Avoid using this if the inner data type is a primitive type such as a number or a string,
- /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
- /// of both values.
- pub fn get_mut(&mut self) -> &mut T {
- &mut self.v
- }
-}
-
-impl<T> CRDT for LWW<T>
-where
- T: Clone + CRDT,
-{
- fn merge(&mut self, other: &Self) {
- if other.ts > self.ts {
- self.ts = other.ts;
- self.v = other.v.clone();
- } else if other.ts == self.ts {
- self.v.merge(&other.v);
- }
- }
-}
-
-/// Boolean, where `true` is an absorbing state
-#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
-pub struct Bool(bool);
-
-impl Bool {
- /// Create a new boolean with the specified value
- pub fn new(b: bool) -> Self {
- Self(b)
- }
- /// Set the boolean to true
- pub fn set(&mut self) {
- self.0 = true;
- }
- /// Get the boolean value
- pub fn get(&self) -> bool {
- self.0
- }
-}
-
-impl CRDT for Bool {
- fn merge(&mut self, other: &Self) {
- self.0 = self.0 || other.0;
- }
-}
-
-/// Last Write Win Map
-///
-/// This types defines a CRDT for a map from keys to values.
-/// The values have an associated timestamp, such that the last written value
-/// takes precedence over previous ones. As for the simpler `LWW` type, the value
-/// type `V` is also required to implement the CRDT trait.
-/// We do not encourage mutating the values associated with a given key
-/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()`
-/// method that would allow that.
-///
-/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
-/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
-/// such that two values can be compared for equality based on their hashes). As a consequence,
-/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
-/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
-/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
-/// actually not losing anything here.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWWMap<K, V> {
- vals: Vec<(K, u64, V)>,
-}
-
-impl<K, V> LWWMap<K, V>
-where
- K: Ord,
- V: CRDT,
-{
- /// Create a new empty map CRDT
- pub fn new() -> Self {
- Self { vals: vec![] }
- }
- /// Used to migrate from a map defined in an incompatible format. This produces
- /// a map that contains a single item with the specified timestamp (copied from
- /// the incompatible format). Do this as many times as you have items to migrate,
- /// and put them all together using the CRDT merge operator.
- pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
- Self {
- vals: vec![(k, ts, v)],
- }
- }
- /// Returns a map that contains a single mapping from the specified key to the specified value.
- /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map,
- /// the previous value will be replaced with the one specified here.
- /// The timestamp in the provided mutator is set to the maximum of the current system's clock
- /// and 1 + the previous value's timestamp (if there is one), so that the new value will always
- /// take precedence (LWW rule).
- ///
- /// Typically, to update the value associated to a key in the map, you would do the following:
- ///
- /// ```ignore
- /// let my_update = my_crdt.update_mutator(key_to_modify, new_value);
- /// my_crdt.merge(&my_update);
- /// ```
- ///
- /// However extracting the mutator on its own and only sending that on the network is very
- /// interesting as it is much smaller than the whole map.
- pub fn update_mutator(&self, k: K, new_v: V) -> Self {
- let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
- Ok(i) => {
- let (_, old_ts, _) = self.vals[i];
- let new_ts = std::cmp::max(old_ts + 1, now_msec());
- vec![(k, new_ts, new_v)]
- }
- Err(_) => vec![(k, now_msec(), new_v)],
- };
- Self { vals: new_vals }
- }
- /// Takes all of the values of the map and returns them. The current map is reset to the
- /// empty map. This is very usefull to produce in-place a new map that contains only a delta
- /// that modifies a certain value:
- ///
- /// ```ignore
- /// let mut a = get_my_crdt_value();
- /// let old_a = a.take_and_clear();
- /// a.merge(&old_a.update_mutator(key_to_modify, new_value));
- /// put_my_crdt_value(a);
- /// ```
- ///
- /// Of course in this simple example we could have written simply
- /// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`,
- /// but in the case where the map is a field in a struct for instance (as is always the case),
- /// this becomes very handy:
- ///
- /// ```ignore
- /// let mut a = get_my_crdt_value();
- /// let old_a_map = a.map_field.take_and_clear();
- /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value));
- /// put_my_crdt_value(a);
- /// ```
- pub fn take_and_clear(&mut self) -> Self {
- let vals = std::mem::replace(&mut self.vals, vec![]);
- Self { vals }
- }
- /// Removes all values from the map
- pub fn clear(&mut self) {
- self.vals.clear();
- }
- /// Get a reference to the value assigned to a key
- pub fn get(&self, k: &K) -> Option<&V> {
- match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
- Ok(i) => Some(&self.vals[i].2),
- Err(_) => None,
- }
- }
- /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
- /// In most case you will want to ignore the timestamp (second item of the tuple).
- pub fn items(&self) -> &[(K, u64, V)] {
- &self.vals[..]
- }
-}
-
-impl<K, V> CRDT for LWWMap<K, V>
-where
- K: Clone + Ord,
- V: Clone + CRDT,
-{
- fn merge(&mut self, other: &Self) {
- for (k, ts2, v2) in other.vals.iter() {
- match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
- Ok(i) => {
- let (_, ts1, _v1) = &self.vals[i];
- if ts2 > ts1 {
- self.vals[i].1 = *ts2;
- self.vals[i].2 = v2.clone();
- } else if ts1 == ts2 {
- self.vals[i].2.merge(&v2);
- }
- }
- Err(i) => {
- self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
- }
- }
- }
- }
-}
diff --git a/src/table/crdt/bool.rs b/src/table/crdt/bool.rs
new file mode 100644
index 00000000..1989c92e
--- /dev/null
+++ b/src/table/crdt/bool.rs
@@ -0,0 +1,34 @@
+use serde::{Deserialize, Serialize};
+
+use crate::crdt::crdt::*;
+
+/// Boolean, where `true` is an absorbing state
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Bool(bool);
+
+impl Bool {
+ /// Create a new boolean with the specified value
+ pub fn new(b: bool) -> Self {
+ Self(b)
+ }
+ /// Set the boolean to true
+ pub fn set(&mut self) {
+ self.0 = true;
+ }
+ /// Get the boolean value
+ pub fn get(&self) -> bool {
+ self.0
+ }
+}
+
+impl From<bool> for Bool {
+ fn from(b: bool) -> Bool {
+ Bool::new(b)
+ }
+}
+
+impl CRDT for Bool {
+ fn merge(&mut self, other: &Self) {
+ self.0 = self.0 || other.0;
+ }
+}
diff --git a/src/table/crdt/crdt.rs b/src/table/crdt/crdt.rs
new file mode 100644
index 00000000..636b6df6
--- /dev/null
+++ b/src/table/crdt/crdt.rs
@@ -0,0 +1,73 @@
+use garage_util::data::*;
+
+/// Definition of a CRDT - all CRDT Rust types implement this.
+///
+/// A CRDT is defined as a merge operator that respects a certain set of axioms.
+///
+/// In particular, the merge operator must be commutative, associative,
+/// idempotent, and monotonic.
+/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator,
+/// the following axioms must apply:
+///
+/// ```text
+/// a ⊔ b = b ⊔ a (commutativity)
+/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity)
+/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence)
+/// ```
+///
+/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
+/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`,
+/// as this would imply a cycle in the partial order.
+pub trait CRDT {
+ /// Merge the two datastructures according to the CRDT rules.
+ /// `self` is modified to contain the merged CRDT value. `other` is not modified.
+ ///
+ /// # Arguments
+ ///
+ /// * `other` - the other CRDT we wish to merge with
+ fn merge(&mut self, other: &Self);
+}
+
+/// All types that implement `Ord` (a total order) can also implement a trivial CRDT
+/// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type
+/// to enable this behavior.
+pub trait AutoCRDT: Ord + Clone + std::fmt::Debug {
+ /// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if
+ /// different values in your application should never happen. Set this to false
+ /// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`.
+ const WARN_IF_DIFFERENT: bool;
+}
+
+impl<T> CRDT for T
+where
+ T: AutoCRDT,
+{
+ /// Merge by keeping the maximum of `self` and `other` (`a ⊔ b = max(a, b)`).
+ ///
+ /// When `T::WARN_IF_DIFFERENT` is set and the two values differ, a warning is
+ /// logged before and after the choice is made; the merge result is the same
+ /// either way.
+ fn merge(&mut self, other: &Self) {
+ // Only report a conflict when the type opted in and the values really differ.
+ let conflict = Self::WARN_IF_DIFFERENT && self != other;
+ if conflict {
+ warn!(
+ "Different CRDT values should be the same (logic error!): {:?} vs {:?}",
+ self, other
+ );
+ }
+ // The merge rule itself does not depend on the warning flag: keep the maximum.
+ if other > self {
+ *self = other.clone();
+ }
+ if conflict {
+ warn!("Making an arbitrary choice: {:?}", self);
+ }
+ }
+}
+
+impl AutoCRDT for String {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCRDT for bool {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCRDT for FixedBytes32 {
+ const WARN_IF_DIFFERENT: bool = true;
+}
diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs
new file mode 100644
index 00000000..25ecdb07
--- /dev/null
+++ b/src/table/crdt/lww.rs
@@ -0,0 +1,114 @@
+use serde::{Deserialize, Serialize};
+
+use garage_util::time::now_msec;
+
+use crate::crdt::crdt::*;
+
+/// Last Write Win (LWW)
+///
+/// An LWW CRDT associates a timestamp with a value, in order to implement a
+/// time-based reconciliation rule: the most recent write wins.
+/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
+/// with the same timestamp but different values.
+///
+/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
+/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
+/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that types
+/// that implement the `Ord` trait can get a default CRDT implementation that keeps the maximum
+/// value, by implementing the `AutoCRDT` trait.
+/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
+/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
+/// but different inner values, as the rule to keep the maximum value isn't generally the desired
+/// semantics.)
+///
+/// As the clocks of multiple computers are always desynchronized,
+/// when operations are close enough, it is equivalent to
+/// take one copy and drop the other one.
+///
+/// Given that clocks are not too desynchronized, this assumption
+/// is enough for most cases, as there is little chance that two humans
+/// coordinate themselves faster than the time difference between two NTP servers.
+///
+/// As a more concrete example, let's suppose you want to upload a file
+/// with the same key (path) in the same bucket at the very same time.
+/// For each request, the file will be timestamped by the receiving server
+/// and may differ from what you observed with your atomic clock!
+///
+/// This scheme is used by AWS S3 or Soundcloud and often without knowing
+/// in enterprise when reconciling databases with ad-hoc scripts.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWW<T> {
+ // Timestamp of the last write; compared first when two LWW values are merged.
+ ts: u64,
+ // The wrapped value; its own merge rule is used when timestamps are equal.
+ v: T,
+}
+
+impl<T> LWW<T>
+where
+ T: CRDT,
+{
+ /// Creates a new CRDT
+ ///
+ /// CRDT's internal timestamp is set with current node's clock.
+ pub fn new(value: T) -> Self {
+ Self {
+ ts: now_msec(),
+ v: value,
+ }
+ }
+
+ /// Build a new CRDT from a previous non-compatible one
+ ///
+ /// Compared to new, the CRDT's timestamp is not set to now
+ /// but must be set to the previous, non-compatible, CRDT's timestamp.
+ pub fn migrate_from_raw(ts: u64, value: T) -> Self {
+ Self { ts, v: value }
+ }
+
+ /// Update the LWW CRDT while keeping some causal ordering.
+ ///
+ /// The timestamp of the LWW CRDT is updated to be the current node's clock
+ /// at time of update, or the previous timestamp + 1 if that's bigger,
+ /// so that the new timestamp is always strictly larger than the previous one.
+ /// This ensures that merging the update with the old value will result in keeping
+ /// the updated value.
+ pub fn update(&mut self, new_value: T) {
+ self.ts = std::cmp::max(self.ts + 1, now_msec());
+ self.v = new_value;
+ }
+
+ /// Get the CRDT value
+ pub fn get(&self) -> &T {
+ &self.v
+ }
+
+ /// Get a mutable reference to the CRDT's value
+ ///
+ /// This is useful to mutate the inside value without changing the LWW timestamp.
+ /// When such mutation is done, the merge between two LWW values is done using the inner
+ /// CRDT's merge operation. This is useful in the case where the inner CRDT is a large
+ /// data type, such as a map, and we only want to change a single item in the map.
+ /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
+ /// This delta consists in a LWW with the same timestamp, and the map
+ /// inside only contains the updated value.
+ /// The advantage of such a delta is that it is much smaller than the whole map.
+ ///
+ /// Avoid using this if the inner data type is a primitive type such as a number or a string,
+ /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
+ /// of both values.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.v
+ }
+}
+
+impl<T> CRDT for LWW<T>
+where
+ T: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ if other.ts > self.ts {
+ self.ts = other.ts;
+ self.v = other.v.clone();
+ } else if other.ts == self.ts {
+ self.v.merge(&other.v);
+ }
+ }
+}
diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs
new file mode 100644
index 00000000..7b372191
--- /dev/null
+++ b/src/table/crdt/lww_map.rs
@@ -0,0 +1,145 @@
+use serde::{Deserialize, Serialize};
+
+use garage_util::time::now_msec;
+
+use crate::crdt::crdt::*;
+
+/// Last Write Win Map
+///
+/// This type defines a CRDT for a map from keys to values.
+/// The values have an associated timestamp, such that the last written value
+/// takes precedence over previous ones. As for the simpler `LWW` type, the value
+/// type `V` is also required to implement the CRDT trait.
+/// We do not encourage mutating the values associated with a given key
+/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()`
+/// method that would allow that.
+///
+/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
+/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
+/// such that two values can be compared for equality based on their hashes). As a consequence,
+/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
+/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
+/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
+/// actually not losing anything here.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWWMap<K, V> {
+ vals: Vec<(K, u64, V)>,
+}
+
+impl<K, V> LWWMap<K, V>
+where
+ K: Ord,
+ V: CRDT,
+{
+ /// Create a new empty map CRDT
+ pub fn new() -> Self {
+ Self { vals: vec![] }
+ }
+ /// Used to migrate from a map defined in an incompatible format. This produces
+ /// a map that contains a single item with the specified timestamp (copied from
+ /// the incompatible format). Do this as many times as you have items to migrate,
+ /// and put them all together using the CRDT merge operator.
+ pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
+ Self {
+ vals: vec![(k, ts, v)],
+ }
+ }
+ /// Returns a map that contains a single mapping from the specified key to the specified value.
+ /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map,
+ /// the previous value will be replaced with the one specified here.
+ /// The timestamp in the provided mutator is set to the maximum of the current system's clock
+ /// and 1 + the previous value's timestamp (if there is one), so that the new value will always
+ /// take precedence (LWW rule).
+ ///
+ /// Typically, to update the value associated to a key in the map, you would do the following:
+ ///
+ /// ```ignore
+ /// let my_update = my_crdt.update_mutator(key_to_modify, new_value);
+ /// my_crdt.merge(&my_update);
+ /// ```
+ ///
+ /// However extracting the mutator on its own and only sending that on the network is very
+ /// interesting as it is much smaller than the whole map.
+ pub fn update_mutator(&self, k: K, new_v: V) -> Self {
+ let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ // The key already exists: make sure the new timestamp is strictly larger.
+ let (_, old_ts, _) = self.vals[i];
+ let new_ts = std::cmp::max(old_ts + 1, now_msec());
+ vec![(k, new_ts, new_v)]
+ }
+ Err(_) => vec![(k, now_msec(), new_v)],
+ };
+ Self { vals: new_vals }
+ }
+ /// Takes all of the values of the map and returns them. The current map is reset to the
+ /// empty map. This is very useful to produce in-place a new map that contains only a delta
+ /// that modifies a certain value:
+ ///
+ /// ```ignore
+ /// let mut a = get_my_crdt_value();
+ /// let old_a = a.take_and_clear();
+ /// a.merge(&old_a.update_mutator(key_to_modify, new_value));
+ /// put_my_crdt_value(a);
+ /// ```
+ ///
+ /// Of course in this simple example we could have written simply
+ /// `put_my_crdt_value(a.update_mutator(key_to_modify, new_value))`,
+ /// but in the case where the map is a field in a struct for instance (as is always the case),
+ /// this becomes very handy:
+ ///
+ /// ```ignore
+ /// let mut a = get_my_crdt_value();
+ /// let old_a_map = a.map_field.take_and_clear();
+ /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value));
+ /// put_my_crdt_value(a);
+ /// ```
+ pub fn take_and_clear(&mut self) -> Self {
+ // `mem::take` leaves an empty vector behind, which is exactly the empty map.
+ let vals = std::mem::take(&mut self.vals);
+ Self { vals }
+ }
+ /// Removes all values from the map
+ pub fn clear(&mut self) {
+ self.vals.clear();
+ }
+ /// Get a reference to the value assigned to a key
+ pub fn get(&self, k: &K) -> Option<&V> {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
+ Ok(i) => Some(&self.vals[i].2),
+ Err(_) => None,
+ }
+ }
+ /// Gets a reference to all of the items, as a slice. Useful to iterate on all map values.
+ /// In most cases you will want to ignore the timestamp (second item of the tuple).
+ pub fn items(&self) -> &[(K, u64, V)] {
+ &self.vals[..]
+ }
+ /// Returns the number of items in the map
+ pub fn len(&self) -> usize {
+ self.vals.len()
+ }
+ /// Returns true if the map contains no items
+ pub fn is_empty(&self) -> bool {
+ self.vals.is_empty()
+ }
+}
+
+impl<K, V> CRDT for LWWMap<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ /// Merge every item of `other` into `self`, key by key.
+ ///
+ /// For a key present on both sides the larger timestamp wins; on equal
+ /// timestamps the two values are merged with the value type's own CRDT rule.
+ /// Keys only present in `other` are inserted at their sorted position.
+ fn merge(&mut self, other: &Self) {
+ for (key, other_ts, other_val) in other.vals.iter() {
+ let pos = self.vals.binary_search_by(|(k2, _, _)| k2.cmp(key));
+ match pos {
+ Err(insert_at) => {
+ self.vals
+ .insert(insert_at, (key.clone(), *other_ts, other_val.clone()));
+ }
+ Ok(found) => {
+ let slot = &mut self.vals[found];
+ match other_ts.cmp(&slot.1) {
+ std::cmp::Ordering::Greater => {
+ slot.1 = *other_ts;
+ slot.2 = other_val.clone();
+ }
+ std::cmp::Ordering::Equal => slot.2.merge(other_val),
+ // Our copy is more recent: keep it untouched.
+ std::cmp::Ordering::Less => {}
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs
new file mode 100644
index 00000000..1193e6db
--- /dev/null
+++ b/src/table/crdt/map.rs
@@ -0,0 +1,83 @@
+use serde::{Deserialize, Serialize};
+
+use crate::crdt::crdt::*;
+
+/// Simple CRDT Map
+///
+/// This type defines a CRDT for a map from keys to values. Values are CRDT types which
+/// can have their own updating logic.
+///
+/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
+/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
+/// such that two values can be compared for equality based on their hashes). As a consequence,
+/// insertions take `O(n)` time. This means that Map should be used for reasonably small maps.
+/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
+/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
+/// actually not losing anything here.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Map<K, V> {
+ vals: Vec<(K, V)>,
+}
+
+impl<K, V> Map<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ /// Create a new empty map CRDT
+ pub fn new() -> Self {
+ Self { vals: vec![] }
+ }
+
+ /// Returns a map that contains a single mapping from the specified key to the specified value.
+ /// This can be used to build a delta-mutator:
+ /// when merged with another map, the value will be added or CRDT-merged if a previous
+ /// value already exists.
+ pub fn put_mutator(k: K, v: V) -> Self {
+ Self { vals: vec![(k, v)] }
+ }
+
+ /// Add a value to the map in place: this merges the single-item map built by
+ /// `put_mutator` into `self`, so an existing value for `k` is CRDT-merged with `v`
+ /// rather than overwritten.
+ pub fn put(&mut self, k: K, v: V) {
+ self.merge(&Self::put_mutator(k, v));
+ }
+
+ /// Removes all values from the map
+ pub fn clear(&mut self) {
+ self.vals.clear();
+ }
+
+ /// Get a reference to the value assigned to a key
+ pub fn get(&self, k: &K) -> Option<&V> {
+ match self.vals.binary_search_by(|(k2, _)| k2.cmp(k)) {
+ Ok(i) => Some(&self.vals[i].1),
+ Err(_) => None,
+ }
+ }
+ /// Gets a reference to all of the items, as a slice. Useful to iterate on all map values.
+ pub fn items(&self) -> &[(K, V)] {
+ &self.vals[..]
+ }
+ /// Returns the number of items in the map
+ pub fn len(&self) -> usize {
+ self.vals.len()
+ }
+ /// Returns true if the map contains no items
+ pub fn is_empty(&self) -> bool {
+ self.vals.is_empty()
+ }
+}
+
+impl<K, V> CRDT for Map<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ for (k, v2) in other.vals.iter() {
+ match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ self.vals[i].1.merge(&v2);
+ }
+ Err(i) => {
+ self.vals.insert(i, (k.clone(), v2.clone()));
+ }
+ }
+ }
+ }
+}
diff --git a/src/table/crdt/mod.rs b/src/table/crdt/mod.rs
new file mode 100644
index 00000000..eb75d061
--- /dev/null
+++ b/src/table/crdt/mod.rs
@@ -0,0 +1,22 @@
+//! This package provides a simple implementation of conflict-free replicated data types (CRDTs)
+//!
+//! CRDTs are a type of data structures that do not require coordination. In other words, we can
+//! edit them in parallel, we will always find a way to merge it.
+//!
+//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the
+//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now,
+//! it is easy to merge their counters, order does not count: we always get 4.
+//!
+//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
+
+mod bool;
+mod crdt;
+mod lww;
+mod lww_map;
+mod map;
+
+pub use self::bool::*;
+pub use crdt::*;
+pub use lww::*;
+pub use lww_map::*;
+pub use map::*;
diff --git a/src/table/data.rs b/src/table/data.rs
new file mode 100644
index 00000000..e07a21d2
--- /dev/null
+++ b/src/table/data.rs
@@ -0,0 +1,254 @@
+use core::borrow::Borrow;
+use std::sync::Arc;
+
+use log::warn;
+use serde_bytes::ByteBuf;
+use sled::Transactional;
+use tokio::sync::Notify;
+
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_rpc::membership::System;
+
+use crate::crdt::CRDT;
+use crate::replication::*;
+use crate::schema::*;
+
+pub struct TableData<F: TableSchema, R: TableReplication> {
+ system: Arc<System>,
+
+ pub name: String,
+ pub(crate) instance: F,
+ pub(crate) replication: R,
+
+ pub store: sled::Tree,
+
+ pub(crate) merkle_tree: sled::Tree,
+ pub(crate) merkle_todo: sled::Tree,
+ pub(crate) merkle_todo_notify: Notify,
+ pub(crate) gc_todo: sled::Tree,
+}
+
+impl<F, R> TableData<F, R>
+where
+ F: TableSchema,
+ R: TableReplication,
+{
+ pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
+ let store = db
+ .open_tree(&format!("{}:table", name))
+ .expect("Unable to open DB tree");
+
+ let merkle_tree = db
+ .open_tree(&format!("{}:merkle_tree", name))
+ .expect("Unable to open DB Merkle tree tree");
+ let merkle_todo = db
+ .open_tree(&format!("{}:merkle_todo", name))
+ .expect("Unable to open DB Merkle TODO tree");
+
+ let gc_todo = db
+ .open_tree(&format!("{}:gc_todo", name))
+ .expect("Unable to open DB tree");
+
+ Arc::new(Self {
+ system,
+ name,
+ instance,
+ replication,
+ store,
+ merkle_tree,
+ merkle_todo,
+ merkle_todo_notify: Notify::new(),
+ gc_todo,
+ })
+ }
+
+ // Read functions
+
+ pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
+ let tree_key = self.tree_key(p, s);
+ if let Some(bytes) = self.store.get(&tree_key)? {
+ Ok(Some(ByteBuf::from(bytes.to_vec())))
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// Read at most `limit` serialized entries from partition `p`.
+ ///
+ /// The scan starts at sort key `s` if one is given, otherwise at the beginning of the
+ /// partition, and stops as soon as a key outside the partition is met. When `filter`
+ /// is set, each value is decoded and only entries accepted by `F::matches_filter`
+ /// are returned. Fails if the store iterator or the decoding of a filtered entry fails.
+ pub fn read_range(
+ &self,
+ p: &F::P,
+ s: &Option<F::S>,
+ filter: &Option<F::Filter>,
+ limit: usize,
+ ) -> Result<Vec<Arc<ByteBuf>>, Error> {
+ // The loop below pushes first and checks the limit afterwards, which would
+ // return one entry for a limit of zero: answer that case up front.
+ if limit == 0 {
+ return Ok(vec![]);
+ }
+
+ let partition_hash = p.hash();
+ let first_key = match s {
+ None => partition_hash.to_vec(),
+ Some(sk) => self.tree_key(p, sk),
+ };
+ let mut ret = vec![];
+ for item in self.store.range(first_key..) {
+ let (key, value) = item?;
+ // Tree keys start with the 32-byte partition hash (see `tree_key`):
+ // a different prefix means we have left the partition.
+ if &key[..32] != partition_hash.as_slice() {
+ break;
+ }
+ let keep = match filter {
+ None => true,
+ Some(f) => {
+ let entry = self.decode_entry(value.as_ref())?;
+ F::matches_filter(&entry, f)
+ }
+ };
+ if keep {
+ ret.push(Arc::new(ByteBuf::from(value.as_ref())));
+ }
+ if ret.len() >= limit {
+ break;
+ }
+ }
+ Ok(ret)
+ }
+
+ // Mutation functions
+ // When changing this code, take care of propagating modifications correctly:
+ // - When an entry is modified or deleted, call the updated() function
+ // on the table instance
+ // - When an entry is modified or deleted, add it to the merkle updater's todo list.
+ // This has to be done atomically with the modification for the merkle updater
+ // to maintain consistency. The merkle updater must then be notified with todo_notify.
+ // - When an entry is updated to be a tombstone, add it to the gc_todo tree
+
+ pub(crate) fn update_many<T: Borrow<ByteBuf>>(&self, entries: &[T]) -> Result<(), Error> {
+ for update_bytes in entries.iter() {
+ self.update_entry(update_bytes.borrow().as_slice())?;
+ }
+ Ok(())
+ }
+
+ /// Merge one serialized update into the table.
+ ///
+ /// `update_bytes` is decoded with `decode_entry`, then CRDT-merged with the entry
+ /// currently stored under the same tree key (if any). When the merged value differs
+ /// from what was stored, it is written back and its hash is queued in `merkle_todo`,
+ /// both inside a single transaction. Afterwards the table instance is told about the
+ /// change, the Merkle updater is notified, and tombstones may be queued for GC.
+ ///
+ /// Returns an error if decoding, encoding or any storage operation fails.
+ pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
+ let update = self.decode_entry(update_bytes)?;
+ let tree_key = self.tree_key(update.partition_key(), update.sort_key());
+
+ let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ let (old_entry, new_entry) = match store.get(&tree_key)? {
+ Some(prev_bytes) => {
+ let old_entry = self
+ .decode_entry(&prev_bytes)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let mut new_entry = old_entry.clone();
+ new_entry.merge(&update);
+ (Some(old_entry), new_entry)
+ }
+ // Nothing stored yet: the update itself becomes the entry.
+ None => (None, update.clone()),
+ };
+
+ // Only write when the merge produced something new (or the entry did not exist).
+ if Some(&new_entry) != old_entry.as_ref() {
+ let new_bytes = rmp_to_vec_all_named(&new_entry)
+ .map_err(Error::RMPEncode)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let new_bytes_hash = blake2sum(&new_bytes[..]);
+ mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
+ store.insert(tree_key.clone(), new_bytes)?;
+ Ok(Some((old_entry, new_entry, new_bytes_hash)))
+ } else {
+ Ok(None)
+ }
+ })?;
+
+ // Reaching this point means the transaction succeeded; `changed` is `Some`
+ // only when the stored value was actually modified.
+ if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
+ let is_tombstone = new_entry.is_tombstone();
+ self.instance.updated(old_entry, Some(new_entry));
+ self.merkle_todo_notify.notify_one();
+ if is_tombstone {
+ // We are only responsible for GC'ing this item if we are the
+ // "leader" of the partition, i.e. the first node in the
+ // set of nodes that replicates this partition.
+ // This avoids GC loops and does not change the termination properties
+ // of the GC algorithm, as in all cases GC is suspended if
+ // any node of the partition is unavailable.
+ let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
+ let nodes = self.replication.write_nodes(&pk_hash);
+ if nodes.first() == Some(&self.system.id) {
+ self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Delete the entry stored under key `k`, but only if its stored bytes are exactly `v`.
+ ///
+ /// The removal and the matching `merkle_todo` update (an empty value for the key)
+ /// are done in a single transaction. Returns `Ok(true)` if the entry was removed and
+ /// `Ok(false)` if it was absent or had a different value.
+ pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
+ let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if cur_v == v {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(true);
+ }
+ }
+ Ok(false)
+ })?;
+
+ if removed {
+ // `v` is byte-equal to what was stored, so it can be decoded as the old entry.
+ let old_entry = self.decode_entry(v)?;
+ self.instance.updated(Some(old_entry), None);
+ self.merkle_todo_notify.notify_one();
+ }
+ Ok(removed)
+ }
+
+ /// Delete the entry stored under key `k`, but only if the `blake2sum` of its stored
+ /// bytes equals `vhash`.
+ ///
+ /// The removal and the matching `merkle_todo` update (an empty value for the key)
+ /// are done in a single transaction. Returns `Ok(true)` if the entry was removed and
+ /// `Ok(false)` if it was absent or its hash did not match.
+ pub(crate) fn delete_if_equal_hash(
+ self: &Arc<Self>,
+ k: &[u8],
+ vhash: Hash,
+ ) -> Result<bool, Error> {
+ let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if blake2sum(&cur_v[..]) == vhash {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ // Hand the removed bytes back so the old entry can be decoded below.
+ return Ok(Some(cur_v));
+ }
+ }
+ Ok(None)
+ })?;
+
+ if let Some(old_v) = removed {
+ let old_entry = self.decode_entry(&old_v[..])?;
+ self.instance.updated(Some(old_entry), None);
+ self.merkle_todo_notify.notify_one();
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+
+ // ---- Utility functions ----
+
+ pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
+ let mut ret = p.hash().to_vec();
+ ret.extend(s.sort_key());
+ ret
+ }
+
+ /// Decode a serialized table entry.
+ ///
+ /// The bytes are first decoded with `rmp_serde`; if that fails, `F::try_migrate` is
+ /// given a chance to interpret them. If both fail, a warning is logged, a hexdump of
+ /// the bytes is written at debug level, and the original decoding error is returned.
+ pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
+ match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
+ Ok(x) => Ok(x),
+ Err(e) => match F::try_migrate(bytes) {
+ Some(x) => Ok(x),
+ None => {
+ warn!("Unable to decode entry of {}: {}", self.name, e);
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
+ }
+ Err(e.into())
+ }
+ },
+ }
+ }
+
+ pub fn gc_todo_len(&self) -> usize {
+ self.gc_todo.len()
+ }
+}
diff --git a/src/table/gc.rs b/src/table/gc.rs
new file mode 100644
index 00000000..a37c052f
--- /dev/null
+++ b/src/table/gc.rs
@@ -0,0 +1,248 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
+
+use futures::future::join_all;
+use futures::select;
+use futures_util::future::*;
+use tokio::sync::watch;
+
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_rpc::membership::System;
+use garage_rpc::rpc_client::*;
+use garage_rpc::rpc_server::*;
+
+use crate::data::*;
+use crate::replication::*;
+use crate::schema::*;
+
+const TABLE_GC_BATCH_SIZE: usize = 1024;
+const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+
+pub struct TableGC<F: TableSchema, R: TableReplication> {
+ system: Arc<System>,
+ data: Arc<TableData<F, R>>,
+
+ rpc_client: Arc<RpcClient<GcRPC>>,
+}
+
+#[derive(Serialize, Deserialize)]
+enum GcRPC {
+ Update(Vec<ByteBuf>),
+ DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
+ Ok,
+}
+
+impl RpcMessage for GcRPC {}
+
+impl<F, R> TableGC<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ pub(crate) fn launch(
+ system: Arc<System>,
+ data: Arc<TableData<F, R>>,
+ rpc_server: &mut RpcServer,
+ ) -> Arc<Self> {
+ let rpc_path = format!("table_{}/gc", data.name);
+ let rpc_client = system.rpc_client::<GcRPC>(&rpc_path);
+
+ let gc = Arc::new(Self {
+ system: system.clone(),
+ data: data.clone(),
+ rpc_client,
+ });
+
+ gc.register_handler(rpc_server, rpc_path);
+
+ let gc1 = gc.clone();
+ system.background.spawn_worker(
+ format!("GC loop for {}", data.name),
+ move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
+ );
+
+ gc
+ }
+
+ async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
+ while !*must_exit.borrow() {
+ match self.gc_loop_iter().await {
+ Ok(true) => {
+ // Stuff was done, loop immediately
+ continue;
+ }
+ Ok(false) => {
+ // Nothing was done, sleep for some time (below)
+ }
+ Err(e) => {
+ warn!("({}) Error doing GC: {}", self.data.name, e);
+ }
+ }
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (),
+ _ = must_exit.changed().fuse() => (),
+ }
+ }
+ }
+
+ /// Run one GC pass over the `gc_todo` tree.
+ ///
+ /// Collects up to `TABLE_GC_BATCH_SIZE` todo items whose stored value still has the
+ /// recorded hash, drops the todo items that no longer match, groups the remaining
+ /// ones by the set of other nodes that store them, and runs `try_send_and_delete`
+ /// on each group concurrently.
+ ///
+ /// Returns `Ok(true)` if a batch was processed without error, `Ok(false)` if there
+ /// was nothing to do, and an error joining all group errors otherwise.
+ async fn gc_loop_iter(&self) -> Result<bool, Error> {
+ let mut entries = vec![];
+ let mut excluded = vec![];
+
+ for item in self.data.gc_todo.iter() {
+ let (k, vhash) = item?;
+
+ let vhash = Hash::try_from(&vhash[..]).unwrap();
+
+ // Keep the item only if the stored value is still the one that was queued.
+ let v_opt = self
+ .data
+ .store
+ .get(&k[..])?
+ .filter(|v| blake2sum(&v[..]) == vhash);
+
+ if let Some(v) = v_opt {
+ entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec())));
+ if entries.len() >= TABLE_GC_BATCH_SIZE {
+ break;
+ }
+ } else {
+ excluded.push((k, vhash));
+ }
+ }
+
+ // Stale todo items (value changed or disappeared) are simply forgotten.
+ for (k, vhash) in excluded {
+ self.todo_remove_if_equal(&k[..], vhash)?;
+ }
+
+ if entries.is_empty() {
+ // Nothing to do in this iteration
+ return Ok(false);
+ }
+
+ debug!("({}) GC: doing {} items", self.data.name, entries.len());
+
+ // Group items by the (sorted) set of other nodes that must be contacted.
+ let mut partitions = HashMap::new();
+ for (k, vhash, v) in entries {
+ let pkh = Hash::try_from(&k[..32]).unwrap();
+ let mut nodes = self.data.replication.write_nodes(&pkh);
+ nodes.retain(|x| *x != self.system.id);
+ nodes.sort();
+
+ partitions
+ .entry(nodes)
+ .or_insert_with(Vec::new)
+ .push((k, vhash, v));
+ }
+
+ let resps = join_all(
+ partitions
+ .into_iter()
+ .map(|(nodes, items)| self.try_send_and_delete(nodes, items)),
+ )
+ .await;
+
+ let errs = resps
+ .into_iter()
+ .filter_map(|resp| resp.err())
+ .map(|e| format!("{}", e))
+ .collect::<Vec<_>>();
+
+ if errs.is_empty() {
+ Ok(true)
+ } else {
+ Err(Error::Message(errs.join(", ")))
+ }
+ }
+
+ /// Push a batch of entries to `nodes`, then delete them remotely and locally.
+ ///
+ /// `items` holds `(key, value hash, value)` triples. The values are first sent to
+ /// `nodes` as a `GcRPC::Update`; once that call succeeds, a `GcRPC::DeleteIfEqualHash`
+ /// is sent for the `(key, hash)` pairs; finally the entries are deleted from the local
+ /// store and from `gc_todo`. An error in either RPC returns early, before anything is
+ /// deleted locally.
+ async fn try_send_and_delete(
+ &self,
+ nodes: Vec<UUID>,
+ items: Vec<(ByteBuf, Hash, ByteBuf)>,
+ ) -> Result<(), Error> {
+ let n_items = items.len();
+
+ let mut updates = vec![];
+ let mut deletes = vec![];
+ for (k, vhash, v) in items {
+ updates.push(v);
+ deletes.push((k, vhash));
+ }
+
+ // The quorum is set to the number of target nodes
+ // (presumably meaning every one of them must answer; confirm in RequestStrategy).
+ self.rpc_client
+ .try_call_many(
+ &nodes[..],
+ GcRPC::Update(updates),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ )
+ .await?;
+
+ info!(
+ "({}) GC: {} items successfully pushed, will try to delete.",
+ self.data.name, n_items
+ );
+
+ self.rpc_client
+ .try_call_many(
+ &nodes[..],
+ GcRPC::DeleteIfEqualHash(deletes.clone()),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ )
+ .await?;
+
+ // Remote deletion went through: delete our own copy and clear the todo item.
+ for (k, vhash) in deletes {
+ self.data.delete_if_equal_hash(&k[..], vhash)?;
+ self.todo_remove_if_equal(&k[..], vhash)?;
+ }
+
+ Ok(())
+ }
+
+ fn todo_remove_if_equal(&self, key: &[u8], vhash: Hash) -> Result<(), Error> {
+ let _ = self
+ .data
+ .gc_todo
+ .compare_and_swap::<_, _, Vec<u8>>(key, Some(vhash), None)?;
+ Ok(())
+ }
+
+ // ---- RPC HANDLER ----
+
+ fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+ let self2 = self.clone();
+ rpc_server.add_handler::<GcRPC, _, _>(path, move |msg, _addr| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+
+ let self2 = self.clone();
+ self.rpc_client
+ .set_local_handler(self.system.id, move |msg| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+ }
+
+ /// Handle a GC RPC received from another node (or from the local handler).
+ ///
+ /// * `GcRPC::Update` merges the given serialized entries into the local table.
+ /// * `GcRPC::DeleteIfEqualHash` deletes each listed entry if its stored value still
+ /// has the given hash, and clears the matching `gc_todo` item.
+ ///
+ /// Both answer `GcRPC::Ok` on success. Receiving `GcRPC::Ok` as a request is an error.
+ async fn handle_rpc(self: &Arc<Self>, message: &GcRPC) -> Result<GcRPC, Error> {
+ match message {
+ GcRPC::Update(items) => {
+ self.data.update_many(items)?;
+ Ok(GcRPC::Ok)
+ }
+ GcRPC::DeleteIfEqualHash(items) => {
+ for (key, vhash) in items.iter() {
+ self.data.delete_if_equal_hash(&key[..], *vhash)?;
+ self.todo_remove_if_equal(&key[..], *vhash)?;
+ }
+ Ok(GcRPC::Ok)
+ }
+ // Listed explicitly so that adding a variant to `GcRPC` forces a decision here.
+ GcRPC::Ok => Err(Error::Message("Unexpected GC RPC".to_string())),
+ }
+ }
+}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index 704f8f1e..3b73163b 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -7,10 +7,12 @@ pub mod crdt;
pub mod schema;
pub mod util;
+pub mod data;
+pub mod gc;
+pub mod merkle;
+pub mod replication;
+pub mod sync;
pub mod table;
-pub mod table_fullcopy;
-pub mod table_sharded;
-pub mod table_sync;
pub use schema::*;
pub use table::*;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
new file mode 100644
index 00000000..3001786f
--- /dev/null
+++ b/src/table/merkle.rs
@@ -0,0 +1,454 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::select;
+use futures_util::future::*;
+use log::{debug, warn};
+use serde::{Deserialize, Serialize};
+use sled::transaction::{
+ ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
+};
+use tokio::sync::watch;
+
+use garage_util::background::BackgroundRunner;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_rpc::ring::*;
+
+use crate::data::*;
+use crate::replication::*;
+use crate::schema::*;
+
+ // This module partitions the data in 2**16 partitions, based on the top
+// 16 bits (two bytes) of item's partition keys' hashes.
+// It builds one Merkle tree for each of these 2**16 partitions.
+
+ /// Keeps the Merkle tree of a table up to date: a background worker (see
+ /// `launch` / `updater_loop`) consumes the entries of the todo tree and
+ /// applies them to the Merkle tree, both of which live in `data`.
+ pub struct MerkleUpdater<F: TableSchema, R: TableReplication> {
+ // Shared table data, which owns the sled trees described below
+ data: Arc<TableData<F, R>>,
+
+ // Content of the todo tree: items where
+ // - key = the key of an item in the main table, ie hash(partition_key)+sort_key
+ // - value = the hash of the full serialized item, if present,
+ // or an empty vec if item is absent (deleted)
+ // Fields in data:
+ // pub(crate) merkle_todo: sled::Tree,
+ // pub(crate) merkle_todo_notify: Notify,
+
+ // Content of the merkle tree: items where
+ // - key = .bytes() for MerkleNodeKey
+ // - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found
+ // Field in data:
+ // pub(crate) merkle_tree: sled::Tree,
+ // Hash of the serialized `MerkleNode::Empty`, computed once in `launch`
+ empty_node_hash: Hash,
+ }
+
+ /// Position of a node inside the Merkle tree of one partition.
+ #[derive(Clone, Debug, Serialize, Deserialize)]
+ pub struct MerkleNodeKey {
+ // partition number
+ pub partition: Partition,
+
+ // prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
+ // (empty for the root node; each level below the root adds one byte)
+ #[serde(with = "serde_bytes")]
+ pub prefix: Vec<u8>,
+ }
+
+ /// A node of the Merkle tree, as stored (serialized) in the merkle tree.
+ #[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
+ pub enum MerkleNode {
+ // The empty Merkle node
+ Empty,
+
+ // An intermediate Merkle tree node for a prefix
+ // Contains (next prefix byte, hash of child node) pairs for the children
+ // that exist, kept sorted by byte; children that become empty are removed
+ Intermediate(Vec<(u8, Hash)>),
+
+ // A final node for an item
+ // Contains the full key of the item and the hash of the value
+ Leaf(Vec<u8>, Hash),
+ }
+
+impl<F, R> MerkleUpdater<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ /// Builds the updater for `data` and spawns its background worker
+ /// (`updater_loop`) on `background`.
+ ///
+ /// The hash of the serialized empty node is computed once here so that
+ /// later updates can recognise subtrees that became empty.
+ pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> {
+     let empty_node_bytes = rmp_to_vec_all_named(&MerkleNode::Empty).unwrap();
+     let updater = Arc::new(Self {
+         data,
+         empty_node_hash: blake2sum(&empty_node_bytes[..]),
+     });
+
+     let worker_name = format!("Merkle tree updater for {}", updater.data.name);
+     let worker = Arc::clone(&updater);
+     background.spawn_worker(worker_name, |must_exit: watch::Receiver<bool>| {
+         worker.updater_loop(must_exit)
+     });
+
+     updater
+ }
+
+ /// Background worker: repeatedly takes the first entry of the Merkle todo
+ /// tree and applies it to the Merkle tree, until `must_exit` becomes true.
+ ///
+ /// When the todo tree is empty, waits for either a todo notification or an
+ /// exit signal. When processing fails (iteration error or update error),
+ /// logs a warning and backs off for 10 seconds before retrying. Without
+ /// this pause, a failing entry (which stays first in the todo tree) would
+ /// be retried in a tight loop that never yields to the executor.
+ async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
+     while !*must_exit.borrow() {
+         let next_item = self.data.merkle_todo.iter().next();
+
+         let failure = match next_item {
+             Some(Ok((key, valhash))) => self
+                 .update_item(&key[..], &valhash[..])
+                 .err()
+                 .map(|e| format!("Error while updating Merkle tree item: {}", e)),
+             Some(Err(e)) => Some(format!(
+                 "Error while iterating on Merkle todo tree: {}",
+                 e
+             )),
+             None => {
+                 // Nothing to do: sleep until new work arrives or we must exit.
+                 select! {
+                     _ = self.data.merkle_todo_notify.notified().fuse() => (),
+                     _ = must_exit.changed().fuse() => (),
+                 }
+                 None
+             }
+         };
+
+         if let Some(msg) = failure {
+             warn!("({}) {}", self.data.name, msg);
+             // Back off before retrying, but stay responsive to shutdown.
+             select! {
+                 _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (),
+                 _ = must_exit.changed().fuse() => (),
+             }
+         }
+     }
+ }
+
+ /// Applies one todo entry to the Merkle tree.
+ ///
+ /// `k` is the full key of the item in the main table and `vhash_by` the
+ /// value found in the todo tree: the hash of the item, or an empty slice if
+ /// the item was deleted. The tree is updated inside a sled transaction.
+ /// Afterwards the todo entry is removed, unless its value changed in the
+ /// meantime, in which case it is left for a later pass.
+ fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
+     let khash = blake2sum(k);
+
+     let new_vhash = match vhash_by {
+         [] => None,
+         bytes => Some(Hash::try_from(bytes).unwrap()),
+     };
+
+     // The first 32 bytes of the key are the partition key hash, which
+     // determines the partition and hence the tree to update.
+     let partition_key_hash = Hash::try_from(&k[0..32]).unwrap();
+     let root = MerkleNodeKey {
+         partition: self.data.replication.partition_of(&partition_key_hash),
+         prefix: Vec::new(),
+     };
+     self.data
+         .merkle_tree
+         .transaction(|tx| self.update_item_rec(tx, k, &khash, &root, new_vhash))?;
+
+     let cas_outcome = self
+         .data
+         .merkle_todo
+         .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?;
+
+     if cas_outcome.is_err() {
+         debug!(
+             "({}) Item not deleted from Merkle todo because it changed: {:?}",
+             self.data.name, k
+         );
+     }
+     Ok(())
+ }
+
+ // Recursive step of a Merkle tree update, run inside the sled transaction `tx`.
+ // - k: full key of the item being updated
+ // - khash: hash of k; its successive bytes select the child at each level
+ // - key: position of the node currently being examined
+ // - new_vhash: new hash of the item's value, or None if the item is deleted
+ // Returns Ok(Some(h)) with the new hash of the node at `key` if that node
+ // was rewritten, or Ok(None) if nothing had to change at this position.
+ fn update_item_rec(
+ &self,
+ tx: &TransactionalTree,
+ k: &[u8],
+ khash: &Hash,
+ key: &MerkleNodeKey,
+ new_vhash: Option<Hash>,
+ ) -> ConflictableTransactionResult<Option<Hash>, Error> {
+ // Depth of the current node, i.e. index of the hash byte that selects its children
+ let i = key.prefix.len();
+
+ // Read node at current position (defined by the prefix stored in key)
+ // Calculate an update to apply to this node
+ // This update is an Option<_>, so that it is None if the update is a no-op
+ // and we can thus skip recalculating and re-storing everything
+ let mutate = match self.read_node_txn(tx, &key)? {
+ MerkleNode::Empty => {
+ if let Some(vhv) = new_vhash {
+ Some(MerkleNode::Leaf(k.to_vec(), vhv))
+ } else {
+ // Nothing to do, keep empty node
+ None
+ }
+ }
+ MerkleNode::Intermediate(mut children) => {
+ let key2 = key.next_key(khash);
+ if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? {
+ // Subtree changed, update this node as well
+ if subhash == self.empty_node_hash {
+ intermediate_rm_child(&mut children, key2.prefix[i]);
+ } else {
+ intermediate_set_child(&mut children, key2.prefix[i], subhash);
+ }
+
+ if children.len() == 0 {
+ // should not happen
+ warn!(
+ "({}) Replacing intermediate node with empty node, should not happen.",
+ self.data.name
+ );
+ Some(MerkleNode::Empty)
+ } else if children.len() == 1 {
+ // We now have a single node (case when the update deleted one of only two
+ // children). If that node is a leaf, move it to this level.
+ let key_sub = key.add_byte(children[0].0);
+ let subnode = self.read_node_txn(tx, &key_sub)?;
+ match subnode {
+ MerkleNode::Empty => {
+ warn!("({}) Single subnode in tree is empty Merkle node", self.data.name);
+ Some(MerkleNode::Empty)
+ }
+ MerkleNode::Intermediate(_) => {
+ Some(MerkleNode::Intermediate(children))
+ }
+ x @ MerkleNode::Leaf(_, _) => {
+ // Remove the leaf from the child position; it is re-stored at this level below
+ tx.remove(key_sub.encode())?;
+ Some(x)
+ }
+ }
+ } else {
+ Some(MerkleNode::Intermediate(children))
+ }
+ } else {
+ // Subtree not changed, nothing to do
+ None
+ }
+ }
+ MerkleNode::Leaf(exlf_k, exlf_vhash) => {
+ if exlf_k == k {
+ // This leaf is for the same key that the one we are updating
+ match new_vhash {
+ Some(vhv) if vhv == exlf_vhash => None,
+ Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)),
+ None => Some(MerkleNode::Empty),
+ }
+ } else {
+ // This is an only leaf for another key
+ if new_vhash.is_some() {
+ // Move that other key to a subnode, create another subnode for our
+ // insertion and replace current node by an intermediary node
+ let mut int = vec![];
+
+ let exlf_khash = blake2sum(&exlf_k[..]);
+ assert_eq!(khash.as_slice()[..i], exlf_khash.as_slice()[..i]);
+
+ {
+ // Re-insert the existing leaf one level deeper
+ let exlf_subkey = key.next_key(&exlf_khash);
+ let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap();
+ intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash);
+ assert_eq!(int.len(), 1);
+ }
+
+ {
+ // Insert the new item one level deeper; if both hashes share their
+ // next byte, this recursion lands on the leaf that was just moved
+ let key2 = key.next_key(khash);
+ let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap();
+ intermediate_set_child(&mut int, key2.prefix[i], subhash);
+ if exlf_khash.as_slice()[i] == khash.as_slice()[i] {
+ assert_eq!(int.len(), 1);
+ } else {
+ assert_eq!(int.len(), 2);
+ }
+ }
+ Some(MerkleNode::Intermediate(int))
+ } else {
+ // Nothing to do, we don't want to insert this value because it is None,
+ // and we don't want to change the other value because it's for something
+ // else
+ None
+ }
+ }
+ }
+ };
+
+ // Store the new version of this node (if any) and report its hash to the caller
+ if let Some(new_node) = mutate {
+ let hash = self.put_node_txn(tx, &key, &new_node)?;
+ Ok(Some(hash))
+ } else {
+ Ok(None)
+ }
+ }
+
+ // Merkle tree node manipulation
+
+ /// Reads the node stored at `k` within transaction `tx`. A missing entry
+ /// decodes as `MerkleNode::Empty`; a decoding failure aborts the
+ /// transaction.
+ fn read_node_txn(
+     &self,
+     tx: &TransactionalTree,
+     k: &MerkleNodeKey,
+ ) -> ConflictableTransactionResult<MerkleNode, Error> {
+     let stored = tx.get(k.encode())?;
+     match MerkleNode::decode_opt(stored) {
+         Ok(node) => Ok(node),
+         Err(e) => Err(ConflictableTransactionError::Abort(e)),
+     }
+ }
+
+ /// Writes node `v` at position `k` within transaction `tx` and returns the
+ /// hash of the stored node.
+ ///
+ /// Empty nodes are not stored: the entry is removed instead and the
+ /// precomputed empty-node hash is returned.
+ fn put_node_txn(
+     &self,
+     tx: &TransactionalTree,
+     k: &MerkleNodeKey,
+     v: &MerkleNode,
+ ) -> ConflictableTransactionResult<Hash, Error> {
+     trace!("Put Merkle node: {:?} => {:?}", k, v);
+
+     if let MerkleNode::Empty = v {
+         tx.remove(k.encode())?;
+         return Ok(self.empty_node_hash);
+     }
+
+     let serialized = rmp_to_vec_all_named(v)
+         .map_err(|e| ConflictableTransactionError::Abort(e.into()))?;
+     let node_hash = blake2sum(&serialized[..]);
+     tx.insert(k.encode(), serialized)?;
+     Ok(node_hash)
+ }
+
+ /// Reads a node of the Merkle tree outside of any transaction; used by the
+ /// sync protocol. A missing entry is returned as `MerkleNode::Empty`.
+ pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> {
+     self.data
+         .merkle_tree
+         .get(k.encode())
+         .map_err(Error::from)
+         .and_then(MerkleNode::decode_opt)
+ }
+
+ /// Number of entries currently stored in the Merkle tree.
+ pub fn merkle_tree_len(&self) -> usize {
+ self.data.merkle_tree.len()
+ }
+
+ /// Number of entries still waiting in the Merkle todo tree.
+ pub fn todo_len(&self) -> usize {
+ self.data.merkle_todo.len()
+ }
+}
+
+ impl MerkleNodeKey {
+     /// Byte key under which this node is stored: the partition number in
+     /// big-endian order, followed by the prefix.
+     fn encode(&self) -> Vec<u8> {
+         let partition_bytes = u16::to_be_bytes(self.partition);
+         let mut encoded = Vec::with_capacity(partition_bytes.len() + self.prefix.len());
+         encoded.extend_from_slice(&partition_bytes);
+         encoded.extend_from_slice(&self.prefix);
+         encoded
+     }
+
+     /// Key of the child of this node on the path towards hash `h`.
+     ///
+     /// Panics if `h` does not start with this node's prefix.
+     pub fn next_key(&self, h: &Hash) -> Self {
+         let depth = self.prefix.len();
+         let hash_bytes = h.as_slice();
+         assert_eq!(hash_bytes[0..depth], self.prefix[..]);
+         self.add_byte(hash_bytes[depth])
+     }
+
+     /// Key of the child of this node reached through prefix byte `b`.
+     pub fn add_byte(&self, b: u8) -> Self {
+         let mut prefix = Vec::with_capacity(self.prefix.len() + 1);
+         prefix.extend_from_slice(&self.prefix);
+         prefix.push(b);
+         Self {
+             partition: self.partition,
+             prefix,
+         }
+     }
+ }
+
+ impl MerkleNode {
+     /// Decodes a node from its stored form; an absent entry is the empty node.
+     fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> {
+         let bytes = match ent {
+             Some(bytes) => bytes,
+             None => return Ok(MerkleNode::Empty),
+         };
+         let node = rmp_serde::decode::from_read_ref::<_, MerkleNode>(&bytes[..])?;
+         Ok(node)
+     }
+
+     /// Whether this is the empty node.
+     pub fn is_empty(&self) -> bool {
+         matches!(self, MerkleNode::Empty)
+     }
+ }
+
+ /// Sets the hash of child `pos` in a child list ordered by position:
+ /// overwrites the existing entry for `pos`, or inserts a new one at the
+ /// place that keeps the list ordered.
+ fn intermediate_set_child(ch: &mut Vec<(u8, Hash)>, pos: u8, v: Hash) {
+     // First entry whose position is not smaller than `pos`, if any.
+     let slot = ch.iter().position(|(child_pos, _)| *child_pos >= pos);
+     match slot {
+         Some(i) if ch[i].0 == pos => ch[i].1 = v,
+         Some(i) => ch.insert(i, (pos, v)),
+         None => ch.push((pos, v)),
+     }
+ }
+
+ /// Removes the entry for child `pos` from a child list, if there is one.
+ fn intermediate_rm_child(ch: &mut Vec<(u8, Hash)>, pos: u8) {
+     if let Some(i) = ch.iter().position(|(child_pos, _)| *child_pos == pos) {
+         ch.remove(i);
+     }
+ }
+
+ #[test]
+ fn test_intermediate_aux() {
+     // Child entry at position `pos` whose hash is 32 copies of `fill`.
+     fn entry(pos: u8, fill: u8) -> (u8, Hash) {
+         (pos, [fill; 32].into())
+     }
+
+     let mut v = vec![];
+
+     // Insertion into an empty list, then at the end and at the front.
+     intermediate_set_child(&mut v, 12u8, [12u8; 32].into());
+     assert_eq!(v, vec![entry(12, 12)]);
+
+     intermediate_set_child(&mut v, 42u8, [42u8; 32].into());
+     assert_eq!(v, vec![entry(12, 12), entry(42, 42)]);
+
+     intermediate_set_child(&mut v, 4u8, [4u8; 32].into());
+     assert_eq!(v, vec![entry(4, 4), entry(12, 12), entry(42, 42)]);
+
+     // Overwriting an existing position, then inserting in the middle.
+     intermediate_set_child(&mut v, 12u8, [8u8; 32].into());
+     assert_eq!(v, vec![entry(4, 4), entry(12, 8), entry(42, 42)]);
+
+     intermediate_set_child(&mut v, 6u8, [6u8; 32].into());
+     assert_eq!(
+         v,
+         vec![entry(4, 4), entry(6, 6), entry(12, 8), entry(42, 42)]
+     );
+
+     // Removal of the last entry, of an absent position, and of a middle entry.
+     intermediate_rm_child(&mut v, 42u8);
+     assert_eq!(v, vec![entry(4, 4), entry(6, 6), entry(12, 8)]);
+
+     intermediate_rm_child(&mut v, 11u8);
+     assert_eq!(v, vec![entry(4, 4), entry(6, 6), entry(12, 8)]);
+
+     intermediate_rm_child(&mut v, 6u8);
+     assert_eq!(v, vec![entry(4, 4), entry(12, 8)]);
+
+     // Re-inserting a removed position with a different hash.
+     intermediate_set_child(&mut v, 6u8, [7u8; 32].into());
+     assert_eq!(v, vec![entry(4, 4), entry(6, 7), entry(12, 8)]);
+ }
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
new file mode 100644
index 00000000..bd658f63
--- /dev/null
+++ b/src/table/replication/fullcopy.rs
@@ -0,0 +1,51 @@
+use std::sync::Arc;
+
+use garage_rpc::membership::System;
+use garage_rpc::ring::*;
+use garage_util::data::*;
+
+use crate::replication::*;
+
+ /// Replication strategy where every cluster member stores the whole table.
+ #[derive(Clone)]
+ pub struct TableFullReplication {
+ // Local node handle: provides this node's id and the current ring
+ pub system: Arc<System>,
+ // Number of failing nodes tolerated on writes: the write quorum is the
+ // member count minus this value (at least 1), and this is also the
+ // maximum number of write errors
+ pub max_faults: usize,
+ }
+
+ impl TableReplication for TableFullReplication {
+     // Full replication schema: all nodes store everything
+     // Writes are disseminated in an epidemic manner in the network
+
+     // Advantage: do all reads locally, extremely fast
+     // Inconvenient: only suitable to reasonably small tables
+
+     /// Reads are served by the local node only.
+     fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
+         vec![self.system.id]
+     }
+     fn read_quorum(&self) -> usize {
+         1
+     }
+
+     /// Writes go to every member of the current ring configuration.
+     fn write_nodes(&self, _hash: &Hash) -> Vec<UUID> {
+         let ring = self.system.ring.borrow();
+         let mut members = Vec::with_capacity(ring.config.members.len());
+         members.extend(ring.config.members.keys().cloned());
+         members
+     }
+     /// All members except `max_faults` of them must acknowledge a write,
+     /// with a minimum of one.
+     fn write_quorum(&self) -> usize {
+         let nmembers = self.system.ring.borrow().config.members.len();
+         nmembers.saturating_sub(self.max_faults).max(1)
+     }
+     fn max_write_errors(&self) -> usize {
+         self.max_faults
+     }
+
+     /// Everything lives in a single partition, numbered 0.
+     fn partition_of(&self, _hash: &Hash) -> Partition {
+         0u16
+     }
+     fn partitions(&self) -> Vec<(Partition, Hash)> {
+         vec![(0u16, [0u8; 32].into())]
+     }
+ }
diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs
new file mode 100644
index 00000000..d43d7f19
--- /dev/null
+++ b/src/table/replication/mod.rs
@@ -0,0 +1,6 @@
+mod parameters;
+
+pub mod fullcopy;
+pub mod sharded;
+
+pub use parameters::*;
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
new file mode 100644
index 00000000..e46bd172
--- /dev/null
+++ b/src/table/replication/parameters.rs
@@ -0,0 +1,21 @@
+use garage_rpc::ring::*;
+
+use garage_util::data::*;
+
+ /// Describes how the items of a table are distributed over the cluster nodes.
+ pub trait TableReplication: Send + Sync {
+ // See examples in replication/sharded.rs and replication/fullcopy.rs
+ // to understand various replication methods
+
+ // Which nodes to send reads to, and how many of them must answer
+ fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
+ fn read_quorum(&self) -> usize;
+
+ // Which nodes to send writes to, how many of them must acknowledge,
+ // and how many write errors can be tolerated
+ fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
+ fn write_quorum(&self) -> usize;
+ fn max_write_errors(&self) -> usize;
+
+ // Accessing partitions, for Merkle tree & sync
+ // partition_of: partition that contains the given hash
+ // partitions: list of (partition number, first hash of the partition)
+ fn partition_of(&self, hash: &Hash) -> Partition;
+ fn partitions(&self) -> Vec<(Partition, Hash)>;
+ }
diff --git a/src/table/table_sharded.rs b/src/table/replication/sharded.rs
index 098637dd..dce74b03 100644
--- a/src/table/table_sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,11 +1,14 @@
+use std::sync::Arc;
+
use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
+use garage_rpc::ring::*;
use garage_util::data::*;
-use crate::*;
+use crate::replication::*;
#[derive(Clone)]
pub struct TableShardedReplication {
+ pub system: Arc<System>,
pub replication_factor: usize,
pub read_quorum: usize,
pub write_quorum: usize,
@@ -19,35 +22,29 @@ impl TableReplication for TableShardedReplication {
// - reads are done on all of the nodes that replicate the data
// - writes as well
- fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
- let ring = system.ring.borrow().clone();
+ fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
+ let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor)
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
- let ring = system.ring.borrow().clone();
+ fn write_nodes(&self, hash: &Hash) -> Vec<UUID> {
+ let ring = self.system.ring.borrow();
ring.walk_ring(&hash, self.replication_factor)
}
- fn write_quorum(&self, _system: &System) -> usize {
+ fn write_quorum(&self) -> usize {
self.write_quorum
}
fn max_write_errors(&self) -> usize {
self.replication_factor - self.write_quorum
}
- fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID> {
- ring.walk_ring(&hash, self.replication_factor)
+ fn partition_of(&self, hash: &Hash) -> Partition {
+ self.system.ring.borrow().partition_of(hash)
}
- fn split_points(&self, ring: &Ring) -> Vec<Hash> {
- let mut ret = vec![];
-
- for entry in ring.ring.iter() {
- ret.push(entry.location);
- }
- ret.push([0xFFu8; 32].into());
- ret
+ fn partitions(&self) -> Vec<(Partition, Hash)> {
+ self.system.ring.borrow().partitions()
}
}
diff --git a/src/table/schema.rs b/src/table/schema.rs
index edd04000..4d754664 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -2,13 +2,15 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
+use crate::crdt::CRDT;
+
pub trait PartitionKey {
fn hash(&self) -> Hash;
}
impl PartitionKey for String {
fn hash(&self) -> Hash {
- sha256sum(self.as_bytes())
+ blake2sum(self.as_bytes())
}
}
@@ -35,12 +37,14 @@ impl SortKey for Hash {
}
pub trait Entry<P: PartitionKey, S: SortKey>:
- PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
+ CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S;
- fn merge(&mut self, other: &Self);
+ fn is_tombstone(&self) -> bool {
+ false
+ }
}
pub trait TableSchema: Send + Sync {
diff --git a/src/table/sync.rs b/src/table/sync.rs
new file mode 100644
index 00000000..3130abe8
--- /dev/null
+++ b/src/table/sync.rs
@@ -0,0 +1,614 @@
+use std::collections::VecDeque;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::select;
+use futures_util::future::*;
+use futures_util::stream::*;
+use rand::Rng;
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
+use tokio::sync::{mpsc, watch};
+
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_rpc::membership::System;
+use garage_rpc::ring::*;
+use garage_rpc::rpc_client::*;
+use garage_rpc::rpc_server::*;
+
+use crate::data::*;
+use crate::merkle::*;
+use crate::replication::*;
+use crate::*;
+
+const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+
+// Do anti-entropy every 10 minutes
+const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
+
+ /// Anti-entropy worker of a table: pushes local items to the other nodes
+ /// storing the same partitions and offloads partitions this node no longer
+ /// stores.
+ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
+ // Local node handle (node id, ring, background runner)
+ system: Arc<System>,
+ // Shared table data (item store and replication parameters)
+ data: Arc<TableData<F, R>>,
+ // Gives read access to the local Merkle tree
+ merkle: Arc<MerkleUpdater<F, R>>,
+
+ // Partitions waiting to be synced; filled by add_full_sync, drained by syncer_task
+ todo: Mutex<SyncTodo>,
+ // Client used to send SyncRPC messages to other nodes
+ rpc_client: Arc<RpcClient<SyncRPC>>,
+ }
+
+ /// Messages of the table synchronization protocol.
+ #[derive(Serialize, Deserialize)]
+ pub(crate) enum SyncRPC {
+ // Request: hash of the sender's root Merkle node for a partition
+ RootCkHash(Partition, Hash),
+ // Response to RootCkHash: true if the receiver's root hash is different
+ RootCkDifferent(bool),
+ // Request: Merkle node stored by the receiver at the given position
+ GetNode(MerkleNodeKey),
+ // Response to GetNode: the requested position and the node found there
+ Node(MerkleNodeKey, MerkleNode),
+ // Request: serialized items that the receiver must merge into its table
+ Items(Vec<Arc<ByteBuf>>),
+ // Response to Items: the items were processed
+ Ok,
+ }
+
+impl RpcMessage for SyncRPC {}
+
+ // List of partitions that remain to be synced (or offloaded);
+ // rebuilt from scratch by add_full_sync, consumed in random order by pop_task
+ struct SyncTodo {
+ todo: Vec<TodoPartition>,
+ }
+
+ // One partition to process, with the key range begin..end it covers in the item store
+ #[derive(Debug, Clone)]
+ struct TodoPartition {
+ partition: Partition,
+ begin: Hash,
+ end: Hash,
+
+ // Are we a node that stores this partition or not?
+ // (if not, the partition is offloaded to the nodes that store it)
+ retain: bool,
+ }
+
+impl<F, R> TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ /// Creates the syncer of a table and starts its background activity:
+ /// - registers the RPC handler under `table_<name>/sync`,
+ /// - spawns the watcher worker (decides when a full sync is needed),
+ /// - spawns the syncer worker (processes the todo list),
+ /// - schedules a first full sync 20 seconds after launch.
+ pub(crate) fn launch(
+     system: Arc<System>,
+     data: Arc<TableData<F, R>>,
+     merkle: Arc<MerkleUpdater<F, R>>,
+     rpc_server: &mut RpcServer,
+ ) -> Arc<Self> {
+     let rpc_path = format!("table_{}/sync", data.name);
+
+     let syncer = Arc::new(Self {
+         rpc_client: system.rpc_client::<SyncRPC>(&rpc_path),
+         system: Arc::clone(&system),
+         data: Arc::clone(&data),
+         merkle,
+         todo: Mutex::new(SyncTodo { todo: Vec::new() }),
+     });
+
+     syncer.register_handler(rpc_server, rpc_path);
+
+     // The syncer worker reports whether it is busy to the watcher worker.
+     let (busy_tx, busy_rx) = mpsc::unbounded_channel();
+
+     let watcher = Arc::clone(&syncer);
+     system.background.spawn_worker(
+         format!("table sync watcher for {}", data.name),
+         move |must_exit: watch::Receiver<bool>| watcher.watcher_task(must_exit, busy_rx),
+     );
+
+     let worker = Arc::clone(&syncer);
+     system.background.spawn_worker(
+         format!("table syncer for {}", data.name),
+         move |must_exit: watch::Receiver<bool>| worker.syncer_task(must_exit, busy_tx),
+     );
+
+     let initial = Arc::clone(&syncer);
+     tokio::spawn(async move {
+         tokio::time::sleep(Duration::from_secs(20)).await;
+         initial.add_full_sync();
+     });
+
+     syncer
+ }
+
+ /// Routes incoming `SyncRPC` messages to `handle_rpc`, both on the RPC
+ /// server (under `path`) and on the RPC client's local handler registered
+ /// for this node's own id.
+ fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+     let for_server = Arc::clone(self);
+     rpc_server.add_handler::<SyncRPC, _, _>(path, move |msg, _addr| {
+         let this = Arc::clone(&for_server);
+         async move { this.handle_rpc(&msg).await }
+     });
+
+     let for_local = Arc::clone(self);
+     self.rpc_client
+         .set_local_handler(self.system.id, move |msg| {
+             let this = Arc::clone(&for_local);
+             async move { this.handle_rpc(&msg).await }
+         });
+ }
+
+ /// Background worker that decides when a full sync must be scheduled.
+ ///
+ /// A full sync is added to the todo list when:
+ /// - the ring changes, or
+ /// - the syncer has been idle for at least `ANTI_ENTROPY_INTERVAL`.
+ ///
+ /// `busy_rx` receives `true` when the syncer starts working on a partition
+ /// and `false` when it finds nothing to do; this drives the idle timer.
+ /// The task ends once `must_exit` becomes true.
+ async fn watcher_task(
+     self: Arc<Self>,
+     mut must_exit: watch::Receiver<bool>,
+     mut busy_rx: mpsc::UnboundedReceiver<bool>,
+ ) {
+     let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
+     let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
+     let mut nothing_to_do_since = Some(Instant::now());
+
+     while !*must_exit.borrow() {
+         select! {
+             _ = ring_recv.changed().fuse() => {
+                 // Copy the Arc out immediately so that the read guard on the
+                 // watch channel is released before add_full_sync() runs: the
+                 // replication strategy borrows the same channel again, and
+                 // nested borrows can deadlock with a concurrent ring update.
+                 let new_ring: Arc<Ring> = ring_recv.borrow().clone();
+                 if !Arc::ptr_eq(&new_ring, &prev_ring) {
+                     debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
+                     self.add_full_sync();
+                     prev_ring = new_ring;
+                 }
+             }
+             busy_opt = busy_rx.recv().fuse() => {
+                 if let Some(busy) = busy_opt {
+                     if busy {
+                         nothing_to_do_since = None;
+                     } else if nothing_to_do_since.is_none() {
+                         // Start of an idle period
+                         nothing_to_do_since = Some(Instant::now());
+                     }
+                 }
+             }
+             _ = must_exit.changed().fuse() => (),
+             _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
+                 let idle_long_enough = nothing_to_do_since
+                     .map_or(false, |since| since.elapsed() >= ANTI_ENTROPY_INTERVAL);
+                 if idle_long_enough {
+                     nothing_to_do_since = None;
+                     debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name);
+                     self.add_full_sync();
+                 }
+             }
+         }
+     }
+ }
+
+ /// Replaces the content of the todo list by a sync of all partitions.
+ pub fn add_full_sync(&self) {
+     let mut todo = self.todo.lock().unwrap();
+     todo.add_full_sync(&self.data, &self.system);
+ }
+
+ /// Background worker that processes the todo list: pops one partition at a
+ /// time and syncs it, or sleeps for one second when the list is empty.
+ /// Sync errors are logged and do not stop the worker. The task ends once
+ /// `must_exit` becomes true.
+ ///
+ /// `busy_tx` tells the watcher whether this worker is busy. The flag is
+ /// only advisory, and its receiver is dropped when the watcher task
+ /// returns (e.g. at shutdown), so a failed send is ignored instead of
+ /// panicking.
+ async fn syncer_task(
+     self: Arc<Self>,
+     mut must_exit: watch::Receiver<bool>,
+     busy_tx: mpsc::UnboundedSender<bool>,
+ ) {
+     while !*must_exit.borrow() {
+         // The lock is released at the end of this statement, before any await.
+         let task = self.todo.lock().unwrap().pop_task();
+         if let Some(partition) = task {
+             let _ = busy_tx.send(true);
+             let res = self
+                 .clone()
+                 .sync_partition(&partition, &mut must_exit)
+                 .await;
+             if let Err(e) = res {
+                 warn!(
+                     "({}) Error while syncing {:?}: {}",
+                     self.data.name, partition, e
+                 );
+             }
+         } else {
+             let _ = busy_tx.send(false);
+             tokio::time::sleep(Duration::from_secs(1)).await;
+         }
+     }
+ }
+
+ /// Processes one partition of the todo list.
+ ///
+ /// If this node does not store the partition, its items are offloaded.
+ /// Otherwise the partition is synced concurrently with all the other nodes
+ /// that store it; the call fails if more syncs failed than the replication
+ /// parameters allow write errors.
+ async fn sync_partition(
+     self: Arc<Self>,
+     partition: &TodoPartition,
+     must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+     if !partition.retain {
+         return self
+             .offload_partition(&partition.begin, &partition.end, must_exit)
+             .await;
+     }
+
+     let my_id = self.system.id;
+     let nodes = self
+         .data
+         .replication
+         .write_nodes(&partition.begin)
+         .into_iter()
+         .filter(|node| *node != my_id)
+         .collect::<Vec<_>>();
+
+     debug!(
+         "({}) Syncing {:?} with {:?}...",
+         self.data.name, partition, nodes
+     );
+
+     // One sync per remote node, all running concurrently.
+     let mut pending = nodes
+         .iter()
+         .map(|node| {
+             self.clone()
+                 .do_sync_with(partition.clone(), *node, must_exit.clone())
+         })
+         .collect::<FuturesUnordered<_>>();
+
+     let mut n_errors = 0;
+     while let Some(outcome) = pending.next().await {
+         if let Err(e) = outcome {
+             n_errors += 1;
+             warn!("({}) Sync error: {}", self.data.name, e);
+         }
+     }
+
+     if n_errors > self.data.replication.max_write_errors() {
+         return Err(Error::Message(format!(
+             "Sync failed with too many nodes (should have been: {:?}).",
+             nodes
+         )));
+     }
+     Ok(())
+ }
+
+ // Offload partition: this partition is not something we are storing,
+ // so send it out to all other nodes that store it and delete items locally.
+ // We don't bother checking if the remote nodes already have the items,
+ // we just batch-send everything. Offloading isn't supposed to happen very often.
+ // If any of the nodes that are supposed to store the items is unable to
+ // save them, we interrupt the process.
+ //
+ // Items of the range begin..end are processed in batches of at most 1024,
+ // until the range is empty, an exit is requested, or this node turns out
+ // to be one of the nodes that store the partition.
+ async fn offload_partition(
+     self: &Arc<Self>,
+     begin: &Hash,
+     end: &Hash,
+     must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+     let mut counter: usize = 0;
+
+     while !*must_exit.borrow() {
+         // Next batch of items still present locally
+         let items = self
+             .data
+             .store
+             .range(begin.to_vec()..end.to_vec())
+             .take(1024)
+             .map(|item| {
+                 item.map(|(key, value)| {
+                     (key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))
+                 })
+             })
+             .collect::<Result<Vec<_>, _>>()?;
+
+         if items.is_empty() {
+             break;
+         }
+
+         let nodes = self.data.replication.write_nodes(begin);
+         if nodes.contains(&self.system.id) {
+             warn!(
+                 "({}) Interrupting offload as partitions seem to have changed",
+                 self.data.name
+             );
+             break;
+         }
+         if nodes.len() < self.data.replication.write_quorum() {
+             return Err(Error::Message(
+                 "Not offloading as we don't have a quorum of nodes to write to.".to_string(),
+             ));
+         }
+
+         counter += 1;
+         info!(
+             "({}) Offloading {} items from {:?}..{:?} ({})",
+             self.data.name,
+             items.len(),
+             begin,
+             end,
+             counter
+         );
+         self.offload_items(&items, &nodes[..]).await?;
+     }
+
+     Ok(())
+ }
+
+ /// Sends a batch of `(key, serialized value)` items to `nodes`, requiring
+ /// an answer from all of them, then deletes locally each item whose stored
+ /// value is still the one that was sent.
+ async fn offload_items(
+     self: &Arc<Self>,
+     items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
+     nodes: &[UUID],
+ ) -> Result<(), Error> {
+     let mut values = Vec::with_capacity(items.len());
+     for (_key, value) in items.iter() {
+         values.push(Arc::clone(value));
+     }
+
+     let strategy =
+         RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT);
+     self.rpc_client
+         .try_call_many(&nodes[..], SyncRPC::Items(values), strategy)
+         .await?;
+
+     // All remote nodes have written those items, now we can delete them locally
+     let mut not_removed = 0;
+     for (k, v) in items.iter() {
+         let removed = self.data.delete_if_equal(&k[..], &v[..])?;
+         if !removed {
+             not_removed += 1;
+         }
+     }
+
+     if not_removed > 0 {
+         debug!("({}) {} items not removed during offload because they changed in between (trying again...)", self.data.name, not_removed);
+     }
+
+     Ok(())
+ }
+
+ // ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ======
+ // The driver side is only concerned with sending out the item it has
+ // and the other side might not have. Receiving items that differ from one
+ // side to the other will happen when the other side syncs with us,
+ // which they also do regularly.
+
+ /// Reads the root node of the local Merkle tree of `partition` and returns
+ /// it together with its key (the key with an empty prefix).
+ fn get_root_ck(&self, partition: Partition) -> Result<(MerkleNodeKey, MerkleNode), Error> {
+     let root_key = MerkleNodeKey {
+         partition,
+         prefix: Vec::new(),
+     };
+     self.merkle
+         .read_node(&root_key)
+         .map(|root_node| (root_key, root_node))
+ }
+
+ // Driver side of the sync of one partition with one remote node `who`.
+ // 1. If the local tree of the partition is empty, there is nothing to push.
+ // 2. Send our root hash; if the remote root hash is the same, stop.
+ // 3. Otherwise walk the local Merkle tree breadth-first: for each local
+ //    intermediate node, fetch the remote node at the same position and
+ //    enqueue the children whose hashes differ; for each local leaf reached,
+ //    read the item from the store and queue it for sending.
+ // Items are sent in batches of 256 or more through send_items.
+ async fn do_sync_with(
+ self: Arc<Self>,
+ partition: TodoPartition,
+ who: UUID,
+ must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
+ if root_ck.is_empty() {
+ debug!(
+ "({}) Sync {:?} with {:?}: partition is empty.",
+ self.data.name, partition, who
+ );
+ return Ok(());
+ }
+ let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
+
+ // Check if they have the same root checksum
+ // If so, do nothing.
+ let root_resp = self
+ .rpc_client
+ .call(
+ who,
+ SyncRPC::RootCkHash(partition.partition, root_ck_hash),
+ TABLE_SYNC_RPC_TIMEOUT,
+ )
+ .await?;
+
+ // Queue of tree positions that remain to be compared, starting with the root
+ let mut todo = match root_resp {
+ SyncRPC::RootCkDifferent(false) => {
+ debug!(
+ "({}) Sync {:?} with {:?}: no difference",
+ self.data.name, partition, who
+ );
+ return Ok(());
+ }
+ SyncRPC::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]),
+ x => {
+ return Err(Error::Message(format!(
+ "Invalid respone to RootCkHash RPC: {}",
+ debug_serialize(x)
+ )));
+ }
+ };
+
+ // Serialized items waiting to be sent to the remote node
+ let mut todo_items = vec![];
+
+ while !todo.is_empty() && !*must_exit.borrow() {
+ let key = todo.pop_front().unwrap();
+ let node = self.merkle.read_node(&key)?;
+
+ match node {
+ MerkleNode::Empty => {
+ // They have items we don't have.
+ // We don't request those items from them, they will send them.
+ // We only bother with pushing items that differ
+ }
+ MerkleNode::Leaf(ik, ivhash) => {
+ // Just send that item directly
+ if let Some(val) = self.data.store.get(&ik[..])? {
+ if blake2sum(&val[..]) != ivhash {
+ warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
+ }
+ todo_items.push(val.to_vec());
+ } else {
+ warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
+ }
+ }
+ MerkleNode::Intermediate(l) => {
+ // Get Merkle node for this tree position at remote node
+ // and compare it with local node
+ let remote_node = match self
+ .rpc_client
+ .call(who, SyncRPC::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
+ .await?
+ {
+ SyncRPC::Node(_, node) => node,
+ x => {
+ return Err(Error::Message(format!(
+ "Invalid respone to GetNode RPC: {}",
+ debug_serialize(x)
+ )));
+ }
+ };
+ let int_l2 = match remote_node {
+ // If they have an intermediate node at this tree position,
+ // we can compare them to find differences
+ MerkleNode::Intermediate(l2) => l2,
+ // Otherwise, treat it as if they have nothing for this subtree,
+ // which will have the consequence of sending them everything
+ _ => vec![],
+ };
+
+ // Enqueue every child position present on only one side
+ // or whose hash differs between the two sides
+ let join = join_ordered(&l[..], &int_l2[..]);
+ for (p, v1, v2) in join.into_iter() {
+ let diff = match (v1, v2) {
+ (Some(_), None) | (None, Some(_)) => true,
+ (Some(a), Some(b)) => a != b,
+ _ => false,
+ };
+ if diff {
+ todo.push_back(key.add_byte(*p));
+ }
+ }
+ }
+ }
+
+ // Flush the pending items once a batch is large enough
+ if todo_items.len() >= 256 {
+ self.send_items(who, std::mem::replace(&mut todo_items, vec![]))
+ .await?;
+ }
+ }
+
+ // Send whatever remains (also reached when the loop stopped on must_exit)
+ if !todo_items.is_empty() {
+ self.send_items(who, todo_items).await?;
+ }
+
+ Ok(())
+ }
+
+ /// Sends a batch of serialized items to node `who` and waits for its
+ /// acknowledgement; any answer other than `SyncRPC::Ok` is an error.
+ async fn send_items(&self, who: UUID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+     info!(
+         "({}) Sending {} items to {:?}",
+         self.data.name,
+         item_value_list.len(),
+         who
+     );
+
+     let mut values = Vec::with_capacity(item_value_list.len());
+     for item in item_value_list {
+         values.push(Arc::new(ByteBuf::from(item)));
+     }
+
+     let rpc_resp = self
+         .rpc_client
+         .call(who, SyncRPC::Items(values), TABLE_SYNC_RPC_TIMEOUT)
+         .await?;
+
+     match rpc_resp {
+         SyncRPC::Ok => Ok(()),
+         other => Err(Error::Message(format!(
+             "Unexpected response to RPC Update: {}",
+             debug_serialize(&other)
+         ))),
+     }
+ }
+
+ // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
+
+ /// Serves one sync request.
+ ///
+ /// - `RootCkHash`: compares the given hash with the hash of the local root
+ ///   node of the partition and answers `RootCkDifferent`.
+ /// - `GetNode`: answers with the local Merkle node at the given position.
+ /// - `Items`: merges the received items into the local table and answers `Ok`.
+ ///
+ /// Any other variant is rejected with an error.
+ async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
+     let reply = match message {
+         SyncRPC::RootCkHash(range, h) => {
+             let (_root_key, local_root) = self.get_root_ck(*range)?;
+             let local_hash = hash_of::<MerkleNode>(&local_root)?;
+             SyncRPC::RootCkDifferent(local_hash != *h)
+         }
+         SyncRPC::GetNode(k) => {
+             let node = self.merkle.read_node(k)?;
+             SyncRPC::Node(k.clone(), node)
+         }
+         SyncRPC::Items(items) => {
+             self.data.update_many(items)?;
+             SyncRPC::Ok
+         }
+         _ => return Err(Error::Message("Unexpected sync RPC".to_string())),
+     };
+     Ok(reply)
+ }
+}
+
+ impl SyncTodo {
+     /// Rebuilds the todo list with one entry per partition.
+     ///
+     /// Each partition covers the keys from its first hash up to the first
+     /// hash of the next partition (or the all-0xFF hash for the last one).
+     /// Partitions that this node does not store are only listed if some of
+     /// their items are present locally.
+     fn add_full_sync<F: TableSchema, R: TableReplication>(
+         &mut self,
+         data: &TableData<F, R>,
+         system: &System,
+     ) {
+         let my_id = system.id;
+
+         self.todo.clear();
+
+         let partitions = data.replication.partitions();
+
+         for (idx, &(partition, begin)) in partitions.iter().enumerate() {
+             let end = match partitions.get(idx + 1) {
+                 Some(&(_, next_begin)) => next_begin,
+                 None => [0xFFu8; 32].into(),
+             };
+
+             let retain = data.replication.write_nodes(&begin).contains(&my_id);
+
+             // Check if we have some data to send, otherwise skip
+             if !retain && data.store.range(begin..end).next().is_none() {
+                 continue;
+             }
+
+             self.todo.push(TodoPartition {
+                 partition,
+                 begin,
+                 end,
+                 retain,
+             });
+         }
+     }
+
+     /// Removes and returns a randomly chosen partition of the todo list, or
+     /// `None` if the list is empty. The last entry takes the place of the
+     /// removed one.
+     fn pop_task(&mut self) -> Option<TodoPartition> {
+         if self.todo.is_empty() {
+             return None;
+         }
+
+         let i = rand::thread_rng().gen_range(0..self.todo.len());
+         Some(self.todo.swap_remove(i))
+     }
+ }
+
+ /// Hash of the serialized form of `x`.
+ fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
+     let encoded = rmp_to_vec_all_named(x)?;
+     Ok(blake2sum(&encoded[..]))
+ }
+
+ /// Full outer join of two slices of `(key, value)` pairs, both sorted by
+ /// key.
+ ///
+ /// Returns one `(key, value in x, value in y)` entry per distinct key, in
+ /// increasing key order; a side that lacks the key contributes `None`.
+ fn join_ordered<'a, K: Ord + Eq, V1, V2>(
+     x: &'a [(K, V1)],
+     y: &'a [(K, V2)],
+ ) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> {
+     let mut joined = Vec::new();
+     let (mut i, mut j) = (0, 0);
+     loop {
+         match (x.get(i), y.get(j)) {
+             (None, None) => break,
+             // Only one side has entries left
+             (Some((kx, vx)), None) => {
+                 joined.push((kx, Some(vx), None));
+                 i += 1;
+             }
+             (None, Some((ky, vy))) => {
+                 joined.push((ky, None, Some(vy)));
+                 j += 1;
+             }
+             // Both sides have entries left: advance the smaller key, or both if equal
+             (Some((kx, vx)), Some((ky, vy))) => {
+                 if kx == ky {
+                     joined.push((kx, Some(vx), Some(vy)));
+                     i += 1;
+                     j += 1;
+                 } else if kx < ky {
+                     joined.push((kx, Some(vx), None));
+                     i += 1;
+                 } else if kx > ky {
+                     joined.push((ky, None, Some(vy)));
+                     j += 1;
+                 } else {
+                     unreachable!();
+                 }
+             }
+         }
+     }
+     joined
+ }
diff --git a/src/table/table.rs b/src/table/table.rs
index 1f6b7d25..e203b178 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,9 +2,6 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
-use log::warn;
-
-use arc_swap::ArcSwapOption;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -13,25 +10,25 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
+use crate::crdt::CRDT;
+use crate::data::*;
+use crate::gc::*;
+use crate::merkle::*;
+use crate::replication::*;
use crate::schema::*;
-use crate::table_sync::*;
+use crate::sync::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
pub struct Table<F: TableSchema, R: TableReplication> {
- pub instance: F,
- pub replication: R,
-
- pub name: String,
- pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>,
-
pub system: Arc<System>,
- pub store: sled::Tree,
- pub syncer: ArcSwapOption<TableSyncer<F, R>>,
+ pub data: Arc<TableData<F, R>>,
+ pub merkle_updater: Arc<MerkleUpdater<F, R>>,
+ pub syncer: Arc<TableSyncer<F, R>>,
+ rpc_client: Arc<RpcClient<TableRPC<F>>>,
}
#[derive(Serialize, Deserialize)]
@@ -45,30 +42,10 @@ pub(crate) enum TableRPC<F: TableSchema> {
ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
Update(Vec<Arc<ByteBuf>>),
-
- SyncRPC(SyncRPC),
}
impl<F: TableSchema> RpcMessage for TableRPC<F> {}
-pub trait TableReplication: Send + Sync {
- // See examples in table_sharded.rs and table_fullcopy.rs
- // To understand various replication methods
-
- // Which nodes to send reads from
- fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
- fn read_quorum(&self) -> usize;
-
- // Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
- fn write_quorum(&self, system: &System) -> usize;
- fn max_write_errors(&self) -> usize;
-
- // Which are the nodes that do actually replicate the data
- fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
- fn split_points(&self, ring: &Ring) -> Vec<Hash>;
-}
-
impl<F, R> Table<F, R>
where
F: TableSchema + 'static,
@@ -76,7 +53,7 @@ where
{
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
- pub async fn new(
+ pub fn new(
instance: F,
replication: R,
system: Arc<System>,
@@ -84,31 +61,37 @@ where
name: String,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let store = db.open_tree(&name).expect("Unable to open DB tree");
-
let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
+ let data = TableData::new(system.clone(), name, instance, replication, db);
+
+ let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
+
+ let syncer = TableSyncer::launch(
+ system.clone(),
+ data.clone(),
+ merkle_updater.clone(),
+ rpc_server,
+ );
+ TableGC::launch(system.clone(), data.clone(), rpc_server);
+
let table = Arc::new(Self {
- instance,
- replication,
- name,
- rpc_client,
system,
- store,
- syncer: ArcSwapOption::from(None),
+ data,
+ merkle_updater,
+ syncer,
+ rpc_client,
});
- table.clone().register_handler(rpc_server, rpc_path);
- let syncer = TableSyncer::launch(table.clone()).await;
- table.syncer.swap(Some(syncer));
+ table.clone().register_handler(rpc_server, rpc_path);
table
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.replication.write_nodes(&hash, &self.system);
+ let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
@@ -118,7 +101,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
+ RequestStrategy::with_quorum(self.data.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@@ -130,7 +113,7 @@ where
for entry in entries.iter() {
let hash = entry.partition_key().hash();
- let who = self.replication.write_nodes(&hash, &self.system);
+ let who = self.data.replication.write_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
if !call_list.contains_key(&node) {
@@ -154,7 +137,7 @@ where
errors.push(e);
}
}
- if errors.len() > self.replication.max_write_errors() {
+ if errors.len() > self.data.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
Ok(())
@@ -167,7 +150,7 @@ where
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.replication.read_nodes(&hash, &self.system);
+ let who = self.data.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who);
let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
@@ -176,7 +159,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.read_quorum())
+ RequestStrategy::with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -187,7 +170,7 @@ where
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value {
- let v = self.decode_entry(v_bytes.as_slice())?;
+ let v = self.data.decode_entry(v_bytes.as_slice())?;
ret = match ret {
None => Some(v),
Some(mut x) => {
@@ -223,7 +206,7 @@ where
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.replication.read_nodes(&hash, &self.system);
+ let who = self.data.replication.read_nodes(&hash);
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
@@ -232,7 +215,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.replication.read_quorum())
+ RequestStrategy::with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -243,8 +226,8 @@ where
for resp in resps {
if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() {
- let entry = self.decode_entry(entry_bytes.as_slice())?;
- let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
+ let entry = self.data.decode_entry(entry_bytes.as_slice())?;
+ let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) {
None => {
ret.insert(entry_key, Some(entry));
@@ -313,146 +296,18 @@ where
async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
match msg {
TableRPC::ReadEntry(key, sort_key) => {
- let value = self.handle_read_entry(key, sort_key)?;
+ let value = self.data.read_entry(key, sort_key)?;
Ok(TableRPC::ReadEntryResponse(value))
}
TableRPC::ReadRange(key, begin_sort_key, filter, limit) => {
- let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?;
+ let values = self.data.read_range(key, begin_sort_key, filter, *limit)?;
Ok(TableRPC::Update(values))
}
TableRPC::Update(pairs) => {
- self.handle_update(pairs).await?;
+ self.data.update_many(pairs)?;
Ok(TableRPC::Ok)
}
- TableRPC::SyncRPC(rpc) => {
- let syncer = self.syncer.load_full().unwrap();
- let response = syncer
- .handle_rpc(rpc, self.system.background.stop_signal.clone())
- .await?;
- Ok(TableRPC::SyncRPC(response))
- }
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
}
}
-
- fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
- let tree_key = self.tree_key(p, s);
- if let Some(bytes) = self.store.get(&tree_key)? {
- Ok(Some(ByteBuf::from(bytes.to_vec())))
- } else {
- Ok(None)
- }
- }
-
- fn handle_read_range(
- &self,
- p: &F::P,
- s: &Option<F::S>,
- filter: &Option<F::Filter>,
- limit: usize,
- ) -> Result<Vec<Arc<ByteBuf>>, Error> {
- let partition_hash = p.hash();
- let first_key = match s {
- None => partition_hash.to_vec(),
- Some(sk) => self.tree_key(p, sk),
- };
- let mut ret = vec![];
- for item in self.store.range(first_key..) {
- let (key, value) = item?;
- if &key[..32] != partition_hash.as_slice() {
- break;
- }
- let keep = match filter {
- None => true,
- Some(f) => {
- let entry = self.decode_entry(value.as_ref())?;
- F::matches_filter(&entry, f)
- }
- };
- if keep {
- ret.push(Arc::new(ByteBuf::from(value.as_ref())));
- }
- if ret.len() >= limit {
- break;
- }
- }
- Ok(ret)
- }
-
- pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
- let syncer = self.syncer.load_full().unwrap();
-
- for update_bytes in entries.iter() {
- let update = self.decode_entry(update_bytes.as_slice())?;
-
- let tree_key = self.tree_key(update.partition_key(), update.sort_key());
-
- let (old_entry, new_entry) = self.store.transaction(|db| {
- let (old_entry, new_entry) = match db.get(&tree_key)? {
- Some(prev_bytes) => {
- let old_entry = self
- .decode_entry(&prev_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let mut new_entry = old_entry.clone();
- new_entry.merge(&update);
- (Some(old_entry), new_entry)
- }
- None => (None, update.clone()),
- };
-
- let new_bytes = rmp_to_vec_all_named(&new_entry)
- .map_err(Error::RMPEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- db.insert(tree_key.clone(), new_bytes)?;
- Ok((old_entry, new_entry))
- })?;
-
- if old_entry.as_ref() != Some(&new_entry) {
- self.instance.updated(old_entry, Some(new_entry));
- syncer.invalidate(&tree_key[..]);
- }
- }
-
- Ok(())
- }
-
- pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = self.store.transaction(|txn| {
- if let Some(cur_v) = txn.get(k)? {
- if cur_v == v {
- txn.remove(k)?;
- return Ok(true);
- }
- }
- Ok(false)
- })?;
- if removed {
- let old_entry = self.decode_entry(v)?;
- self.instance.updated(Some(old_entry), None);
- self.syncer.load_full().unwrap().invalidate(k);
- }
- Ok(removed)
- }
-
- fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
- let mut ret = p.hash().to_vec();
- ret.extend(s.sort_key());
- ret
- }
-
- fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
- match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
- Ok(x) => Ok(x),
- Err(e) => match F::try_migrate(bytes) {
- Some(x) => Ok(x),
- None => {
- warn!("Unable to decode entry of {}: {}", self.name, e);
- for line in hexdump::hexdump_iter(bytes) {
- debug!("{}", line);
- }
- Err(e.into())
- }
- },
- }
- }
}
diff --git a/src/table/table_fullcopy.rs b/src/table/table_fullcopy.rs
deleted file mode 100644
index c55879d8..00000000
--- a/src/table/table_fullcopy.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-use std::sync::Arc;
-
-use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
-use garage_util::data::*;
-
-use crate::*;
-
-#[derive(Clone)]
-pub struct TableFullReplication {
- pub max_faults: usize,
-}
-
-#[derive(Clone)]
-struct Neighbors {
- ring: Arc<Ring>,
- neighbors: Vec<UUID>,
-}
-
-impl TableFullReplication {
- pub fn new(max_faults: usize) -> Self {
- TableFullReplication { max_faults }
- }
-}
-
-impl TableReplication for TableFullReplication {
- // Full replication schema: all nodes store everything
- // Writes are disseminated in an epidemic manner in the network
-
- // Advantage: do all reads locally, extremely fast
- // Inconvenient: only suitable to reasonably small tables
-
- fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> {
- vec![system.id]
- }
- fn read_quorum(&self) -> usize {
- 1
- }
-
- fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
- self.replication_nodes(hash, system.ring.borrow().as_ref())
- }
- fn write_quorum(&self, system: &System) -> usize {
- system.ring.borrow().config.members.len() - self.max_faults
- }
- fn max_write_errors(&self) -> usize {
- self.max_faults
- }
-
- fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec<UUID> {
- ring.config.members.keys().cloned().collect::<Vec<_>>()
- }
- fn split_points(&self, _ring: &Ring) -> Vec<Hash> {
- let mut ret = vec![];
- ret.push([0u8; 32].into());
- ret.push([0xFFu8; 32].into());
- ret
- }
-}
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
deleted file mode 100644
index 5fa6793b..00000000
--- a/src/table/table_sync.rs
+++ /dev/null
@@ -1,891 +0,0 @@
-use rand::Rng;
-use std::collections::{BTreeMap, VecDeque};
-use std::sync::{Arc, Mutex};
-use std::time::{Duration, Instant};
-
-use futures::future::join_all;
-use futures::{pin_mut, select};
-use futures_util::future::*;
-use futures_util::stream::*;
-use serde::{Deserialize, Serialize};
-use serde_bytes::ByteBuf;
-use tokio::sync::{mpsc, watch};
-
-use garage_rpc::ring::Ring;
-use garage_util::data::*;
-use garage_util::error::Error;
-
-use crate::*;
-
-const MAX_DEPTH: usize = 16;
-const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
-const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
-const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-
-pub struct TableSyncer<F: TableSchema, R: TableReplication> {
- table: Arc<Table<F, R>>,
- todo: Mutex<SyncTodo>,
- cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>,
-}
-
-#[derive(Serialize, Deserialize)]
-pub(crate) enum SyncRPC {
- GetRootChecksumRange(Hash, Hash),
- RootChecksumRange(SyncRange),
- Checksums(Vec<RangeChecksum>),
- Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
-}
-
-struct SyncTodo {
- todo: Vec<TodoPartition>,
-}
-
-#[derive(Debug, Clone)]
-struct TodoPartition {
- // Partition consists in hashes between begin included and end excluded
- begin: Hash,
- end: Hash,
-
- // Are we a node that stores this partition or not?
- retain: bool,
-}
-
-// A SyncRange defines a query on the dataset stored by a node, in the following way:
-// - all items whose key are >= `begin`
-// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded)
-// - except if the first item of the range has such many leading zero bytes
-// - and stopping at `end` (excluded) if such an item is not found
-// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges"
-// i.e. of ranges of level `level-1` that cover the same range
-// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin)
-// See RangeChecksum for the struct that stores this information.
-#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
-pub(crate) struct SyncRange {
- begin: Vec<u8>,
- end: Vec<u8>,
- level: usize,
-}
-
-impl std::cmp::PartialOrd for SyncRange {
- fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- Some(self.cmp(other))
- }
-}
-impl std::cmp::Ord for SyncRange {
- fn cmp(&self, other: &Self) -> std::cmp::Ordering {
- self.begin
- .cmp(&other.begin)
- .then(self.level.cmp(&other.level))
- .then(self.end.cmp(&other.end))
- }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub(crate) struct RangeChecksum {
- bounds: SyncRange,
- children: Vec<(SyncRange, Hash)>,
- found_limit: Option<Vec<u8>>,
-
- #[serde(skip, default = "std::time::Instant::now")]
- time: Instant,
-}
-
-#[derive(Debug, Clone)]
-struct RangeChecksumCache {
- hash: Option<Hash>, // None if no children
- found_limit: Option<Vec<u8>>,
- time: Instant,
-}
-
-impl<F, R> TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
- let todo = SyncTodo { todo: Vec::new() };
- let syncer = Arc::new(TableSyncer {
- table: table.clone(),
- todo: Mutex::new(todo),
- cache: (0..MAX_DEPTH)
- .map(|_| Mutex::new(BTreeMap::new()))
- .collect::<Vec<_>>(),
- });
-
- let (busy_tx, busy_rx) = mpsc::unbounded_channel();
-
- let s1 = syncer.clone();
- table
- .system
- .background
- .spawn_worker(
- format!("table sync watcher for {}", table.name),
- move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
- )
- .await;
-
- let s2 = syncer.clone();
- table
- .system
- .background
- .spawn_worker(
- format!("table syncer for {}", table.name),
- move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
- )
- .await;
-
- let s3 = syncer.clone();
- tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(20)).await;
- s3.add_full_scan().await;
- });
-
- syncer
- }
-
- async fn watcher_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- mut busy_rx: mpsc::UnboundedReceiver<bool>,
- ) -> Result<(), Error> {
- let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
- let mut nothing_to_do_since = Some(Instant::now());
-
- while !*must_exit.borrow() {
- let s_ring_recv = ring_recv.recv().fuse();
- let s_busy = busy_rx.recv().fuse();
- let s_must_exit = must_exit.recv().fuse();
- let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
- pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
-
- select! {
- new_ring_r = s_ring_recv => {
- if let Some(new_ring) = new_ring_r {
- debug!("({}) Adding ring difference to syncer todo list", self.table.name);
- self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring);
- prev_ring = new_ring;
- }
- }
- busy_opt = s_busy => {
- if let Some(busy) = busy_opt {
- if busy {
- nothing_to_do_since = None;
- } else {
- if nothing_to_do_since.is_none() {
- nothing_to_do_since = Some(Instant::now());
- }
- }
- }
- }
- must_exit_v = s_must_exit => {
- if must_exit_v.unwrap_or(false) {
- break;
- }
- }
- _ = s_timeout => {
- if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
- nothing_to_do_since = None;
- debug!("({}) Adding full scan to syncer todo list", self.table.name);
- self.add_full_scan().await;
- }
- }
- }
- }
- Ok(())
- }
-
- pub async fn add_full_scan(&self) {
- self.todo.lock().unwrap().add_full_scan(&self.table);
- }
-
- async fn syncer_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- busy_tx: mpsc::UnboundedSender<bool>,
- ) -> Result<(), Error> {
- while !*must_exit.borrow() {
- let task = self.todo.lock().unwrap().pop_task();
- if let Some(partition) = task {
- busy_tx.send(true)?;
- let res = self
- .clone()
- .sync_partition(&partition, &mut must_exit)
- .await;
- if let Err(e) = res {
- warn!(
- "({}) Error while syncing {:?}: {}",
- self.table.name, partition, e
- );
- }
- } else {
- busy_tx.send(false)?;
- tokio::time::delay_for(Duration::from_secs(1)).await;
- }
- }
- Ok(())
- }
-
- async fn sync_partition(
- self: Arc<Self>,
- partition: &TodoPartition,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<(), Error> {
- if partition.retain {
- let my_id = self.table.system.id;
- let nodes = self
- .table
- .replication
- .write_nodes(&partition.begin, &self.table.system)
- .into_iter()
- .filter(|node| *node != my_id)
- .collect::<Vec<_>>();
-
- debug!(
- "({}) Preparing to sync {:?} with {:?}...",
- self.table.name, partition, nodes
- );
- let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?;
-
- let mut sync_futures = nodes
- .iter()
- .map(|node| {
- self.clone().do_sync_with(
- partition.clone(),
- root_cks.clone(),
- *node,
- must_exit.clone(),
- )
- })
- .collect::<FuturesUnordered<_>>();
-
- let mut n_errors = 0;
- while let Some(r) = sync_futures.next().await {
- if let Err(e) = r {
- n_errors += 1;
- warn!("({}) Sync error: {}", self.table.name, e);
- }
- }
- if n_errors > self.table.replication.max_write_errors() {
- return Err(Error::Message(format!(
- "Sync failed with too many nodes (should have been: {:?}).",
- nodes
- )));
- }
- } else {
- self.offload_partition(&partition.begin, &partition.end, must_exit)
- .await?;
- }
-
- Ok(())
- }
-
- // Offload partition: this partition is not something we are storing,
- // so send it out to all other nodes that store it and delete items locally.
- // We don't bother checking if the remote nodes already have the items,
- // we just batch-send everything. Offloading isn't supposed to happen very often.
- // If any of the nodes that are supposed to store the items is unable to
- // save them, we interrupt the process.
- async fn offload_partition(
- self: &Arc<Self>,
- begin: &Hash,
- end: &Hash,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<(), Error> {
- let mut counter: usize = 0;
-
- while !*must_exit.borrow() {
- let mut items = Vec::new();
-
- for item in self.table.store.range(begin.to_vec()..end.to_vec()) {
- let (key, value) = item?;
- items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
-
- if items.len() >= 1024 {
- break;
- }
- }
-
- if items.len() > 0 {
- let nodes = self
- .table
- .replication
- .write_nodes(&begin, &self.table.system)
- .into_iter()
- .collect::<Vec<_>>();
- if nodes.contains(&self.table.system.id) {
- warn!("Interrupting offload as partitions seem to have changed");
- break;
- }
-
- counter += 1;
- debug!(
- "Offloading {} items from {:?}..{:?} ({})",
- items.len(),
- begin,
- end,
- counter
- );
- self.offload_items(&items, &nodes[..]).await?;
- } else {
- break;
- }
- }
-
- Ok(())
- }
-
- async fn offload_items(
- self: &Arc<Self>,
- items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
- nodes: &[UUID],
- ) -> Result<(), Error> {
- let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
- let update_msg = Arc::new(TableRPC::<F>::Update(values));
-
- for res in join_all(nodes.iter().map(|to| {
- self.table
- .rpc_client
- .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
- }))
- .await
- {
- res?;
- }
-
- // All remote nodes have written those items, now we can delete them locally
- let mut not_removed = 0;
- for (k, v) in items.iter() {
- if !self.table.delete_if_equal(&k[..], &v[..])? {
- not_removed += 1;
- }
- }
-
- if not_removed > 0 {
- debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed);
- }
-
- Ok(())
- }
-
- fn root_checksum(
- self: &Arc<Self>,
- begin: &Hash,
- end: &Hash,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksum, Error> {
- for i in 1..MAX_DEPTH {
- let rc = self.range_checksum(
- &SyncRange {
- begin: begin.to_vec(),
- end: end.to_vec(),
- level: i,
- },
- must_exit,
- )?;
- if rc.found_limit.is_none() {
- return Ok(rc);
- }
- }
- Err(Error::Message(format!(
- "Unable to compute root checksum (this should never happen)"
- )))
- }
-
- fn range_checksum(
- self: &Arc<Self>,
- range: &SyncRange,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksum, Error> {
- assert!(range.level != 0);
- trace!("Call range_checksum {:?}", range);
-
- if range.level == 1 {
- let mut children = vec![];
- for item in self
- .table
- .store
- .range(range.begin.clone()..range.end.clone())
- {
- let (key, value) = item?;
- let key_hash = blake2sum(&key[..]);
- if children.len() > 0
- && key_hash.as_slice()[0..range.level]
- .iter()
- .all(|x| *x == 0u8)
- {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: Some(key.to_vec()),
- time: Instant::now(),
- });
- }
- let item_range = SyncRange {
- begin: key.to_vec(),
- end: vec![],
- level: 0,
- };
- children.push((item_range, blake2sum(&value[..])));
- }
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: None,
- time: Instant::now(),
- })
- } else {
- let mut children = vec![];
- let mut sub_range = SyncRange {
- begin: range.begin.clone(),
- end: range.end.clone(),
- level: range.level - 1,
- };
- let mut time = Instant::now();
- while !*must_exit.borrow() {
- let sub_ck = self.range_checksum_cached_hash(&sub_range, must_exit)?;
-
- if let Some(hash) = sub_ck.hash {
- children.push((sub_range.clone(), hash));
- if sub_ck.time < time {
- time = sub_ck.time;
- }
- }
-
- if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: None,
- time,
- });
- }
- let found_limit = sub_ck.found_limit.unwrap();
-
- let actual_limit_hash = blake2sum(&found_limit[..]);
- if actual_limit_hash.as_slice()[0..range.level]
- .iter()
- .all(|x| *x == 0u8)
- {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: Some(found_limit.clone()),
- time,
- });
- }
-
- sub_range.begin = found_limit;
- }
- trace!("range_checksum {:?} exiting due to must_exit", range);
- Err(Error::Message(format!("Exiting.")))
- }
- }
-
- fn range_checksum_cached_hash(
- self: &Arc<Self>,
- range: &SyncRange,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksumCache, Error> {
- {
- let mut cache = self.cache[range.level].lock().unwrap();
- if let Some(v) = cache.get(&range) {
- if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
- return Ok(v.clone());
- }
- }
- cache.remove(&range);
- }
-
- let v = self.range_checksum(&range, must_exit)?;
- trace!(
- "({}) New checksum calculated for {}-{}/{}, {} children",
- self.table.name,
- hex::encode(&range.begin)
- .chars()
- .take(16)
- .collect::<String>(),
- hex::encode(&range.end).chars().take(16).collect::<String>(),
- range.level,
- v.children.len()
- );
-
- let hash = if v.children.len() > 0 {
- Some(blake2sum(&rmp_to_vec_all_named(&v)?[..]))
- } else {
- None
- };
- let cache_entry = RangeChecksumCache {
- hash,
- found_limit: v.found_limit,
- time: v.time,
- };
-
- let mut cache = self.cache[range.level].lock().unwrap();
- cache.insert(range.clone(), cache_entry.clone());
- Ok(cache_entry)
- }
-
- async fn do_sync_with(
- self: Arc<Self>,
- partition: TodoPartition,
- root_ck: RangeChecksum,
- who: UUID,
- mut must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
- let mut todo = VecDeque::new();
-
- // If their root checksum has level > than us, use that as a reference
- let root_cks_resp = self
- .table
- .rpc_client
- .call(
- who,
- TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange(
- partition.begin.clone(),
- partition.end.clone(),
- )),
- TABLE_SYNC_RPC_TIMEOUT,
- )
- .await?;
- if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
- if range.level > root_ck.bounds.level {
- let their_root_range_ck = self.range_checksum(&range, &mut must_exit)?;
- todo.push_back(their_root_range_ck);
- } else {
- todo.push_back(root_ck);
- }
- } else {
- return Err(Error::Message(format!(
- "Invalid respone to GetRootChecksumRange RPC: {}",
- debug_serialize(root_cks_resp)
- )));
- }
-
- while !todo.is_empty() && !*must_exit.borrow() {
- let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
- trace!(
- "({}) Sync with {:?}: {} ({}) remaining",
- self.table.name,
- who,
- todo.len(),
- total_children
- );
-
- let step_size = std::cmp::min(16, todo.len());
- let step = todo.drain(..step_size).collect::<Vec<_>>();
-
- let rpc_resp = self
- .table
- .rpc_client
- .call(
- who,
- TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)),
- TABLE_SYNC_RPC_TIMEOUT,
- )
- .await?;
- if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) =
- rpc_resp
- {
- if diff_ranges.len() > 0 || diff_items.len() > 0 {
- info!(
- "({}) Sync with {:?}: difference {} ranges, {} items",
- self.table.name,
- who,
- diff_ranges.len(),
- diff_items.len()
- );
- }
- let mut items_to_send = vec![];
- for differing in diff_ranges.drain(..) {
- if differing.level == 0 {
- items_to_send.push(differing.begin);
- } else {
- let checksum = self.range_checksum(&differing, &mut must_exit)?;
- todo.push_back(checksum);
- }
- }
- if diff_items.len() > 0 {
- self.table.handle_update(&diff_items[..]).await?;
- }
- if items_to_send.len() > 0 {
- self.send_items(who, items_to_send).await?;
- }
- } else {
- return Err(Error::Message(format!(
- "Unexpected response to sync RPC checksums: {}",
- debug_serialize(&rpc_resp)
- )));
- }
- }
- Ok(())
- }
-
- async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
- info!(
- "({}) Sending {} items to {:?}",
- self.table.name,
- item_list.len(),
- who
- );
-
- let mut values = vec![];
- for item in item_list.iter() {
- if let Some(v) = self.table.store.get(&item[..])? {
- values.push(Arc::new(ByteBuf::from(v.as_ref())));
- }
- }
- let rpc_resp = self
- .table
- .rpc_client
- .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
- .await?;
- if let TableRPC::<F>::Ok = rpc_resp {
- Ok(())
- } else {
- Err(Error::Message(format!(
- "Unexpected response to RPC Update: {}",
- debug_serialize(&rpc_resp)
- )))
- }
- }
-
- pub(crate) async fn handle_rpc(
- self: &Arc<Self>,
- message: &SyncRPC,
- mut must_exit: watch::Receiver<bool>,
- ) -> Result<SyncRPC, Error> {
- match message {
- SyncRPC::GetRootChecksumRange(begin, end) => {
- let root_cks = self.root_checksum(&begin, &end, &mut must_exit)?;
- Ok(SyncRPC::RootChecksumRange(root_cks.bounds))
- }
- SyncRPC::Checksums(checksums) => {
- self.handle_checksums_rpc(&checksums[..], &mut must_exit)
- .await
- }
- _ => Err(Error::Message(format!("Unexpected sync RPC"))),
- }
- }
-
- async fn handle_checksums_rpc(
- self: &Arc<Self>,
- checksums: &[RangeChecksum],
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<SyncRPC, Error> {
- let mut ret_ranges = vec![];
- let mut ret_items = vec![];
-
- for their_ckr in checksums.iter() {
- let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit)?;
- for (their_range, their_hash) in their_ckr.children.iter() {
- let differs = match our_ckr
- .children
- .binary_search_by(|(our_range, _)| our_range.cmp(&their_range))
- {
- Err(_) => {
- if their_range.level >= 1 {
- let cached_hash =
- self.range_checksum_cached_hash(&their_range, must_exit)?;
- cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true)
- } else {
- true
- }
- }
- Ok(i) => our_ckr.children[i].1 != *their_hash,
- };
- if differs {
- ret_ranges.push(their_range.clone());
- if their_range.level == 0 {
- if let Some(item_bytes) =
- self.table.store.get(their_range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
- }
- }
- }
- for (our_range, _hash) in our_ckr.children.iter() {
- if let Some(their_found_limit) = &their_ckr.found_limit {
- if our_range.begin.as_slice() > their_found_limit.as_slice() {
- break;
- }
- }
-
- let not_present = our_ckr
- .children
- .binary_search_by(|(their_range, _)| their_range.cmp(&our_range))
- .is_err();
- if not_present {
- if our_range.level > 0 {
- ret_ranges.push(our_range.clone());
- }
- if our_range.level == 0 {
- if let Some(item_bytes) =
- self.table.store.get(our_range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
- }
- }
- }
- }
- let n_checksums = checksums
- .iter()
- .map(|x| x.children.len())
- .fold(0, |x, y| x + y);
- if ret_ranges.len() > 0 || ret_items.len() > 0 {
- trace!(
- "({}) Checksum comparison RPC: {} different + {} items for {} received",
- self.table.name,
- ret_ranges.len(),
- ret_items.len(),
- n_checksums
- );
- }
- Ok(SyncRPC::Difference(ret_ranges, ret_items))
- }
-
- pub(crate) fn invalidate(self: &Arc<Self>, item_key: &[u8]) {
- for i in 1..MAX_DEPTH {
- let needle = SyncRange {
- begin: item_key.to_vec(),
- end: vec![],
- level: i,
- };
- let mut cache = self.cache[i].lock().unwrap();
- if let Some(cache_entry) = cache.range(..=needle).rev().next() {
- if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] > *item_key {
- let index = cache_entry.0.clone();
- drop(cache_entry);
- cache.remove(&index);
- }
- }
- }
- }
-}
-
-impl SyncTodo {
- fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) {
- let my_id = table.system.id;
-
- self.todo.clear();
-
- let ring = table.system.ring.borrow().clone();
- let split_points = table.replication.split_points(&ring);
-
- for i in 0..split_points.len() - 1 {
- let begin = split_points[i];
- let end = split_points[i + 1];
- let nodes = table.replication.replication_nodes(&begin, &ring);
-
- let retain = nodes.contains(&my_id);
- if !retain {
- // Check if we have some data to send, otherwise skip
- if table.store.range(begin..end).next().is_none() {
- continue;
- }
- }
-
- self.todo.push(TodoPartition { begin, end, retain });
- }
- }
-
- fn add_ring_difference<F: TableSchema, R: TableReplication>(
- &mut self,
- table: &Table<F, R>,
- old_ring: &Ring,
- new_ring: &Ring,
- ) {
- let my_id = table.system.id;
-
- // If it is us who are entering or leaving the system,
- // initiate a full sync instead of incremental sync
- if old_ring.config.members.contains_key(&my_id)
- != new_ring.config.members.contains_key(&my_id)
- {
- self.add_full_scan(table);
- return;
- }
-
- let mut all_points = None
- .into_iter()
- .chain(table.replication.split_points(old_ring).drain(..))
- .chain(table.replication.split_points(new_ring).drain(..))
- .chain(self.todo.iter().map(|x| x.begin))
- .chain(self.todo.iter().map(|x| x.end))
- .collect::<Vec<_>>();
- all_points.sort();
- all_points.dedup();
-
- let mut old_todo = std::mem::replace(&mut self.todo, vec![]);
- old_todo.sort_by(|x, y| x.begin.cmp(&y.begin));
- let mut new_todo = vec![];
-
- for i in 0..all_points.len() - 1 {
- let begin = all_points[i];
- let end = all_points[i + 1];
- let was_ours = table
- .replication
- .replication_nodes(&begin, &old_ring)
- .contains(&my_id);
- let is_ours = table
- .replication
- .replication_nodes(&begin, &new_ring)
- .contains(&my_id);
-
- let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) {
- Ok(_) => true,
- Err(j) => {
- (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end)
- || (j < old_todo.len()
- && old_todo[j].begin < end && begin < old_todo[j].end)
- }
- };
- if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) {
- new_todo.push(TodoPartition {
- begin,
- end,
- retain: is_ours,
- });
- }
- }
-
- self.todo = new_todo;
- }
-
- fn pop_task(&mut self) -> Option<TodoPartition> {
- if self.todo.is_empty() {
- return None;
- }
-
- let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
- if i == self.todo.len() - 1 {
- self.todo.pop()
- } else {
- let replacement = self.todo.pop().unwrap();
- let ret = std::mem::replace(&mut self.todo[i], replacement);
- Some(ret)
- }
- }
-}
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 35130c96..4698a04f 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -13,29 +13,26 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-rand = "0.7"
-hex = "0.3"
-sha2 = "0.8"
+rand = "0.8"
+hex = "0.4"
+sha2 = "0.9"
blake2 = "0.9"
-err-derive = "0.2.3"
+err-derive = "0.3"
log = "0.4"
fasthash = "0.4"
sled = "0.34"
toml = "0.5"
-rmp-serde = "0.14.3"
+rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
+chrono = "0.4"
futures = "0.3"
-futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
http = "0.2"
-hyper = "0.13"
-rustls = "0.17"
+hyper = "0.14"
+rustls = "0.19"
webpki = "0.21"
-
-roxmltree = "0.11"
-
diff --git a/src/util/background.rs b/src/util/background.rs
index 937062dd..b5eb8bc8 100644
--- a/src/util/background.rs
+++ b/src/util/background.rs
@@ -1,12 +1,11 @@
use core::future::Future;
use std::pin::Pin;
+use std::sync::Arc;
+use std::time::Duration;
-use futures::future::join_all;
+use futures::future::*;
use futures::select;
-use futures_util::future::*;
-use std::sync::Arc;
-use tokio::sync::Mutex;
-use tokio::sync::{mpsc, watch, Notify};
+use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error;
@@ -14,54 +13,106 @@ type JobOutput = Result<(), Error>;
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
pub struct BackgroundRunner {
- n_runners: usize,
pub stop_signal: watch::Receiver<bool>,
queue_in: mpsc::UnboundedSender<(Job, bool)>,
- queue_out: Mutex<mpsc::UnboundedReceiver<(Job, bool)>>,
- job_notify: Notify,
-
- workers: Mutex<Vec<tokio::task::JoinHandle<()>>>,
+ worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
}
impl BackgroundRunner {
- pub fn new(n_runners: usize, stop_signal: watch::Receiver<bool>) -> Arc<Self> {
+ pub fn new(
+ n_runners: usize,
+ stop_signal: watch::Receiver<bool>,
+ ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ let (worker_in, mut worker_out) = mpsc::unbounded_channel();
+
+ let stop_signal_2 = stop_signal.clone();
+ let await_all_done = tokio::spawn(async move {
+ loop {
+ let wkr = {
+ select! {
+ item = worker_out.recv().fuse() => {
+ match item {
+ Some(x) => x,
+ None => break,
+ }
+ }
+ _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
+ if *stop_signal_2.borrow() {
+ break;
+ } else {
+ continue;
+ }
+ }
+ }
+ };
+ if let Err(e) = wkr.await {
+ error!("Error while awaiting for worker: {}", e);
+ }
+ }
+ });
+
let (queue_in, queue_out) = mpsc::unbounded_channel();
- Arc::new(Self {
- n_runners,
- stop_signal,
- queue_in,
- queue_out: Mutex::new(queue_out),
- job_notify: Notify::new(),
- workers: Mutex::new(Vec::new()),
- })
- }
+ let queue_out = Arc::new(Mutex::new(queue_out));
- pub async fn run(self: Arc<Self>) {
- let mut workers = self.workers.lock().await;
- for i in 0..self.n_runners {
- workers.push(tokio::spawn(self.clone().runner(i)));
- }
- drop(workers);
+ for i in 0..n_runners {
+ let queue_out = queue_out.clone();
+ let stop_signal = stop_signal.clone();
- let mut stop_signal = self.stop_signal.clone();
- while let Some(exit_now) = stop_signal.recv().await {
- if exit_now {
- let mut workers = self.workers.lock().await;
- let workers_vec = workers.drain(..).collect::<Vec<_>>();
- join_all(workers_vec).await;
- return;
- }
+ worker_in
+ .send(tokio::spawn(async move {
+ loop {
+ let (job, cancellable) = {
+ select! {
+ item = wait_job(&queue_out).fuse() => match item {
+ // We received a task, process it
+ Some(x) => x,
+ // We received a signal that no more tasks will ever be sent
+ // because the sending side was dropped. Exit now.
+ None => break,
+ },
+ _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
+ if *stop_signal.borrow() {
+ // Nothing has been going on for 5 secs, and we are shutting
+ // down. Exit now.
+ break;
+ } else {
+ // Nothing is going on but we don't want to exit.
+ continue;
+ }
+ }
+ }
+ };
+ if cancellable && *stop_signal.borrow() {
+ continue;
+ }
+ if let Err(e) = job.await {
+ error!("Job failed: {}", e)
+ }
+ }
+ info!("Background worker {} exiting", i);
+ }))
+ .unwrap();
}
+
+ let bgrunner = Arc::new(Self {
+ stop_signal,
+ queue_in,
+ worker_in,
+ });
+ (bgrunner, await_all_done)
}
+ // Spawn a task to be run in background
pub fn spawn<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
let boxed: Job = Box::pin(job);
- let _: Result<_, _> = self.queue_in.clone().send((boxed, false));
- self.job_notify.notify();
+ self.queue_in
+ .send((boxed, false))
+ .map_err(|_| "could not put job in queue")
+ .unwrap();
}
pub fn spawn_cancellable<T>(&self, job: T)
@@ -69,56 +120,30 @@ impl BackgroundRunner {
T: Future<Output = JobOutput> + Send + 'static,
{
let boxed: Job = Box::pin(job);
- let _: Result<_, _> = self.queue_in.clone().send((boxed, true));
- self.job_notify.notify();
+ self.queue_in
+ .send((boxed, true))
+ .map_err(|_| "could not put job in queue")
+ .unwrap();
}
- pub async fn spawn_worker<F, T>(&self, name: String, worker: F)
+ pub fn spawn_worker<F, T>(&self, name: String, worker: F)
where
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
- T: Future<Output = JobOutput> + Send + 'static,
+ T: Future<Output = ()> + Send + 'static,
{
- let mut workers = self.workers.lock().await;
let stop_signal = self.stop_signal.clone();
- workers.push(tokio::spawn(async move {
- if let Err(e) = worker(stop_signal).await {
- error!("Worker stopped with error: {}, error: {}", name, e);
- } else {
- info!("Worker exited successfully: {}", name);
- }
- }));
- }
-
- async fn runner(self: Arc<Self>, i: usize) {
- let mut stop_signal = self.stop_signal.clone();
- loop {
- let must_exit: bool = *stop_signal.borrow();
- if let Some(job) = self.dequeue_job(must_exit).await {
- if let Err(e) = job.await {
- error!("Job failed: {}", e)
- }
- } else {
- if must_exit {
- info!("Background runner {} exiting", i);
- return;
- }
- select! {
- _ = self.job_notify.notified().fuse() => (),
- _ = stop_signal.recv().fuse() => (),
- }
- }
- }
+ let task = tokio::spawn(async move {
+ info!("Worker started: {}", name);
+ worker(stop_signal).await;
+ info!("Worker exited: {}", name);
+ });
+ self.worker_in
+ .send(task)
+ .map_err(|_| "could not put job in queue")
+ .unwrap();
}
+}
- async fn dequeue_job(&self, must_exit: bool) -> Option<Job> {
- let mut queue = self.queue_out.lock().await;
- while let Ok((job, cancellable)) = queue.try_recv() {
- if cancellable && must_exit {
- continue;
- } else {
- return Some(job);
- }
- }
- None
- }
+async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> {
+ q.lock().await.recv().await
}
diff --git a/src/util/data.rs b/src/util/data.rs
index f46454be..cb784730 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -2,7 +2,6 @@ use rand::Rng;
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;
-use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
pub struct FixedBytes32([u8; 32]);
@@ -71,6 +70,14 @@ impl FixedBytes32 {
pub fn to_vec(&self) -> Vec<u8> {
self.0.to_vec()
}
+ pub fn try_from(by: &[u8]) -> Option<Self> {
+ if by.len() != 32 {
+ return None;
+ }
+ let mut ret = [0u8; 32];
+ ret.copy_from_slice(by);
+ Some(Self(ret))
+ }
}
pub type UUID = FixedBytes32;
@@ -80,9 +87,9 @@ pub fn sha256sum(data: &[u8]) -> Hash {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
- hasher.input(data);
+ hasher.update(data);
let mut hash = [0u8; 32];
- hash.copy_from_slice(&hasher.result()[..]);
+ hash.copy_from_slice(&hasher.finalize()[..]);
hash.into()
}
@@ -111,13 +118,6 @@ pub fn gen_uuid() -> UUID {
rand::thread_rng().gen::<[u8; 32]>().into()
}
-pub fn now_msec() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("Fix your clock :o")
- .as_millis() as u64
-}
-
// RMP serialization with names of fields and variants
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
diff --git a/src/util/error.rs b/src/util/error.rs
index dbf71ac1..a9bf0824 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -8,16 +8,22 @@ use crate::data::*;
pub enum RPCError {
#[error(display = "Node is down: {:?}.", _0)]
NodeDown(UUID),
+
#[error(display = "Timeout: {}", _0)]
- Timeout(#[error(source)] tokio::time::Elapsed),
+ Timeout(#[error(source)] tokio::time::error::Elapsed),
+
#[error(display = "HTTP error: {}", _0)]
HTTP(#[error(source)] http::Error),
+
#[error(display = "Hyper error: {}", _0)]
Hyper(#[error(source)] hyper::Error),
+
#[error(display = "Messagepack encode error: {}", _0)]
RMPEncode(#[error(source)] rmp_serde::encode::Error),
+
#[error(display = "Messagepack decode error: {}", _0)]
RMPDecode(#[error(source)] rmp_serde::decode::Error),
+
#[error(display = "Too many errors: {:?}", _0)]
TooManyErrors(Vec<String>),
}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 0bf09bf6..e544a872 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -5,3 +5,4 @@ pub mod background;
pub mod config;
pub mod data;
pub mod error;
+pub mod time;
diff --git a/src/util/time.rs b/src/util/time.rs
new file mode 100644
index 00000000..148860e0
--- /dev/null
+++ b/src/util/time.rs
@@ -0,0 +1,16 @@
+use chrono::{SecondsFormat, TimeZone, Utc};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+pub fn now_msec() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("Fix your clock :o")
+ .as_millis() as u64
+}
+
+pub fn msec_to_rfc3339(msecs: u64) -> String {
+ let secs = msecs as i64 / 1000;
+ let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
+ let timestamp = Utc.timestamp(secs, nanos);
+ timestamp.to_rfc3339_opts(SecondsFormat::Secs, true)
+}
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index 5cc8683c..9aabfe81 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -18,11 +18,10 @@ garage_table = { version = "0.1.1", path = "../table" }
garage_model = { version = "0.1.1", path = "../model" }
garage_api = { version = "0.1.1", path = "../api" }
-err-derive = "0.2.3"
+err-derive = "0.3"
log = "0.4"
futures = "0.3"
http = "0.2"
-hyper = "0.13"
+hyper = "0.14"
percent-encoding = "2.1.0"
-roxmltree = "0.11"
idna = "0.2"