aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/admin/api_server.rs7
-rw-r--r--src/api/admin/bucket.rs96
-rw-r--r--src/api/admin/router.rs8
-rw-r--r--src/api/k2v/index.rs2
-rw-r--r--src/api/s3/api_server.rs4
-rw-r--r--src/api/s3/post_object.rs15
-rw-r--r--src/api/s3/put.rs129
-rw-r--r--src/db/lib.rs6
-rw-r--r--src/db/lmdb_adapter.rs8
-rw-r--r--src/db/sled_adapter.rs6
-rw-r--r--src/db/sqlite_adapter.rs10
-rw-r--r--src/garage/Cargo.toml1
-rw-r--r--src/garage/admin.rs82
-rw-r--r--src/garage/cli/cmd.rs8
-rw-r--r--src/garage/cli/structs.rs47
-rw-r--r--src/garage/cli/util.rs47
-rw-r--r--src/garage/main.rs18
-rw-r--r--src/garage/repair/mod.rs2
-rw-r--r--src/garage/repair/offline.rs55
-rw-r--r--src/garage/repair/online.rs (renamed from src/garage/repair.rs)4
-rw-r--r--src/garage/server.rs50
-rw-r--r--src/model/bucket_table.rs19
-rw-r--r--src/model/garage.rs67
-rw-r--r--src/model/index_counter.rs250
-rw-r--r--src/model/k2v/counter_table.rs20
-rw-r--r--src/model/k2v/item_table.rs102
-rw-r--r--src/model/k2v/mod.rs1
-rw-r--r--src/model/migrate.rs1
-rw-r--r--src/model/s3/object_table.rs61
29 files changed, 854 insertions, 272 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 57e3e5cf..c3b16715 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -156,12 +156,7 @@ impl ApiHandler for AdminApiServer {
}
Endpoint::CreateBucket => handle_create_bucket(&self.garage, req).await,
Endpoint::DeleteBucket { id } => handle_delete_bucket(&self.garage, id).await,
- Endpoint::PutBucketWebsite { id } => {
- handle_put_bucket_website(&self.garage, id, req).await
- }
- Endpoint::DeleteBucketWebsite { id } => {
- handle_delete_bucket_website(&self.garage, id).await
- }
+ Endpoint::UpdateBucket { id } => handle_update_bucket(&self.garage, id, req).await,
// Bucket-key permissions
Endpoint::BucketAllowKey => {
handle_bucket_change_key_perm(&self.garage, req, true).await
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index 7f9a813f..ac8a8a40 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -14,6 +14,7 @@ use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_model::permission::*;
+use garage_model::s3::object_table::*;
use crate::admin::error::*;
use crate::admin::key::ApiBucketKeyPerm;
@@ -77,6 +78,13 @@ struct BucketLocalAlias {
alias: String,
}
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ApiBucketQuotas {
+ max_size: Option<u64>,
+ max_objects: Option<u64>,
+}
+
pub async fn handle_get_bucket_info(
garage: &Arc<Garage>,
id: Option<String>,
@@ -108,6 +116,14 @@ async fn bucket_info_results(
.get_existing_bucket(bucket_id)
.await?;
+ let counters = garage
+ .object_counter_table
+ .table
+ .get(&bucket_id, &EmptyKey)
+ .await?
+ .map(|x| x.filtered_values(&garage.system.ring.borrow()))
+ .unwrap_or_default();
+
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
@@ -148,6 +164,7 @@ async fn bucket_info_results(
let state = bucket.state.as_option().unwrap();
+ let quotas = state.quotas.get();
let res =
GetBucketInfoResult {
id: hex::encode(&bucket.id),
@@ -191,6 +208,16 @@ async fn bucket_info_results(
}
})
.collect::<Vec<_>>(),
+ objects: counters.get(OBJECTS).cloned().unwrap_or_default(),
+ bytes: counters.get(BYTES).cloned().unwrap_or_default(),
+ unfinshed_uploads: counters
+ .get(UNFINISHED_UPLOADS)
+ .cloned()
+ .unwrap_or_default(),
+ quotas: ApiBucketQuotas {
+ max_size: quotas.max_size,
+ max_objects: quotas.max_objects,
+ },
};
Ok(json_ok_response(&res)?)
@@ -205,6 +232,10 @@ struct GetBucketInfoResult {
#[serde(default)]
website_config: Option<GetBucketInfoWebsiteResult>,
keys: Vec<GetBucketInfoKey>,
+ objects: i64,
+ bytes: i64,
+ unfinshed_uploads: i64,
+ quotas: ApiBucketQuotas,
}
#[derive(Serialize)]
@@ -363,14 +394,12 @@ pub async fn handle_delete_bucket(
.body(Body::empty())?)
}
-// ---- BUCKET WEBSITE CONFIGURATION ----
-
-pub async fn handle_put_bucket_website(
+pub async fn handle_update_bucket(
garage: &Arc<Garage>,
id: String,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let req = parse_json_body::<PutBucketWebsiteRequest>(req).await?;
+ let req = parse_json_body::<UpdateBucketRequest>(req).await?;
let bucket_id = parse_bucket_id(&id)?;
let mut bucket = garage
@@ -379,10 +408,31 @@ pub async fn handle_put_bucket_website(
.await?;
let state = bucket.state.as_option_mut().unwrap();
- state.website_config.update(Some(WebsiteConfig {
- index_document: req.index_document,
- error_document: req.error_document,
- }));
+
+ if let Some(wa) = req.website_access {
+ if wa.enabled {
+ state.website_config.update(Some(WebsiteConfig {
+ index_document: wa.index_document.ok_or_bad_request(
+ "Please specify indexDocument when enabling website access.",
+ )?,
+ error_document: wa.error_document,
+ }));
+ } else {
+ if wa.index_document.is_some() || wa.error_document.is_some() {
+ return Err(Error::bad_request(
+ "Cannot specify indexDocument or errorDocument when disabling website access.",
+ ));
+ }
+ state.website_config.update(None);
+ }
+ }
+
+ if let Some(q) = req.quotas {
+ state.quotas.update(BucketQuotas {
+ max_size: q.max_size,
+ max_objects: q.max_objects,
+ });
+ }
garage.bucket_table.insert(&bucket).await?;
@@ -391,29 +441,17 @@ pub async fn handle_put_bucket_website(
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
-struct PutBucketWebsiteRequest {
- index_document: String,
- #[serde(default)]
- error_document: Option<String>,
+struct UpdateBucketRequest {
+ website_access: Option<UpdateBucketWebsiteAccess>,
+ quotas: Option<ApiBucketQuotas>,
}
-pub async fn handle_delete_bucket_website(
- garage: &Arc<Garage>,
- id: String,
-) -> Result<Response<Body>, Error> {
- let bucket_id = parse_bucket_id(&id)?;
-
- let mut bucket = garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
-
- let state = bucket.state.as_option_mut().unwrap();
- state.website_config.update(None);
-
- garage.bucket_table.insert(&bucket).await?;
-
- bucket_info_results(garage, bucket_id).await
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct UpdateBucketWebsiteAccess {
+ enabled: bool,
+ index_document: Option<String>,
+ error_document: Option<String>,
}
// ---- BUCKET/KEY PERMISSIONS ----
diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs
index 93639873..3eee8b67 100644
--- a/src/api/admin/router.rs
+++ b/src/api/admin/router.rs
@@ -48,10 +48,7 @@ pub enum Endpoint {
DeleteBucket {
id: String,
},
- PutBucketWebsite {
- id: String,
- },
- DeleteBucketWebsite {
+ UpdateBucket {
id: String,
},
// Bucket-Key Permissions
@@ -113,8 +110,7 @@ impl Endpoint {
GET "/v0/bucket" => ListBuckets,
POST "/v0/bucket" => CreateBucket,
DELETE "/v0/bucket" if id => DeleteBucket (query::id),
- PUT "/v0/bucket/website" if id => PutBucketWebsite (query::id),
- DELETE "/v0/bucket/website" if id => DeleteBucketWebsite (query::id),
+ PUT "/v0/bucket" if id => UpdateBucket (query::id),
// Bucket-key permissions
POST "/v0/bucket/allow" => BucketAllowKey,
POST "/v0/bucket/deny" => BucketDenyKey,
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index d5db906d..210950bf 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -10,7 +10,7 @@ use garage_rpc::ring::Ring;
use garage_table::util::*;
use garage_model::garage::Garage;
-use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
+use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::k2v::error::*;
use crate::k2v::range::read_range;
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index d1d6288c..78dfeeac 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -212,7 +212,7 @@ impl ApiHandler for S3ApiServer {
.await
}
Endpoint::PutObject { key } => {
- handle_put(garage, req, bucket_id, &key, content_sha256).await
+ handle_put(garage, req, &bucket, &key, content_sha256).await
}
Endpoint::AbortMultipartUpload { key, upload_id } => {
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
@@ -226,7 +226,7 @@ impl ApiHandler for S3ApiServer {
garage,
req,
&bucket_name,
- bucket_id,
+ &bucket,
&key,
&upload_id,
content_sha256,
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index dc640f43..d063faa4 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -22,7 +22,7 @@ use crate::signature::payload::{parse_date, verify_v4};
pub async fn handle_post_object(
garage: Arc<Garage>,
req: Request<Body>,
- bucket: String,
+ bucket_name: String,
) -> Result<Response<Body>, Error> {
let boundary = req
.headers()
@@ -126,13 +126,18 @@ pub async fn handle_post_object(
let bucket_id = garage
.bucket_helper()
- .resolve_bucket(&bucket, &api_key)
+ .resolve_bucket(&bucket_name, &api_key)
.await?;
if !api_key.allow_write(&bucket_id) {
return Err(Error::forbidden("Operation is not allowed for this key."));
}
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?;
let decoded_policy: Policy =
serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?;
@@ -227,7 +232,7 @@ pub async fn handle_post_object(
garage,
headers,
StreamLimiter::new(stream, conditions.content_length),
- bucket_id,
+ &bucket,
&key,
None,
None,
@@ -244,7 +249,7 @@ pub async fn handle_post_object(
{
target
.query_pairs_mut()
- .append_pair("bucket", &bucket)
+ .append_pair("bucket", &bucket_name)
.append_pair("key", &key)
.append_pair("etag", &etag);
let target = target.to_string();
@@ -289,7 +294,7 @@ pub async fn handle_post_object(
let xml = s3_xml::PostObject {
xmlns: (),
location: s3_xml::Value(location),
- bucket: s3_xml::Value(bucket),
+ bucket: s3_xml::Value(bucket_name),
key: s3_xml::Value(key),
etag: s3_xml::Value(etag),
};
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 8b06ef3f..9ef37421 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, BTreeSet, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use futures::prelude::*;
@@ -14,7 +14,9 @@ use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_block::manager::INLINE_THRESHOLD;
+use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
+use garage_model::index_counter::CountedItem;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
@@ -26,7 +28,7 @@ use crate::signature::verify_signed_content;
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
- bucket_id: Uuid,
+ bucket: &Bucket,
key: &str,
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
@@ -46,7 +48,7 @@ pub async fn handle_put(
garage,
headers,
body,
- bucket_id,
+ bucket,
key,
content_md5,
content_sha256,
@@ -59,7 +61,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: Arc<Garage>,
headers: ObjectVersionHeaders,
body: S,
- bucket_id: Uuid,
+ bucket: &Bucket,
key: &str,
content_md5: Option<String>,
content_sha256: Option<FixedBytes32>,
@@ -80,6 +82,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let data_md5sum_hex = hex::encode(data_md5sum);
let data_sha256sum = sha256sum(&first_block[..]);
+ let size = first_block.len() as u64;
ensure_checksum_matches(
data_md5sum.as_slice(),
@@ -88,20 +91,22 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
+ check_quotas(&garage, bucket, key, size).await?;
+
let object_version = ObjectVersion {
uuid: version_uuid,
timestamp: version_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::Inline(
ObjectVersionMeta {
headers,
- size: first_block.len() as u64,
+ size,
etag: data_md5sum_hex.clone(),
},
first_block,
)),
};
- let object = Object::new(bucket_id, key.into(), vec![object_version]);
+ let object = Object::new(bucket.id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
return Ok((version_uuid, data_md5sum_hex));
@@ -114,36 +119,42 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
timestamp: version_timestamp,
state: ObjectVersionState::Uploading(headers.clone()),
};
- let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]);
+ let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
// Write this entry now, even with empty block list,
// to prevent block_ref entries from being deleted (they can be deleted
// if the reference a version that isn't found in the version table)
- let version = Version::new(version_uuid, bucket_id, key.into(), false);
+ let version = Version::new(version_uuid, bucket.id, key.into(), false);
garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
let first_block_hash = blake2sum(&first_block[..]);
- let tx_result = read_and_put_blocks(
- &garage,
- &version,
- 1,
- first_block,
- first_block_hash,
- &mut chunker,
- )
- .await
- .and_then(|(total_size, data_md5sum, data_sha256sum)| {
+
+ let tx_result = (|| async {
+ let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
+ &garage,
+ &version,
+ 1,
+ first_block,
+ first_block_hash,
+ &mut chunker,
+ )
+ .await?;
+
ensure_checksum_matches(
data_md5sum.as_slice(),
data_sha256sum,
content_md5.as_deref(),
content_sha256,
- )
- .map(|()| (total_size, data_md5sum))
- });
+ )?;
+
+ check_quotas(&garage, bucket, key, total_size).await?;
+
+ Ok((total_size, data_md5sum))
+ })()
+ .await;
// If something went wrong, clean up
let (total_size, md5sum_arr) = match tx_result {
@@ -151,7 +162,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
Err(e) => {
// Mark object as aborted, this will free the blocks further down
object_version.state = ObjectVersionState::Aborted;
- let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]);
+ let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
return Err(e);
}
@@ -167,7 +178,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
},
first_block_hash,
));
- let object = Object::new(bucket_id, key.into(), vec![object_version]);
+ let object = Object::new(bucket.id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
Ok((version_uuid, md5sum_hex))
@@ -200,6 +211,64 @@ fn ensure_checksum_matches(
Ok(())
}
+/// Check that inserting this object with this size doesn't exceed bucket quotas
+async fn check_quotas(
+ garage: &Arc<Garage>,
+ bucket: &Bucket,
+ key: &str,
+ size: u64,
+) -> Result<(), Error> {
+ let quotas = bucket.state.as_option().unwrap().quotas.get();
+ if quotas.max_objects.is_none() && quotas.max_size.is_none() {
+ return Ok(());
+ };
+
+ let key = key.to_string();
+ let (prev_object, counters) = futures::try_join!(
+ garage.object_table.get(&bucket.id, &key),
+ garage.object_counter_table.table.get(&bucket.id, &EmptyKey),
+ )?;
+
+ let counters = counters
+ .map(|x| x.filtered_values(&garage.system.ring.borrow()))
+ .unwrap_or_default();
+
+ let (prev_cnt_obj, prev_cnt_size) = match prev_object {
+ Some(o) => {
+ let prev_cnt = o.counts().into_iter().collect::<HashMap<_, _>>();
+ (
+ prev_cnt.get(OBJECTS).cloned().unwrap_or_default(),
+ prev_cnt.get(BYTES).cloned().unwrap_or_default(),
+ )
+ }
+ None => (0, 0),
+ };
+ let cnt_obj_diff = 1 - prev_cnt_obj;
+ let cnt_size_diff = size as i64 - prev_cnt_size;
+
+ if let Some(mo) = quotas.max_objects {
+ let current_objects = counters.get(OBJECTS).cloned().unwrap_or_default();
+ if cnt_obj_diff > 0 && current_objects + cnt_obj_diff > mo as i64 {
+ return Err(Error::forbidden(format!(
+ "Object quota is reached, maximum objects for this bucket: {}",
+ mo
+ )));
+ }
+ }
+
+ if let Some(ms) = quotas.max_size {
+ let current_size = counters.get(BYTES).cloned().unwrap_or_default();
+ if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
+ return Err(Error::forbidden(format!(
+ "Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
+ ms, current_size, size
+ )));
+ }
+ }
+
+ Ok(())
+}
+
async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: &Garage,
version: &Version,
@@ -473,7 +542,7 @@ pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
req: Request<Body>,
bucket_name: &str,
- bucket_id: Uuid,
+ bucket: &Bucket,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
@@ -497,7 +566,7 @@ pub async fn handle_complete_multipart_upload(
// Get object and version
let key = key.to_string();
let (object, version) = futures::try_join!(
- garage.object_table.get(&bucket_id, &key),
+ garage.object_table.get(&bucket.id, &key),
garage.version_table.get(&version_uuid, &EmptyKey),
)?;
@@ -590,6 +659,14 @@ pub async fn handle_complete_multipart_upload(
// Calculate total size of final object
let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
+ if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await {
+ object_version.state = ObjectVersionState::Aborted;
+ let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
+ garage.object_table.insert(&final_object).await?;
+
+ return Err(e);
+ }
+
// Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
@@ -600,7 +677,7 @@ pub async fn handle_complete_multipart_upload(
version.blocks.items()[0].1.hash,
));
- let final_object = Object::new(bucket_id, key.clone(), vec![object_version]);
+ let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
// Send response saying ok we're done
diff --git a/src/db/lib.rs b/src/db/lib.rs
index e9d3ea18..8188c715 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -197,6 +197,11 @@ impl Tree {
pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> {
self.0.remove(self.1, key.as_ref())
}
+ /// Clears all values from the tree
+ #[inline]
+ pub fn clear(&self) -> Result<()> {
+ self.0.clear(self.1)
+ }
#[inline]
pub fn iter(&self) -> Result<ValueIter<'_>> {
@@ -311,6 +316,7 @@ pub(crate) trait IDb: Send + Sync {
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>;
fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
+ fn clear(&self, tree: usize) -> Result<()>;
fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>;
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
index 62fcc3e6..fdb254c6 100644
--- a/src/db/lmdb_adapter.rs
+++ b/src/db/lmdb_adapter.rs
@@ -139,6 +139,14 @@ impl IDb for LmdbDb {
Ok(old_val)
}
+ fn clear(&self, tree: usize) -> Result<()> {
+ let tree = self.get_tree(tree)?;
+ let mut tx = self.db.write_txn()?;
+ tree.clear(&mut tx)?;
+ tx.commit()?;
+ Ok(())
+ }
+
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let tx = self.db.read_txn()?;
diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs
index 982f8d82..cf61867d 100644
--- a/src/db/sled_adapter.rs
+++ b/src/db/sled_adapter.rs
@@ -113,6 +113,12 @@ impl IDb for SledDb {
Ok(old_val.map(|x| x.to_vec()))
}
+ fn clear(&self, tree: usize) -> Result<()> {
+ let tree = self.get_tree(tree)?;
+ tree.clear()?;
+ Ok(())
+ }
+
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
Ok(Box::new(tree.iter().map(|v| {
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 14bf35ff..68d96ca0 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -182,6 +182,16 @@ impl IDb for SqliteDb {
Ok(old_val)
}
+ fn clear(&self, tree: usize) -> Result<()> {
+ trace!("clear {}: lock db", tree);
+ let this = self.0.lock().unwrap();
+ trace!("clear {}: lock acquired", tree);
+
+ let tree = this.get_tree(tree)?;
+ this.db.execute(&format!("DELETE FROM {}", tree), [])?;
+ Ok(())
+ }
+
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
trace!("iter {}: lock db", tree);
let this = self.0.lock().unwrap();
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index eb643160..640e6975 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -30,6 +30,7 @@ garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.7.0", path = "../web" }
bytes = "1.0"
+bytesize = "1.1"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c662aa00..48914655 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -24,11 +24,12 @@ use garage_model::migrate::Migrate;
use garage_model::permission::*;
use crate::cli::*;
-use crate::repair::Repair;
+use crate::repair::online::OnlineRepair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
+#[allow(clippy::large_enum_variant)]
pub enum AdminRpc {
BucketOperation(BucketOperation),
KeyOperation(KeyOperation),
@@ -39,7 +40,11 @@ pub enum AdminRpc {
// Replies
Ok(String),
BucketList(Vec<Bucket>),
- BucketInfo(Bucket, HashMap<String, Key>),
+ BucketInfo {
+ bucket: Bucket,
+ relevant_keys: HashMap<String, Key>,
+ counters: HashMap<String, i64>,
+ },
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
}
@@ -72,6 +77,7 @@ impl AdminRpcHandler {
BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
BucketOperation::Website(query) => self.handle_bucket_website(query).await,
+ BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
}
}
@@ -87,6 +93,7 @@ impl AdminRpcHandler {
EnumerationOrder::Forward,
)
.await?;
+
Ok(AdminRpc::BucketList(buckets))
}
@@ -104,6 +111,15 @@ impl AdminRpcHandler {
.get_existing_bucket(bucket_id)
.await?;
+ let counters = self
+ .garage
+ .object_counter_table
+ .table
+ .get(&bucket_id, &EmptyKey)
+ .await?
+ .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .unwrap_or_default();
+
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
@@ -139,7 +155,11 @@ impl AdminRpcHandler {
}
}
- Ok(AdminRpc::BucketInfo(bucket, relevant_keys))
+ Ok(AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ })
}
#[allow(clippy::ptr_arg)]
@@ -431,6 +451,60 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok(msg))
}
+ async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option_mut().unwrap();
+
+ if query.max_size.is_none() && query.max_objects.is_none() {
+ return Err(Error::BadRequest(
+ "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
+ ));
+ }
+
+ let mut quotas = bucket_state.quotas.get().clone();
+
+ match query.max_size.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_size = None,
+ Some(v) => {
+ let bs = v
+ .parse::<bytesize::ByteSize>()
+ .ok_or_bad_request(format!("Invalid size specified: {}", v))?;
+ quotas.max_size = Some(bs.as_u64());
+ }
+ _ => (),
+ }
+
+ match query.max_objects.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_objects = None,
+ Some(v) => {
+ let mo = v
+ .parse::<u64>()
+ .ok_or_bad_request(format!("Invalid number specified: {}", v))?;
+ quotas.max_objects = Some(mo);
+ }
+ _ => (),
+ }
+
+ bucket_state.quotas.update(quotas);
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Quotas updated for {}",
+ &query.bucket
+ )))
+ }
+
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
@@ -619,7 +693,7 @@ impl AdminRpcHandler {
)))
}
} else {
- let repair = Repair {
+ let repair = OnlineRepair {
garage: self.garage.clone(),
};
self.garage
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index b2dd8f14..3a0bd956 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -169,8 +169,12 @@ pub async fn cmd_admin(
AdminRpc::BucketList(bl) => {
print_bucket_list(bl);
}
- AdminRpc::BucketInfo(bucket, rk) => {
- print_bucket_info(&bucket, &rk);
+ AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ } => {
+ print_bucket_info(&bucket, &relevant_keys, &counters);
}
AdminRpc::KeyList(kl) => {
print_key_list(kl);
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index a0c49aeb..4f2efe19 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -33,10 +33,15 @@ pub enum Command {
#[structopt(name = "migrate")]
Migrate(MigrateOpt),
- /// Start repair of node data
+ /// Start repair of node data on remote node
#[structopt(name = "repair")]
Repair(RepairOpt),
+ /// Offline reparation of node data (these repairs must be run offline
+ /// directly on the server node)
+ #[structopt(name = "offline-repair")]
+ OfflineRepair(OfflineRepairOpt),
+
/// Gather node statistics
#[structopt(name = "stats")]
Stats(StatsOpt),
@@ -175,6 +180,10 @@ pub enum BucketOperation {
/// Expose as website or not
#[structopt(name = "website")]
Website(WebsiteOpt),
+
+ /// Set the quotas for this bucket
+ #[structopt(name = "set-quotas")]
+ SetQuotas(SetQuotasOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
@@ -262,6 +271,21 @@ pub struct PermBucketOpt {
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct SetQuotasOpt {
+ /// Bucket name
+ pub bucket: String,
+
+ /// Set a maximum size for the bucket (specify a size e.g. in MiB or GiB,
+ /// or `none` for no size restriction)
+ #[structopt(long = "max-size")]
+ pub max_size: Option<String>,
+
+ /// Set a maximum number of objects for the bucket (or `none` for no restriction)
+ #[structopt(long = "max-objects")]
+ pub max_objects: Option<String>,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
#[structopt(name = "list")]
@@ -406,6 +430,27 @@ pub enum RepairWhat {
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct OfflineRepairOpt {
+ /// Confirm the launch of the repair operation
+ #[structopt(long = "yes")]
+ pub yes: bool,
+
+ #[structopt(subcommand)]
+ pub what: OfflineRepairWhat,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum OfflineRepairWhat {
+ /// Repair K2V item counters
+ #[cfg(feature = "k2v")]
+ #[structopt(name = "k2v_item_counters")]
+ K2VItemCounters,
+ /// Repair object counters
+ #[structopt(name = "object_counters")]
+ ObjectCounters,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 6d73be3a..329e8a3e 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -7,6 +7,7 @@ use garage_util::formater::format_table;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
+use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
pub fn print_bucket_list(bl: Vec<Bucket>) {
println!("List of buckets:");
@@ -29,11 +30,12 @@ pub fn print_bucket_list(bl: Vec<Bucket>) {
[((k, n), _, _)] => format!("{}:{}", k, n),
s => format!("[{} local aliases]", s.len()),
};
+
table.push(format!(
"\t{}\t{}\t{}",
aliases.join(","),
local_aliases_n,
- hex::encode(bucket.id)
+ hex::encode(bucket.id),
));
}
format_table(table);
@@ -121,7 +123,11 @@ pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) {
}
}
-pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) {
+pub fn print_bucket_info(
+ bucket: &Bucket,
+ relevant_keys: &HashMap<String, Key>,
+ counters: &HashMap<String, i64>,
+) {
let key_name = |k| {
relevant_keys
.get(k)
@@ -133,7 +139,42 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>)
match &bucket.state {
Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => {
- println!("Website access: {}", p.website_config.get().is_some());
+ let size =
+ bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64);
+ println!(
+ "\nSize: {} ({})",
+ size.to_string_as(true),
+ size.to_string_as(false)
+ );
+ println!(
+ "Objects: {}",
+ counters.get(OBJECTS).cloned().unwrap_or_default()
+ );
+ println!(
+ "Unfinished multipart uploads: {}",
+ counters
+ .get(UNFINISHED_UPLOADS)
+ .cloned()
+ .unwrap_or_default()
+ );
+
+ println!("\nWebsite access: {}", p.website_config.get().is_some());
+
+ let quotas = p.quotas.get();
+ if quotas.max_size.is_some() || quotas.max_objects.is_some() {
+ println!("\nQuotas:");
+ if let Some(ms) = quotas.max_size {
+ let ms = bytesize::ByteSize::b(ms);
+ println!(
+ " maximum size: {} ({})",
+ ms.to_string_as(true),
+ ms.to_string_as(false)
+ );
+ }
+ if let Some(mo) = quotas.max_objects {
+ println!(" maximum number of objects: {}", mo);
+ }
+ }
println!("\nGlobal aliases:");
for (alias, _, active) in p.aliases.items().iter() {
diff --git a/src/garage/main.rs b/src/garage/main.rs
index bd09b6ea..3fa5c3c0 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -61,17 +61,17 @@ async fn main() {
pretty_env_logger::init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
- let opt = Opt::from_args();
+ // Abort on panic (same behavior as in Go)
+ std::panic::set_hook(Box::new(|panic_info| {
+ error!("{}", panic_info.to_string());
+ std::process::abort();
+ }));
+ let opt = Opt::from_args();
let res = match opt.cmd {
- Command::Server => {
- // Abort on panic (same behavior as in Go)
- std::panic::set_hook(Box::new(|panic_info| {
- error!("{}", panic_info.to_string());
- std::process::abort();
- }));
-
- server::run_server(opt.config_file).await
+ Command::Server => server::run_server(opt.config_file).await,
+ Command::OfflineRepair(repair_opt) => {
+ repair::offline::offline_repair(opt.config_file, repair_opt).await
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
node_id_command(opt.config_file, node_id_opt.quiet)
diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs
new file mode 100644
index 00000000..4699ace5
--- /dev/null
+++ b/src/garage/repair/mod.rs
@@ -0,0 +1,2 @@
+pub mod offline;
+pub mod online;
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
new file mode 100644
index 00000000..7760a8bd
--- /dev/null
+++ b/src/garage/repair/offline.rs
@@ -0,0 +1,55 @@
+use std::path::PathBuf;
+
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::config::*;
+use garage_util::error::*;
+
+use garage_model::garage::Garage;
+
+use crate::cli::structs::*;
+
+pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> {
+ if !opt.yes {
+ return Err(Error::Message(
+ "Please add the --yes flag to launch repair operation".into(),
+ ));
+ }
+
+ info!("Loading configuration...");
+ let config = read_config(config_file)?;
+
+ info!("Initializing background runner...");
+ let (done_tx, done_rx) = watch::channel(false);
+ let (background, await_background_done) = BackgroundRunner::new(16, done_rx);
+
+ info!("Initializing Garage main data store...");
+ let garage = Garage::new(config.clone(), background)?;
+
+ info!("Launching repair operation...");
+ match opt.what {
+ #[cfg(feature = "k2v")]
+ OfflineRepairWhat::K2VItemCounters => {
+ garage
+ .k2v
+ .counter_table
+ .offline_recount_all(&garage.k2v.item_table)?;
+ }
+ OfflineRepairWhat::ObjectCounters => {
+ garage
+ .object_counter_table
+ .offline_recount_all(&garage.object_table)?;
+ }
+ }
+
+ info!("Repair operation finished, shutting down Garage internals...");
+ done_tx.send(true).unwrap();
+ drop(garage);
+
+ await_background_done.await?;
+
+ info!("Cleaning up...");
+
+ Ok(())
+}
diff --git a/src/garage/repair.rs b/src/garage/repair/online.rs
index 17e14b8b..d6a71742 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair/online.rs
@@ -11,11 +11,11 @@ use garage_util::error::Error;
use crate::*;
-pub struct Repair {
+pub struct OnlineRepair {
pub garage: Arc<Garage>,
}
-impl Repair {
+impl OnlineRepair {
pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
warn!("Repair worker failed with error: {}", e);
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 7aa6185f..6321357a 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -2,8 +2,6 @@ use std::path::PathBuf;
use tokio::sync::watch;
-use garage_db as db;
-
use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
@@ -29,57 +27,14 @@ async fn wait_from(mut chan: watch::Receiver<bool>) {
pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Loading configuration...");
- let config = read_config(config_file).expect("Unable to read config file");
-
- info!("Opening database...");
- let mut db_path = config.metadata_dir.clone();
- std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
- let db = match config.db_engine.as_str() {
- "sled" => {
- db_path.push("db");
- info!("Opening Sled database at: {}", db_path.display());
- let db = db::sled_adapter::sled::Config::default()
- .path(&db_path)
- .cache_capacity(config.sled_cache_capacity)
- .flush_every_ms(Some(config.sled_flush_every_ms))
- .open()
- .expect("Unable to open sled DB");
- db::sled_adapter::SledDb::init(db)
- }
- "sqlite" | "sqlite3" | "rusqlite" => {
- db_path.push("db.sqlite");
- info!("Opening Sqlite database at: {}", db_path.display());
- let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
- .expect("Unable to open sqlite DB");
- db::sqlite_adapter::SqliteDb::init(db)
- }
- "lmdb" | "heed" => {
- db_path.push("db.lmdb");
- info!("Opening LMDB database at: {}", db_path.display());
- std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
- let map_size = garage_db::lmdb_adapter::recommended_map_size();
-
- let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
- .max_dbs(100)
- .map_size(map_size)
- .open(&db_path)
- .expect("Unable to open LMDB DB");
- db::lmdb_adapter::LmdbDb::init(db)
- }
- e => {
- return Err(Error::Message(format!(
- "Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
- e
- )));
- }
- };
+ let config = read_config(config_file)?;
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
info!("Initializing Garage main data store...");
- let garage = Garage::new(config.clone(), db, background);
+ let garage = Garage::new(config.clone(), background)?;
info!("Initialize tracing...");
if let Some(export_to) = config.admin.trace_sink {
@@ -89,6 +44,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initialize Admin API server and metrics collector...");
let admin_server = AdminApiServer::new(garage.clone());
+ info!("Launching internal Garage cluster communications...");
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 7c7b9f30..130eb6a6 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
-use garage_table::crdt::Crdt;
+use garage_table::crdt::*;
use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
@@ -44,6 +44,9 @@ pub struct BucketParams {
pub website_config: crdt::Lww<Option<WebsiteConfig>>,
/// CORS rules
pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
+ /// Bucket quotas
+ #[serde(default)]
+ pub quotas: crdt::Lww<BucketQuotas>,
}
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
@@ -62,6 +65,18 @@ pub struct CorsRule {
pub expose_headers: Vec<String>,
}
+#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct BucketQuotas {
+ /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
+ pub max_size: Option<u64>,
+ /// Maximum number of non-deleted objects in the bucket
+ pub max_objects: Option<u64>,
+}
+
+impl AutoCrdt for BucketQuotas {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
impl BucketParams {
/// Create an empty BucketParams with no authorized keys and no website accesss
pub fn new() -> Self {
@@ -72,6 +87,7 @@ impl BucketParams {
local_aliases: crdt::LwwMap::new(),
website_config: crdt::Lww::new(None),
cors_config: crdt::Lww::new(None),
+ quotas: crdt::Lww::new(BucketQuotas::default()),
}
}
}
@@ -86,6 +102,7 @@ impl Crdt for BucketParams {
self.website_config.merge(&o.website_config);
self.cors_config.merge(&o.cors_config);
+ self.quotas.merge(&o.quotas);
}
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 280f3dc7..15769a17 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -6,6 +6,7 @@ use garage_db as db;
use garage_util::background::*;
use garage_util::config::*;
+use garage_util::error::Error;
use garage_rpc::system::System;
@@ -22,12 +23,11 @@ use crate::s3::version_table::*;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::helper;
+use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
-use crate::index_counter::*;
-#[cfg(feature = "k2v")]
-use crate::k2v::{counter_table::*, item_table::*, poll::*, rpc::*};
+use crate::k2v::{item_table::*, poll::*, rpc::*};
/// An entire Garage full of data
pub struct Garage {
@@ -52,6 +52,8 @@ pub struct Garage {
/// Table containing S3 objects
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
+ /// Counting table containing object counters
+ pub object_counter_table: Arc<IndexCounter<Object>>,
/// Table containing S3 object versions
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
/// Table containing S3 block references (not blocks themselves)
@@ -66,14 +68,57 @@ pub struct GarageK2V {
/// Table containing K2V items
pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
/// Indexing table containing K2V item counters
- pub counter_table: Arc<IndexCounter<K2VCounterTable>>,
+ pub counter_table: Arc<IndexCounter<K2VItem>>,
/// K2V RPC handler
pub rpc: Arc<K2VRpcHandler>,
}
impl Garage {
/// Create and run garage
- pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
+ info!("Opening database...");
+ let mut db_path = config.metadata_dir.clone();
+ std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
+ let db = match config.db_engine.as_str() {
+ "sled" => {
+ db_path.push("db");
+ info!("Opening Sled database at: {}", db_path.display());
+ let db = db::sled_adapter::sled::Config::default()
+ .path(&db_path)
+ .cache_capacity(config.sled_cache_capacity)
+ .flush_every_ms(Some(config.sled_flush_every_ms))
+ .open()
+ .expect("Unable to open sled DB");
+ db::sled_adapter::SledDb::init(db)
+ }
+ "sqlite" | "sqlite3" | "rusqlite" => {
+ db_path.push("db.sqlite");
+ info!("Opening Sqlite database at: {}", db_path.display());
+ let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
+ .expect("Unable to open sqlite DB");
+ db::sqlite_adapter::SqliteDb::init(db)
+ }
+ "lmdb" | "heed" => {
+ db_path.push("db.lmdb");
+ info!("Opening LMDB database at: {}", db_path.display());
+ std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
+ let map_size = garage_db::lmdb_adapter::recommended_map_size();
+
+ let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
+ .max_dbs(100)
+ .map_size(map_size)
+ .open(&db_path)
+ .expect("Unable to open LMDB DB");
+ db::lmdb_adapter::LmdbDb::init(db)
+ }
+ e => {
+ return Err(Error::Message(format!(
+ "Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
+ e
+ )));
+ }
+ };
+
let network_key = NetworkKey::from_slice(
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
)
@@ -155,12 +200,16 @@ impl Garage {
&db,
);
+ info!("Initialize object counter table...");
+ let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
+
info!("Initialize object_table...");
#[allow(clippy::redundant_clone)]
let object_table = Table::new(
ObjectTable {
background: background.clone(),
version_table: version_table.clone(),
+ object_counter_table: object_counter_table.clone(),
},
meta_rep_param.clone(),
system.clone(),
@@ -171,9 +220,8 @@ impl Garage {
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
- info!("Initialize Garage...");
-
- Arc::new(Self {
+ // -- done --
+ Ok(Arc::new(Self {
config,
db,
background,
@@ -183,11 +231,12 @@ impl Garage {
bucket_alias_table,
key_table,
object_table,
+ object_counter_table,
version_table,
block_ref_table,
#[cfg(feature = "k2v")]
k2v,
- })
+ }))
}
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 2602d5d9..36e8172b 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -1,3 +1,4 @@
+use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
@@ -12,30 +13,36 @@ use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::*;
use garage_table::crdt::*;
-use garage_table::replication::TableShardedReplication;
+use garage_table::replication::*;
use garage_table::*;
-pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
- const NAME: &'static str;
- type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
- type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
+ const COUNTER_TABLE_NAME: &'static str;
+
+ type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+
+ fn counter_partition_key(&self) -> &Self::CP;
+ fn counter_sort_key(&self) -> &Self::CS;
+ fn counts(&self) -> Vec<(&'static str, i64)>;
}
/// A counter entry in the global table
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct CounterEntry<T: CounterSchema> {
- pub pk: T::P,
- pub sk: T::S,
+#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+pub struct CounterEntry<T: CountedItem> {
+ pub pk: T::CP,
+ pub sk: T::CS,
pub values: BTreeMap<String, CounterValue>,
}
-impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
- fn partition_key(&self) -> &T::P {
+impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
+ fn partition_key(&self) -> &T::CP {
&self.pk
}
- fn sort_key(&self) -> &T::S {
+ fn sort_key(&self) -> &T::CS {
&self.sk
}
fn is_tombstone(&self) -> bool {
@@ -45,7 +52,7 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
}
}
-impl<T: CounterSchema> CounterEntry<T> {
+impl<T: CountedItem> CounterEntry<T> {
pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
let nodes = &ring.layout.node_id_vec[..];
self.filtered_values_with_nodes(nodes)
@@ -78,7 +85,7 @@ pub struct CounterValue {
pub node_values: BTreeMap<Uuid, (u64, i64)>,
}
-impl<T: CounterSchema> Crdt for CounterEntry<T> {
+impl<T: CountedItem> Crdt for CounterEntry<T> {
fn merge(&mut self, other: &Self) {
for (name, e2) in other.values.iter() {
if let Some(e) = self.values.get_mut(name) {
@@ -104,15 +111,15 @@ impl Crdt for CounterValue {
}
}
-pub struct CounterTable<T: CounterSchema> {
+pub struct CounterTable<T: CountedItem> {
_phantom_t: PhantomData<T>,
}
-impl<T: CounterSchema> TableSchema for CounterTable<T> {
- const TABLE_NAME: &'static str = T::NAME;
+impl<T: CountedItem> TableSchema for CounterTable<T> {
+ const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME;
- type P = T::P;
- type S = T::S;
+ type P = T::CP;
+ type S = T::CS;
type E = CounterEntry<T>;
type Filter = (DeletedFilter, Vec<Uuid>);
@@ -131,14 +138,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
// ----
-pub struct IndexCounter<T: CounterSchema> {
+pub struct IndexCounter<T: CountedItem> {
this_node: Uuid,
local_counter: db::Tree,
- propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
+ propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
-impl<T: CounterSchema> IndexCounter<T> {
+impl<T: CountedItem> IndexCounter<T> {
pub fn new(
system: Arc<System>,
replication: TableShardedReplication,
@@ -151,7 +158,7 @@ impl<T: CounterSchema> IndexCounter<T> {
let this = Arc::new(Self {
this_node: system.id,
local_counter: db
- .open_tree(format!("local_counter:{}", T::NAME))
+ .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME))
.expect("Unable to open local counter tree"),
propagate_tx,
table: Table::new(
@@ -166,7 +173,7 @@ impl<T: CounterSchema> IndexCounter<T> {
let this2 = this.clone();
background.spawn_worker(
- format!("{} index counter propagator", T::NAME),
+ format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
);
this
@@ -175,24 +182,45 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn count(
&self,
tx: &mut db::Transaction,
- pk: &T::P,
- sk: &T::S,
- counts: &[(&str, i64)],
+ old: Option<&T>,
+ new: Option<&T>,
) -> db::TxResult<(), Error> {
+ let pk = old
+ .map(|e| e.counter_partition_key())
+ .unwrap_or_else(|| new.unwrap().counter_partition_key());
+ let sk = old
+ .map(|e| e.counter_sort_key())
+ .unwrap_or_else(|| new.unwrap().counter_sort_key());
+
+ // calculate counter differences
+ let mut counts = HashMap::new();
+ for (k, v) in old.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) -= v;
+ }
+ for (k, v) in new.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) += v;
+ }
+
+ // update local counter table
let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
- Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
- .map_err(Error::RmpDecode)
- .map_err(db::TxError::Abort)?,
+ Some(old_bytes) => {
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
+ .map_err(Error::RmpDecode)
+ .map_err(db::TxError::Abort)?
+ }
None => LocalCounterEntry {
+ pk: pk.clone(),
+ sk: sk.clone(),
values: BTreeMap::new(),
},
};
+ let now = now_msec();
for (s, inc) in counts.iter() {
let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
- ent.0 += 1;
+ ent.0 = std::cmp::max(ent.0 + 1, now);
ent.1 += *inc;
}
@@ -213,7 +241,7 @@ impl<T: CounterSchema> IndexCounter<T> {
async fn propagate_loop(
self: Arc<Self>,
- mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
+ mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
must_exit: watch::Receiver<bool>,
) {
// This loop batches updates to counters to be sent all at once.
@@ -236,7 +264,7 @@ impl<T: CounterSchema> IndexCounter<T> {
if let Some((pk, sk, counters)) = ent {
let tree_key = self.table.data.tree_key(&pk, &sk);
- let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
+ let dist_entry = counters.into_counter_entry(self.this_node);
match buf.entry(tree_key) {
hash_map::Entry::Vacant(e) => {
e.insert(dist_entry);
@@ -255,10 +283,10 @@ impl<T: CounterSchema> IndexCounter<T> {
if let Err(e) = self.table.insert_many(entries).await {
errors += 1;
if errors >= 2 && *must_exit.borrow() {
- error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
+ error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
break;
}
- warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
+ warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
@@ -272,23 +300,155 @@ impl<T: CounterSchema> IndexCounter<T> {
}
}
}
+
+ pub fn offline_recount_all<TS, TR>(
+ &self,
+ counted_table: &Arc<Table<TS, TR>>,
+ ) -> Result<(), Error>
+ where
+ TS: TableSchema<E = T>,
+ TR: TableReplication,
+ {
+ let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
+ let entry_k = self
+ .table
+ .data
+ .tree_key(entry.partition_key(), entry.sort_key());
+ self.table
+ .data
+ .update_entry_with(&entry_k, |ent| match ent {
+ Some(mut ent) => {
+ ent.merge(&entry);
+ ent
+ }
+ None => entry.clone(),
+ })?;
+ Ok(())
+ };
+
+ // 1. Set all old local counters to zero
+ let now = now_msec();
+ let mut next_start: Option<Vec<u8>> = None;
+ loop {
+ let low_bound = match next_start.take() {
+ Some(v) => Bound::Excluded(v),
+ None => Bound::Unbounded,
+ };
+ let mut batch = vec![];
+ for item in self.local_counter.range((low_bound, Bound::Unbounded))? {
+ batch.push(item?);
+ if batch.len() > 1000 {
+ break;
+ }
+ }
+
+ if batch.is_empty() {
+ break;
+ }
+
+ info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
+ for (local_counter_k, local_counter) in batch {
+ let mut local_counter =
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
+
+ for (_, tv) in local_counter.values.iter_mut() {
+ tv.0 = std::cmp::max(tv.0 + 1, now);
+ tv.1 = 0;
+ }
+
+ let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ self.local_counter
+ .insert(&local_counter_k, &local_counter_bytes)?;
+
+ let counter_entry = local_counter.into_counter_entry(self.this_node);
+ save_counter_entry(counter_entry)?;
+
+ next_start = Some(local_counter_k);
+ }
+ }
+
+ // 2. Recount all table entries
+ let now = now_msec();
+ let mut next_start: Option<Vec<u8>> = None;
+ loop {
+ let low_bound = match next_start.take() {
+ Some(v) => Bound::Excluded(v),
+ None => Bound::Unbounded,
+ };
+ let mut batch = vec![];
+ for item in counted_table
+ .data
+ .store
+ .range((low_bound, Bound::Unbounded))?
+ {
+ batch.push(item?);
+ if batch.len() > 1000 {
+ break;
+ }
+ }
+
+ if batch.is_empty() {
+ break;
+ }
+
+ info!("counting entries... ({})", hex::encode(&batch[0].0));
+ for (counted_entry_k, counted_entry) in batch {
+ let counted_entry = counted_table.data.decode_entry(&counted_entry)?;
+
+ let pk = counted_entry.counter_partition_key();
+ let sk = counted_entry.counter_sort_key();
+ let counts = counted_entry.counts();
+
+ let local_counter_key = self.table.data.tree_key(pk, sk);
+ let mut local_counter = match self.local_counter.get(&local_counter_key)? {
+ Some(old_bytes) => {
+ let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
+ &old_bytes,
+ )?;
+ assert!(ent.pk == *pk);
+ assert!(ent.sk == *sk);
+ ent
+ }
+ None => LocalCounterEntry {
+ pk: pk.clone(),
+ sk: sk.clone(),
+ values: BTreeMap::new(),
+ },
+ };
+ for (s, v) in counts.iter() {
+ let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
+ tv.0 = std::cmp::max(tv.0 + 1, now);
+ tv.1 += v;
+ }
+
+ let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ self.local_counter
+ .insert(&local_counter_key, local_counter_bytes)?;
+
+ let counter_entry = local_counter.into_counter_entry(self.this_node);
+ save_counter_entry(counter_entry)?;
+
+ next_start = Some(counted_entry_k);
+ }
+ }
+
+ // Done
+ Ok(())
+ }
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-struct LocalCounterEntry {
+struct LocalCounterEntry<T: CountedItem> {
+ pk: T::CP,
+ sk: T::CS,
values: BTreeMap<String, (u64, i64)>,
}
-impl LocalCounterEntry {
- fn into_counter_entry<T: CounterSchema>(
- self,
- this_node: Uuid,
- pk: T::P,
- sk: T::S,
- ) -> CounterEntry<T> {
+impl<T: CountedItem> LocalCounterEntry<T> {
+ fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
CounterEntry {
- pk,
- sk,
+ pk: self.pk,
+ sk: self.sk,
values: self
.values
.into_iter()
diff --git a/src/model/k2v/counter_table.rs b/src/model/k2v/counter_table.rs
deleted file mode 100644
index 4856eb2b..00000000
--- a/src/model/k2v/counter_table.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-use garage_util::data::*;
-
-use crate::index_counter::*;
-
-pub const ENTRIES: &str = "entries";
-pub const CONFLICTS: &str = "conflicts";
-pub const VALUES: &str = "values";
-pub const BYTES: &str = "bytes";
-
-#[derive(PartialEq, Clone)]
-pub struct K2VCounterTable;
-
-impl CounterSchema for K2VCounterTable {
- const NAME: &'static str = "k2v_index_counter";
-
- // Partition key = bucket id
- type P = Uuid;
- // Sort key = K2V item's partition key
- type S = String;
-}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 991fe66d..baa1db4b 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -10,9 +10,13 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
-use crate::k2v::counter_table::*;
use crate::k2v::poll::*;
+pub const ENTRIES: &str = "entries";
+pub const CONFLICTS: &str = "conflicts";
+pub const VALUES: &str = "values";
+pub const BYTES: &str = "bytes";
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct K2VItem {
pub partition: K2VItemPartition,
@@ -112,27 +116,6 @@ impl K2VItem {
ent.discard();
}
}
-
- // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used)
- fn stats(&self) -> (i64, i64, i64, i64) {
- let values = self.values();
-
- let n_entries = if self.is_tombstone() { 0 } else { 1 };
- let n_conflicts = if values.len() > 1 { 1 } else { 0 };
- let n_values = values
- .iter()
- .filter(|v| matches!(v, DvvsValue::Value(_)))
- .count() as i64;
- let n_bytes = values
- .iter()
- .map(|v| match v {
- DvvsValue::Deleted => 0,
- DvvsValue::Value(v) => v.len() as i64,
- })
- .sum();
-
- (n_entries, n_conflicts, n_values, n_bytes)
- }
}
impl DvvsEntry {
@@ -204,7 +187,7 @@ impl Entry<K2VItemPartition, String> for K2VItem {
}
pub struct K2VItemTable {
- pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>,
+ pub(crate) counter_table: Arc<IndexCounter<K2VItem>>,
pub(crate) subscriptions: Arc<SubscriptionManager>,
}
@@ -229,40 +212,14 @@ impl TableSchema for K2VItemTable {
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
// 1. Count
- let (old_entries, old_conflicts, old_values, old_bytes) = match old {
- None => (0, 0, 0, 0),
- Some(e) => e.stats(),
- };
- let (new_entries, new_conflicts, new_values, new_bytes) = match new {
- None => (0, 0, 0, 0),
- Some(e) => e.stats(),
- };
-
- let count_pk = old
- .map(|e| e.partition.bucket_id)
- .unwrap_or_else(|| new.unwrap().partition.bucket_id);
- let count_sk = old
- .map(|e| &e.partition.partition_key)
- .unwrap_or_else(|| &new.unwrap().partition.partition_key);
-
- let counter_res = self.counter_table.count(
- tx,
- &count_pk,
- count_sk,
- &[
- (ENTRIES, new_entries - old_entries),
- (CONFLICTS, new_conflicts - old_conflicts),
- (VALUES, new_values - old_values),
- (BYTES, new_bytes - old_bytes),
- ],
- );
+ let counter_res = self.counter_table.count(tx, old, new);
if let Err(e) = db::unabort(counter_res)? {
// This result can be returned by `counter_table.count()` for instance
// if messagepack serialization or deserialization fails at some step.
// Warn admin but ignore this error for now, that's all we can do.
error!(
- "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
- count_pk, count_sk, e
+ "Unable to update K2V item counter: {}. Index values will be wrong!",
+ e
);
}
@@ -282,6 +239,47 @@ impl TableSchema for K2VItemTable {
}
}
+impl CountedItem for K2VItem {
+ const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter_v2";
+
+ // Partition key = bucket id
+ type CP = Uuid;
+ // Sort key = K2V item's partition key
+ type CS = String;
+
+ fn counter_partition_key(&self) -> &Uuid {
+ &self.partition.bucket_id
+ }
+ fn counter_sort_key(&self) -> &String {
+ &self.partition.partition_key
+ }
+
+ fn counts(&self) -> Vec<(&'static str, i64)> {
+ let values = self.values();
+
+ let n_entries = if self.is_tombstone() { 0 } else { 1 };
+ let n_conflicts = if values.len() > 1 { 1 } else { 0 };
+ let n_values = values
+ .iter()
+ .filter(|v| matches!(v, DvvsValue::Value(_)))
+ .count() as i64;
+ let n_bytes = values
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Deleted => 0,
+ DvvsValue::Value(v) => v.len() as i64,
+ })
+ .sum();
+
+ vec![
+ (ENTRIES, n_entries),
+ (CONFLICTS, n_conflicts),
+ (VALUES, n_values),
+ (BYTES, n_bytes),
+ ]
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index 664172a6..f6a96151 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -1,6 +1,5 @@
pub mod causality;
-pub mod counter_table;
pub mod item_table;
pub mod poll;
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index 25acb4b0..5fc67069 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -77,6 +77,7 @@ impl Migrate {
local_aliases: LwwMap::new(),
website_config: Lww::new(website),
cors_config: Lww::new(None),
+ quotas: Lww::new(Default::default()),
}),
})
.await?;
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 62f5d8d9..a3914c36 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -11,10 +11,15 @@ use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
+use crate::index_counter::*;
use crate::s3::version_table::*;
use garage_model_050::object_table as old;
+pub const OBJECTS: &str = "objects";
+pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
+pub const BYTES: &str = "bytes";
+
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
@@ -218,6 +223,7 @@ impl Crdt for Object {
pub struct ObjectTable {
pub background: Arc<BackgroundRunner>,
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ pub object_counter_table: Arc<IndexCounter<Object>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
@@ -236,10 +242,20 @@ impl TableSchema for ObjectTable {
fn updated(
&self,
- _tx: &mut db::Transaction,
+ tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
+ // 1. Count
+ let counter_res = self.object_counter_table.count(tx, old, new);
+ if let Err(e) = db::unabort(counter_res)? {
+ error!(
+ "Unable to update object counter: {}. Index values will be wrong!",
+ e
+ );
+ }
+
+ // 2. Spawn threads that propagates deletions to version table
let version_table = self.version_table.clone();
let old = old.cloned();
let new = new.cloned();
@@ -283,6 +299,49 @@ impl TableSchema for ObjectTable {
}
}
+impl CountedItem for Object {
+ const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter";
+
+ // Partition key = bucket id
+ type CP = Uuid;
+ // Sort key = nothing
+ type CS = EmptyKey;
+
+ fn counter_partition_key(&self) -> &Uuid {
+ &self.bucket_id
+ }
+ fn counter_sort_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+
+ fn counts(&self) -> Vec<(&'static str, i64)> {
+ let versions = self.versions();
+ let n_objects = if versions.iter().any(|v| v.is_data()) {
+ 1
+ } else {
+ 0
+ };
+ let n_unfinished_uploads = versions
+ .iter()
+ .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_)))
+ .count();
+ let n_bytes = versions
+ .iter()
+ .map(|v| match &v.state {
+ ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _))
+ | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size,
+ _ => 0,
+ })
+ .sum::<u64>();
+
+ vec![
+ (OBJECTS, n_objects),
+ (UNFINISHED_UPLOADS, n_unfinished_uploads as i64),
+ (BYTES, n_bytes as i64),
+ ]
+ }
+}
+
// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
// (we just want to change bucket into bucket_id by hashing it)