diff options
Diffstat (limited to 'src')
29 files changed, 854 insertions, 272 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 57e3e5cf..c3b16715 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -156,12 +156,7 @@ impl ApiHandler for AdminApiServer { } Endpoint::CreateBucket => handle_create_bucket(&self.garage, req).await, Endpoint::DeleteBucket { id } => handle_delete_bucket(&self.garage, id).await, - Endpoint::PutBucketWebsite { id } => { - handle_put_bucket_website(&self.garage, id, req).await - } - Endpoint::DeleteBucketWebsite { id } => { - handle_delete_bucket_website(&self.garage, id).await - } + Endpoint::UpdateBucket { id } => handle_update_bucket(&self.garage, id, req).await, // Bucket-key permissions Endpoint::BucketAllowKey => { handle_bucket_change_key_perm(&self.garage, req, true).await diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 7f9a813f..ac8a8a40 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -14,6 +14,7 @@ use garage_model::bucket_alias_table::*; use garage_model::bucket_table::*; use garage_model::garage::Garage; use garage_model::permission::*; +use garage_model::s3::object_table::*; use crate::admin::error::*; use crate::admin::key::ApiBucketKeyPerm; @@ -77,6 +78,13 @@ struct BucketLocalAlias { alias: String, } +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ApiBucketQuotas { + max_size: Option<u64>, + max_objects: Option<u64>, +} + pub async fn handle_get_bucket_info( garage: &Arc<Garage>, id: Option<String>, @@ -108,6 +116,14 @@ async fn bucket_info_results( .get_existing_bucket(bucket_id) .await?; + let counters = garage + .object_counter_table + .table + .get(&bucket_id, &EmptyKey) + .await? + .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .unwrap_or_default(); + let mut relevant_keys = HashMap::new(); for (k, _) in bucket .state @@ -148,6 +164,7 @@ async fn bucket_info_results( let state = bucket.state.as_option().unwrap(); + let quotas = state.quotas.get(); let res = GetBucketInfoResult { id: hex::encode(&bucket.id), @@ -191,6 +208,16 @@ async fn bucket_info_results( } }) .collect::<Vec<_>>(), + objects: counters.get(OBJECTS).cloned().unwrap_or_default(), + bytes: counters.get(BYTES).cloned().unwrap_or_default(), + unfinshed_uploads: counters + .get(UNFINISHED_UPLOADS) + .cloned() + .unwrap_or_default(), + quotas: ApiBucketQuotas { + max_size: quotas.max_size, + max_objects: quotas.max_objects, + }, }; Ok(json_ok_response(&res)?) @@ -205,6 +232,10 @@ struct GetBucketInfoResult { #[serde(default)] website_config: Option<GetBucketInfoWebsiteResult>, keys: Vec<GetBucketInfoKey>, + objects: i64, + bytes: i64, + unfinshed_uploads: i64, + quotas: ApiBucketQuotas, } #[derive(Serialize)] @@ -363,14 +394,12 @@ pub async fn handle_delete_bucket( .body(Body::empty())?) } -// ---- BUCKET WEBSITE CONFIGURATION ---- - -pub async fn handle_put_bucket_website( +pub async fn handle_update_bucket( garage: &Arc<Garage>, id: String, req: Request<Body>, ) -> Result<Response<Body>, Error> { - let req = parse_json_body::<PutBucketWebsiteRequest>(req).await?; + let req = parse_json_body::<UpdateBucketRequest>(req).await?; let bucket_id = parse_bucket_id(&id)?; let mut bucket = garage @@ -379,10 +408,31 @@ pub async fn handle_put_bucket_website( .await?; let state = bucket.state.as_option_mut().unwrap(); - state.website_config.update(Some(WebsiteConfig { - index_document: req.index_document, - error_document: req.error_document, - })); + + if let Some(wa) = req.website_access { + if wa.enabled { + state.website_config.update(Some(WebsiteConfig { + index_document: wa.index_document.ok_or_bad_request( + "Please specify indexDocument when enabling website access.", + )?, + error_document: wa.error_document, + })); + } else { + if wa.index_document.is_some() || wa.error_document.is_some() { + return Err(Error::bad_request( + "Cannot specify indexDocument or errorDocument when disabling website access.", + )); + } + state.website_config.update(None); + } + } + + if let Some(q) = req.quotas { + state.quotas.update(BucketQuotas { + max_size: q.max_size, + max_objects: q.max_objects, + }); + } garage.bucket_table.insert(&bucket).await?; @@ -391,29 +441,17 @@ pub async fn handle_put_bucket_website( #[derive(Deserialize)] #[serde(rename_all = "camelCase")] -struct PutBucketWebsiteRequest { - index_document: String, - #[serde(default)] - error_document: Option<String>, +struct UpdateBucketRequest { + website_access: Option<UpdateBucketWebsiteAccess>, + quotas: Option<ApiBucketQuotas>, } -pub async fn handle_delete_bucket_website( - garage: &Arc<Garage>, - id: String, -) -> Result<Response<Body>, Error> { - let bucket_id = parse_bucket_id(&id)?; - - let mut bucket = garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - - let state = bucket.state.as_option_mut().unwrap(); - state.website_config.update(None); - - garage.bucket_table.insert(&bucket).await?; - - bucket_info_results(garage, bucket_id).await +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct UpdateBucketWebsiteAccess { + enabled: bool, + index_document: Option<String>, + error_document: Option<String>, } // ---- BUCKET/KEY PERMISSIONS ---- diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs index 93639873..3eee8b67 100644 --- a/src/api/admin/router.rs +++ b/src/api/admin/router.rs @@ -48,10 +48,7 @@ pub enum Endpoint { DeleteBucket { id: String, }, - PutBucketWebsite { - id: String, - }, - DeleteBucketWebsite { + UpdateBucket { id: String, }, // Bucket-Key Permissions @@ -113,8 +110,7 @@ impl Endpoint { GET "/v0/bucket" => ListBuckets, POST "/v0/bucket" => CreateBucket, DELETE "/v0/bucket" if id => DeleteBucket (query::id), - PUT "/v0/bucket/website" if id => PutBucketWebsite (query::id), - DELETE "/v0/bucket/website" if id => DeleteBucketWebsite (query::id), + PUT "/v0/bucket" if id => UpdateBucket (query::id), // Bucket-key permissions POST "/v0/bucket/allow" => BucketAllowKey, POST "/v0/bucket/deny" => BucketDenyKey, diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index d5db906d..210950bf 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -10,7 +10,7 @@ use garage_rpc::ring::Ring; use garage_table::util::*; use garage_model::garage::Garage; -use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; +use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; use crate::k2v::error::*; use crate::k2v::range::read_range; diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index d1d6288c..78dfeeac 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -212,7 +212,7 @@ impl ApiHandler for S3ApiServer { .await } Endpoint::PutObject { key } => { - handle_put(garage, req, bucket_id, &key, content_sha256).await + handle_put(garage, req, &bucket, &key, content_sha256).await } Endpoint::AbortMultipartUpload { key, upload_id } => { handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await @@ -226,7 +226,7 @@ impl ApiHandler for S3ApiServer { garage, req, &bucket_name, - bucket_id, + &bucket, &key, &upload_id, content_sha256, diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index dc640f43..d063faa4 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -22,7 +22,7 @@ use crate::signature::payload::{parse_date, verify_v4}; pub async fn handle_post_object( garage: Arc<Garage>, req: Request<Body>, - bucket: String, + bucket_name: String, ) -> Result<Response<Body>, Error> { let boundary = req .headers() @@ -126,13 +126,18 @@ pub async fn handle_post_object( let bucket_id = garage .bucket_helper() - .resolve_bucket(&bucket, &api_key) + .resolve_bucket(&bucket_name, &api_key) .await?; if !api_key.allow_write(&bucket_id) { return Err(Error::forbidden("Operation is not allowed for this key.")); } + let bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?; let decoded_policy: Policy = serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?; @@ -227,7 +232,7 @@ pub async fn handle_post_object( garage, headers, StreamLimiter::new(stream, conditions.content_length), - bucket_id, + &bucket, &key, None, None, @@ -244,7 +249,7 @@ pub async fn handle_post_object( { target .query_pairs_mut() - .append_pair("bucket", &bucket) + .append_pair("bucket", &bucket_name) .append_pair("key", &key) .append_pair("etag", &etag); let target = target.to_string(); @@ -289,7 +294,7 @@ pub async fn handle_post_object( let xml = s3_xml::PostObject { xmlns: (), location: s3_xml::Value(location), - bucket: s3_xml::Value(bucket), + bucket: s3_xml::Value(bucket_name), key: s3_xml::Value(key), etag: s3_xml::Value(etag), }; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 8b06ef3f..9ef37421 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::sync::Arc; use futures::prelude::*; @@ -14,7 +14,9 @@ use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_block::manager::INLINE_THRESHOLD; +use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; +use garage_model::index_counter::CountedItem; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; @@ -26,7 +28,7 @@ use crate::signature::verify_signed_content; pub async fn handle_put( garage: Arc<Garage>, req: Request<Body>, - bucket_id: Uuid, + bucket: &Bucket, key: &str, content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { @@ -46,7 +48,7 @@ pub async fn handle_put( garage, headers, body, - bucket_id, + bucket, key, content_md5, content_sha256, @@ -59,7 +61,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: Arc<Garage>, headers: ObjectVersionHeaders, body: S, - bucket_id: Uuid, + bucket: &Bucket, key: &str, content_md5: Option<String>, content_sha256: Option<FixedBytes32>, @@ -80,6 +82,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let data_md5sum_hex = hex::encode(data_md5sum); let data_sha256sum = sha256sum(&first_block[..]); + let size = first_block.len() as u64; ensure_checksum_matches( data_md5sum.as_slice(), @@ -88,20 +91,22 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( content_sha256, )?; + check_quotas(&garage, bucket, key, size).await?; + let object_version = ObjectVersion { uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::Inline( ObjectVersionMeta { headers, - size: first_block.len() as u64, + size, etag: data_md5sum_hex.clone(), }, first_block, )), }; - let object = Object::new(bucket_id, key.into(), vec![object_version]); + let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; return Ok((version_uuid, data_md5sum_hex)); @@ -114,36 +119,42 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( timestamp: version_timestamp, state: ObjectVersionState::Uploading(headers.clone()), }; - let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]); + let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; // Initialize corresponding entry in version table // Write this entry now, even with empty block list, // to prevent block_ref entries from being deleted (they can be deleted // if the reference a version that isn't found in the version table) - let version = Version::new(version_uuid, bucket_id, key.into(), false); + let version = Version::new(version_uuid, bucket.id, key.into(), false); garage.version_table.insert(&version).await?; // Transfer data and verify checksum let first_block_hash = blake2sum(&first_block[..]); - let tx_result = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await - .and_then(|(total_size, data_md5sum, data_sha256sum)| { + + let tx_result = (|| async { + let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( + &garage, + &version, + 1, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; + ensure_checksum_matches( data_md5sum.as_slice(), data_sha256sum, content_md5.as_deref(), content_sha256, - ) - .map(|()| (total_size, data_md5sum)) - }); + )?; + + check_quotas(&garage, bucket, key, total_size).await?; + + Ok((total_size, data_md5sum)) + })() + .await; // If something went wrong, clean up let (total_size, md5sum_arr) = match tx_result { @@ -151,7 +162,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( Err(e) => { // Mark object as aborted, this will free the blocks further down object_version.state = ObjectVersionState::Aborted; - let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]); + let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; return Err(e); } @@ -167,7 +178,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( }, first_block_hash, )); - let object = Object::new(bucket_id, key.into(), vec![object_version]); + let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; Ok((version_uuid, md5sum_hex)) @@ -200,6 +211,64 @@ fn ensure_checksum_matches( Ok(()) } +/// Check that inserting this object with this size doesn't exceed bucket quotas +async fn check_quotas( + garage: &Arc<Garage>, + bucket: &Bucket, + key: &str, + size: u64, +) -> Result<(), Error> { + let quotas = bucket.state.as_option().unwrap().quotas.get(); + if quotas.max_objects.is_none() && quotas.max_size.is_none() { + return Ok(()); + }; + + let key = key.to_string(); + let (prev_object, counters) = futures::try_join!( + garage.object_table.get(&bucket.id, &key), + garage.object_counter_table.table.get(&bucket.id, &EmptyKey), + )?; + + let counters = counters + .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .unwrap_or_default(); + + let (prev_cnt_obj, prev_cnt_size) = match prev_object { + Some(o) => { + let prev_cnt = o.counts().into_iter().collect::<HashMap<_, _>>(); + ( + prev_cnt.get(OBJECTS).cloned().unwrap_or_default(), + prev_cnt.get(BYTES).cloned().unwrap_or_default(), + ) + } + None => (0, 0), + }; + let cnt_obj_diff = 1 - prev_cnt_obj; + let cnt_size_diff = size as i64 - prev_cnt_size; + + if let Some(mo) = quotas.max_objects { + let current_objects = counters.get(OBJECTS).cloned().unwrap_or_default(); + if cnt_obj_diff > 0 && current_objects + cnt_obj_diff > mo as i64 { + return Err(Error::forbidden(format!( + "Object quota is reached, maximum objects for this bucket: {}", + mo + ))); + } + } + + if let Some(ms) = quotas.max_size { + let current_size = counters.get(BYTES).cloned().unwrap_or_default(); + if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 { + return Err(Error::forbidden(format!( + "Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.", + ms, current_size, size + ))); + } + } + + Ok(()) +} + async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: &Garage, version: &Version, @@ -473,7 +542,7 @@ pub async fn handle_complete_multipart_upload( garage: Arc<Garage>, req: Request<Body>, bucket_name: &str, - bucket_id: Uuid, + bucket: &Bucket, key: &str, upload_id: &str, content_sha256: Option<Hash>, @@ -497,7 +566,7 @@ pub async fn handle_complete_multipart_upload( // Get object and version let key = key.to_string(); let (object, version) = futures::try_join!( - garage.object_table.get(&bucket_id, &key), + garage.object_table.get(&bucket.id, &key), garage.version_table.get(&version_uuid, &EmptyKey), )?; @@ -590,6 +659,14 @@ pub async fn handle_complete_multipart_upload( // Calculate total size of final object let total_size = version.blocks.items().iter().map(|x| x.1.size).sum(); + if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await { + object_version.state = ObjectVersionState::Aborted; + let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); + garage.object_table.insert(&final_object).await?; + + return Err(e); + } + // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { @@ -600,7 +677,7 @@ pub async fn handle_complete_multipart_upload( version.blocks.items()[0].1.hash, )); - let final_object = Object::new(bucket_id, key.clone(), vec![object_version]); + let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; // Send response saying ok we're done diff --git a/src/db/lib.rs b/src/db/lib.rs index e9d3ea18..8188c715 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -197,6 +197,11 @@ impl Tree { pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> { self.0.remove(self.1, key.as_ref()) } + /// Clears all values from the tree + #[inline] + pub fn clear(&self) -> Result<()> { + self.0.clear(self.1) + } #[inline] pub fn iter(&self) -> Result<ValueIter<'_>> { @@ -311,6 +316,7 @@ pub(crate) trait IDb: Send + Sync { fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>; fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; + fn clear(&self, tree: usize) -> Result<()>; fn iter(&self, tree: usize) -> Result<ValueIter<'_>>; fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>; diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index 62fcc3e6..fdb254c6 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -139,6 +139,14 @@ impl IDb for LmdbDb { Ok(old_val) } + fn clear(&self, tree: usize) -> Result<()> { + let tree = self.get_tree(tree)?; + let mut tx = self.db.write_txn()?; + tree.clear(&mut tx)?; + tx.commit()?; + Ok(()) + } + fn iter(&self, tree: usize) -> Result<ValueIter<'_>> { let tree = self.get_tree(tree)?; let tx = self.db.read_txn()?; diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs index 982f8d82..cf61867d 100644 --- a/src/db/sled_adapter.rs +++ b/src/db/sled_adapter.rs @@ -113,6 +113,12 @@ impl IDb for SledDb { Ok(old_val.map(|x| x.to_vec())) } + fn clear(&self, tree: usize) -> Result<()> { + let tree = self.get_tree(tree)?; + tree.clear()?; + Ok(()) + } + fn iter(&self, tree: usize) -> Result<ValueIter<'_>> { let tree = self.get_tree(tree)?; Ok(Box::new(tree.iter().map(|v| { diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 14bf35ff..68d96ca0 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -182,6 +182,16 @@ impl IDb for SqliteDb { Ok(old_val) } + fn clear(&self, tree: usize) -> Result<()> { + trace!("clear {}: lock db", tree); + let this = self.0.lock().unwrap(); + trace!("clear {}: lock acquired", tree); + + let tree = this.get_tree(tree)?; + this.db.execute(&format!("DELETE FROM {}", tree), [])?; + Ok(()) + } + fn iter(&self, tree: usize) -> Result<ValueIter<'_>> { trace!("iter {}: lock db", tree); let this = self.0.lock().unwrap(); diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index eb643160..640e6975 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -30,6 +30,7 @@ garage_util = { version = "0.7.0", path = "../util" } garage_web = { version = "0.7.0", path = "../web" } bytes = "1.0" +bytesize = "1.1" hex = "0.4" tracing = { version = "0.1.30", features = ["log-always"] } pretty_env_logger = "0.4" diff --git a/src/garage/admin.rs b/src/garage/admin.rs index c662aa00..48914655 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -24,11 +24,12 @@ use garage_model::migrate::Migrate; use garage_model::permission::*; use crate::cli::*; -use crate::repair::Repair; +use crate::repair::online::OnlineRepair; pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; #[derive(Debug, Serialize, Deserialize)] +#[allow(clippy::large_enum_variant)] pub enum AdminRpc { BucketOperation(BucketOperation), KeyOperation(KeyOperation), @@ -39,7 +40,11 @@ pub enum AdminRpc { // Replies Ok(String), BucketList(Vec<Bucket>), - BucketInfo(Bucket, HashMap<String, Key>), + BucketInfo { + bucket: Bucket, + relevant_keys: HashMap<String, Key>, + counters: HashMap<String, i64>, + }, KeyList(Vec<(String, String)>), KeyInfo(Key, HashMap<Uuid, Bucket>), } @@ -72,6 +77,7 @@ impl AdminRpcHandler { BucketOperation::Allow(query) => self.handle_bucket_allow(query).await, BucketOperation::Deny(query) => self.handle_bucket_deny(query).await, BucketOperation::Website(query) => self.handle_bucket_website(query).await, + BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await, } } @@ -87,6 +93,7 @@ impl AdminRpcHandler { EnumerationOrder::Forward, ) .await?; + Ok(AdminRpc::BucketList(buckets)) } @@ -104,6 +111,15 @@ impl AdminRpcHandler { .get_existing_bucket(bucket_id) .await?; + let counters = self + .garage + .object_counter_table + .table + .get(&bucket_id, &EmptyKey) + .await? + .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) + .unwrap_or_default(); + let mut relevant_keys = HashMap::new(); for (k, _) in bucket .state @@ -139,7 +155,11 @@ impl AdminRpcHandler { } } - Ok(AdminRpc::BucketInfo(bucket, relevant_keys)) + Ok(AdminRpc::BucketInfo { + bucket, + relevant_keys, + counters, + }) } #[allow(clippy::ptr_arg)] @@ -431,6 +451,60 @@ impl AdminRpcHandler { Ok(AdminRpc::Ok(msg)) } + async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> { + let bucket_id = self + .garage + .bucket_helper() + .resolve_global_bucket_name(&query.bucket) + .await? + .ok_or_bad_request("Bucket not found")?; + + let mut bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let bucket_state = bucket.state.as_option_mut().unwrap(); + + if query.max_size.is_none() && query.max_objects.is_none() { + return Err(Error::BadRequest( + "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(), + )); + } + + let mut quotas = bucket_state.quotas.get().clone(); + + match query.max_size.as_ref().map(String::as_ref) { + Some("none") => quotas.max_size = None, + Some(v) => { + let bs = v + .parse::<bytesize::ByteSize>() + .ok_or_bad_request(format!("Invalid size specified: {}", v))?; + quotas.max_size = Some(bs.as_u64()); + } + _ => (), + } + + match query.max_objects.as_ref().map(String::as_ref) { + Some("none") => quotas.max_objects = None, + Some(v) => { + let mo = v + .parse::<u64>() + .ok_or_bad_request(format!("Invalid number specified: {}", v))?; + quotas.max_objects = Some(mo); + } + _ => (), + } + + bucket_state.quotas.update(quotas); + self.garage.bucket_table.insert(&bucket).await?; + + Ok(AdminRpc::Ok(format!( + "Quotas updated for {}", + &query.bucket + ))) + } + async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> { match cmd { KeyOperation::List => self.handle_list_keys().await, @@ -619,7 +693,7 @@ impl AdminRpcHandler { ))) } } else { - let repair = Repair { + let repair = OnlineRepair { garage: self.garage.clone(), }; self.garage diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index b2dd8f14..3a0bd956 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -169,8 +169,12 @@ pub async fn cmd_admin( AdminRpc::BucketList(bl) => { print_bucket_list(bl); } - AdminRpc::BucketInfo(bucket, rk) => { - print_bucket_info(&bucket, &rk); + AdminRpc::BucketInfo { + bucket, + relevant_keys, + counters, + } => { + print_bucket_info(&bucket, &relevant_keys, &counters); } AdminRpc::KeyList(kl) => { print_key_list(kl); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index a0c49aeb..4f2efe19 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -33,10 +33,15 @@ pub enum Command { #[structopt(name = "migrate")] Migrate(MigrateOpt), - /// Start repair of node data + /// Start repair of node data on remote node #[structopt(name = "repair")] Repair(RepairOpt), + /// Offline reparation of node data (these repairs must be run offline + /// directly on the server node) + #[structopt(name = "offline-repair")] + OfflineRepair(OfflineRepairOpt), + /// Gather node statistics #[structopt(name = "stats")] Stats(StatsOpt), @@ -175,6 +180,10 @@ pub enum BucketOperation { /// Expose as website or not #[structopt(name = "website")] Website(WebsiteOpt), + + /// Set the quotas for this bucket + #[structopt(name = "set-quotas")] + SetQuotas(SetQuotasOpt), } #[derive(Serialize, Deserialize, StructOpt, Debug)] @@ -262,6 +271,21 @@ pub struct PermBucketOpt { } #[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct SetQuotasOpt { + /// Bucket name + pub bucket: String, + + /// Set a maximum size for the bucket (specify a size e.g. in MiB or GiB, + /// or `none` for no size restriction) + #[structopt(long = "max-size")] + pub max_size: Option<String>, + + /// Set a maximum number of objects for the bucket (or `none` for no restriction) + #[structopt(long = "max-objects")] + pub max_objects: Option<String>, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] pub enum KeyOperation { /// List keys #[structopt(name = "list")] @@ -406,6 +430,27 @@ pub enum RepairWhat { } #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct OfflineRepairOpt { + /// Confirm the launch of the repair operation + #[structopt(long = "yes")] + pub yes: bool, + + #[structopt(subcommand)] + pub what: OfflineRepairWhat, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum OfflineRepairWhat { + /// Repair K2V item counters + #[cfg(feature = "k2v")] + #[structopt(name = "k2v_item_counters")] + K2VItemCounters, + /// Repair object counters + #[structopt(name = "object_counters")] + ObjectCounters, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct StatsOpt { /// Gather statistics from all nodes #[structopt(short = "a", long = "all-nodes")] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 6d73be3a..329e8a3e 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -7,6 +7,7 @@ use garage_util::formater::format_table; use garage_model::bucket_table::*; use garage_model::key_table::*; +use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; pub fn print_bucket_list(bl: Vec<Bucket>) { println!("List of buckets:"); @@ -29,11 +30,12 @@ pub fn print_bucket_list(bl: Vec<Bucket>) { [((k, n), _, _)] => format!("{}:{}", k, n), s => format!("[{} local aliases]", s.len()), }; + table.push(format!( "\t{}\t{}\t{}", aliases.join(","), local_aliases_n, - hex::encode(bucket.id) + hex::encode(bucket.id), )); } format_table(table); @@ -121,7 +123,11 @@ pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) { } } -pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) { +pub fn print_bucket_info( + bucket: &Bucket, + relevant_keys: &HashMap<String, Key>, + counters: &HashMap<String, i64>, +) { let key_name = |k| { relevant_keys .get(k) @@ -133,7 +139,42 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) match &bucket.state { Deletable::Deleted => println!("Bucket is deleted."), Deletable::Present(p) => { - println!("Website access: {}", p.website_config.get().is_some()); + let size = + bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64); + println!( + "\nSize: {} ({})", + size.to_string_as(true), + size.to_string_as(false) + ); + println!( + "Objects: {}", + counters.get(OBJECTS).cloned().unwrap_or_default() + ); + println!( + "Unfinished multipart uploads: {}", + counters + .get(UNFINISHED_UPLOADS) + .cloned() + .unwrap_or_default() + ); + + println!("\nWebsite access: {}", p.website_config.get().is_some()); + + let quotas = p.quotas.get(); + if quotas.max_size.is_some() || quotas.max_objects.is_some() { + println!("\nQuotas:"); + if let Some(ms) = quotas.max_size { + let ms = bytesize::ByteSize::b(ms); + println!( + " maximum size: {} ({})", + ms.to_string_as(true), + ms.to_string_as(false) + ); + } + if let Some(mo) = quotas.max_objects { + println!(" maximum number of objects: {}", mo); + } + } println!("\nGlobal aliases:"); for (alias, _, active) in p.aliases.items().iter() { diff --git a/src/garage/main.rs b/src/garage/main.rs index bd09b6ea..3fa5c3c0 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -61,17 +61,17 @@ async fn main() { pretty_env_logger::init(); sodiumoxide::init().expect("Unable to init sodiumoxide"); - let opt = Opt::from_args(); + // Abort on panic (same behavior as in Go) + std::panic::set_hook(Box::new(|panic_info| { + error!("{}", panic_info.to_string()); + std::process::abort(); + })); + let opt = Opt::from_args(); let res = match opt.cmd { - Command::Server => { - // Abort on panic (same behavior as in Go) - std::panic::set_hook(Box::new(|panic_info| { - error!("{}", panic_info.to_string()); - std::process::abort(); - })); - - server::run_server(opt.config_file).await + Command::Server => server::run_server(opt.config_file).await, + Command::OfflineRepair(repair_opt) => { + repair::offline::offline_repair(opt.config_file, repair_opt).await } Command::Node(NodeOperation::NodeId(node_id_opt)) => { node_id_command(opt.config_file, node_id_opt.quiet) diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs new file mode 100644 index 00000000..4699ace5 --- /dev/null +++ b/src/garage/repair/mod.rs @@ -0,0 +1,2 @@ +pub mod offline; +pub mod online; diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs new file mode 100644 index 00000000..7760a8bd --- /dev/null +++ b/src/garage/repair/offline.rs @@ -0,0 +1,55 @@ +use std::path::PathBuf; + +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::config::*; +use garage_util::error::*; + +use garage_model::garage::Garage; + +use crate::cli::structs::*; + +pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> { + if !opt.yes { + return Err(Error::Message( + "Please add the --yes flag to launch repair operation".into(), + )); + } + + info!("Loading configuration..."); + let config = read_config(config_file)?; + + info!("Initializing background runner..."); + let (done_tx, done_rx) = watch::channel(false); + let (background, await_background_done) = BackgroundRunner::new(16, done_rx); + + info!("Initializing Garage main data store..."); + let garage = Garage::new(config.clone(), background)?; + + info!("Launching repair operation..."); + match opt.what { + #[cfg(feature = "k2v")] + OfflineRepairWhat::K2VItemCounters => { + garage + .k2v + .counter_table + .offline_recount_all(&garage.k2v.item_table)?; + } + OfflineRepairWhat::ObjectCounters => { + garage + .object_counter_table + .offline_recount_all(&garage.object_table)?; + } + } + + info!("Repair operation finished, shutting down Garage internals..."); + done_tx.send(true).unwrap(); + drop(garage); + + await_background_done.await?; + + info!("Cleaning up..."); + + Ok(()) +} diff --git a/src/garage/repair.rs b/src/garage/repair/online.rs index 17e14b8b..d6a71742 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair/online.rs @@ -11,11 +11,11 @@ use garage_util::error::Error; use crate::*; -pub struct Repair { +pub struct OnlineRepair { pub garage: Arc<Garage>, } -impl Repair { +impl OnlineRepair { pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) { if let Err(e) = self.repair_worker_aux(opt, must_exit).await { warn!("Repair worker failed with error: {}", e); diff --git a/src/garage/server.rs b/src/garage/server.rs index 7aa6185f..6321357a 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -2,8 +2,6 @@ use std::path::PathBuf; use tokio::sync::watch; -use garage_db as db; - use garage_util::background::*; use garage_util::config::*; use garage_util::error::Error; @@ -29,57 +27,14 @@ async fn wait_from(mut chan: watch::Receiver<bool>) { pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Loading configuration..."); - let config = read_config(config_file).expect("Unable to read config file"); - - info!("Opening database..."); - let mut db_path = config.metadata_dir.clone(); - std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory"); - let db = match config.db_engine.as_str() { - "sled" => { - db_path.push("db"); - info!("Opening Sled database at: {}", db_path.display()); - let db = db::sled_adapter::sled::Config::default() - .path(&db_path) - .cache_capacity(config.sled_cache_capacity) - .flush_every_ms(Some(config.sled_flush_every_ms)) - .open() - .expect("Unable to open sled DB"); - db::sled_adapter::SledDb::init(db) - } - "sqlite" | "sqlite3" | "rusqlite" => { - db_path.push("db.sqlite"); - info!("Opening Sqlite database at: {}", db_path.display()); - let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) - .expect("Unable to open sqlite DB"); - db::sqlite_adapter::SqliteDb::init(db) - } - "lmdb" | "heed" => { - db_path.push("db.lmdb"); - info!("Opening LMDB database at: {}", db_path.display()); - std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); - let map_size = garage_db::lmdb_adapter::recommended_map_size(); - - let db = db::lmdb_adapter::heed::EnvOpenOptions::new() - .max_dbs(100) - .map_size(map_size) - .open(&db_path) - .expect("Unable to open LMDB DB"); - db::lmdb_adapter::LmdbDb::init(db) - } - e => { - return Err(Error::Message(format!( - "Unsupported DB engine: {} (options: sled, sqlite, lmdb)", - e - ))); - } - }; + let config = read_config(config_file)?; info!("Initializing background runner..."); let watch_cancel = netapp::util::watch_ctrl_c(); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); info!("Initializing Garage main data store..."); - let garage = Garage::new(config.clone(), db, background); + let garage = Garage::new(config.clone(), background)?; info!("Initialize tracing..."); if let Some(export_to) = config.admin.trace_sink { @@ -89,6 +44,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initialize Admin API server and metrics collector..."); let admin_server = AdminApiServer::new(garage.clone()); + info!("Launching internal Garage cluster communications..."); let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Create admin RPC handler..."); diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 7c7b9f30..130eb6a6 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use garage_table::crdt::Crdt; +use garage_table::crdt::*; use garage_table::*; use garage_util::data::*; use garage_util::time::*; @@ -44,6 +44,9 @@ pub struct BucketParams { pub website_config: crdt::Lww<Option<WebsiteConfig>>, /// CORS rules pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>, + /// Bucket quotas + #[serde(default)] + pub quotas: crdt::Lww<BucketQuotas>, } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] @@ -62,6 +65,18 @@ pub struct CorsRule { pub expose_headers: Vec<String>, } +#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct BucketQuotas { + /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket) + pub max_size: Option<u64>, + /// Maximum number of non-deleted objects in the bucket + pub max_objects: Option<u64>, +} + +impl AutoCrdt for BucketQuotas { + const WARN_IF_DIFFERENT: bool = true; +} + impl BucketParams { /// Create an empty BucketParams with no authorized keys and no website accesss pub fn new() -> Self { @@ -72,6 +87,7 @@ impl BucketParams { local_aliases: crdt::LwwMap::new(), website_config: crdt::Lww::new(None), cors_config: crdt::Lww::new(None), + quotas: crdt::Lww::new(BucketQuotas::default()), } } } @@ -86,6 +102,7 @@ impl Crdt for BucketParams { self.website_config.merge(&o.website_config); self.cors_config.merge(&o.cors_config); + self.quotas.merge(&o.quotas); } } diff --git a/src/model/garage.rs b/src/model/garage.rs index 280f3dc7..15769a17 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -6,6 +6,7 @@ use garage_db as db; use garage_util::background::*; use garage_util::config::*; +use garage_util::error::Error; use garage_rpc::system::System; @@ -22,12 +23,11 @@ use crate::s3::version_table::*; use crate::bucket_alias_table::*; use crate::bucket_table::*; use crate::helper; +use crate::index_counter::*; use crate::key_table::*; #[cfg(feature = "k2v")] -use crate::index_counter::*; -#[cfg(feature = "k2v")] -use crate::k2v::{counter_table::*, item_table::*, poll::*, rpc::*}; +use crate::k2v::{item_table::*, poll::*, rpc::*}; /// An entire Garage full of data pub struct Garage { @@ -52,6 +52,8 @@ pub struct Garage { /// Table containing S3 objects pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>, + /// Counting table containing object counters + pub object_counter_table: Arc<IndexCounter<Object>>, /// Table containing S3 object versions pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, /// Table containing S3 block references (not blocks themselves) @@ -66,14 +68,57 @@ pub struct GarageK2V { /// Table containing K2V items pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, /// Indexing table containing K2V item counters - pub counter_table: Arc<IndexCounter<K2VCounterTable>>, + pub counter_table: Arc<IndexCounter<K2VItem>>, /// K2V RPC handler pub rpc: Arc<K2VRpcHandler>, } impl Garage { /// Create and run garage - pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> { + pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> { + info!("Opening database..."); + let mut db_path = config.metadata_dir.clone(); + std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory"); + let db = match config.db_engine.as_str() { + "sled" => { + db_path.push("db"); + info!("Opening Sled database at: {}", db_path.display()); + let db = db::sled_adapter::sled::Config::default() + .path(&db_path) + .cache_capacity(config.sled_cache_capacity) + .flush_every_ms(Some(config.sled_flush_every_ms)) + .open() + .expect("Unable to open sled DB"); + db::sled_adapter::SledDb::init(db) + } + "sqlite" | "sqlite3" | "rusqlite" => { + db_path.push("db.sqlite"); + info!("Opening Sqlite database at: {}", db_path.display()); + let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) + .expect("Unable to open sqlite DB"); + db::sqlite_adapter::SqliteDb::init(db) + } + "lmdb" | "heed" => { + db_path.push("db.lmdb"); + info!("Opening LMDB database at: {}", db_path.display()); + std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); + let map_size = garage_db::lmdb_adapter::recommended_map_size(); + + let db = db::lmdb_adapter::heed::EnvOpenOptions::new() + .max_dbs(100) + .map_size(map_size) + .open(&db_path) + .expect("Unable to open LMDB DB"); + db::lmdb_adapter::LmdbDb::init(db) + } + e => { + return Err(Error::Message(format!( + "Unsupported DB engine: {} (options: sled, sqlite, lmdb)", + e + ))); + } + }; + let network_key = NetworkKey::from_slice( &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], ) @@ -155,12 +200,16 @@ impl Garage { &db, ); + info!("Initialize object counter table..."); + let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); + info!("Initialize object_table..."); #[allow(clippy::redundant_clone)] let object_table = Table::new( ObjectTable { background: background.clone(), version_table: version_table.clone(), + object_counter_table: object_counter_table.clone(), }, meta_rep_param.clone(), system.clone(), @@ -171,9 +220,8 @@ impl Garage { #[cfg(feature = "k2v")] let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); - info!("Initialize Garage..."); - - Arc::new(Self { + // -- done -- + Ok(Arc::new(Self { config, db, background, @@ -183,11 +231,12 @@ impl Garage { bucket_alias_table, key_table, object_table, + object_counter_table, version_table, block_ref_table, #[cfg(feature = "k2v")] k2v, - }) + })) } pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 2602d5d9..36e8172b 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -1,3 +1,4 @@ +use core::ops::Bound; use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; @@ -12,30 +13,36 @@ use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_table::crdt::*; -use garage_table::replication::TableShardedReplication; +use garage_table::replication::*; use garage_table::*; -pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static { - const NAME: &'static str; - type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; - type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; +pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { + const COUNTER_TABLE_NAME: &'static str; + + type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + + fn counter_partition_key(&self) -> &Self::CP; + fn counter_sort_key(&self) -> &Self::CS; + fn counts(&self) -> Vec<(&'static str, i64)>; } /// A counter entry in the global table -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct CounterEntry<T: CounterSchema> { - pub pk: T::P, - pub sk: T::S, +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct CounterEntry<T: CountedItem> { + pub pk: T::CP, + pub sk: T::CS, pub values: BTreeMap<String, CounterValue>, } -impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> { - fn partition_key(&self) -> &T::P { +impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { + fn partition_key(&self) -> &T::CP { &self.pk } - fn sort_key(&self) -> &T::S { + fn sort_key(&self) -> &T::CS { &self.sk } fn is_tombstone(&self) -> bool { @@ -45,7 +52,7 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> { } } -impl<T: CounterSchema> CounterEntry<T> { +impl<T: CountedItem> CounterEntry<T> { pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> { let nodes = &ring.layout.node_id_vec[..]; self.filtered_values_with_nodes(nodes) @@ -78,7 +85,7 @@ pub struct CounterValue { pub node_values: BTreeMap<Uuid, (u64, i64)>, } -impl<T: CounterSchema> Crdt for CounterEntry<T> { +impl<T: CountedItem> Crdt for CounterEntry<T> { fn merge(&mut self, other: &Self) { for (name, e2) in other.values.iter() { if let Some(e) = self.values.get_mut(name) { @@ -104,15 +111,15 @@ impl Crdt for CounterValue { } } -pub struct CounterTable<T: CounterSchema> { +pub struct CounterTable<T: CountedItem> { _phantom_t: PhantomData<T>, } -impl<T: CounterSchema> TableSchema for CounterTable<T> { - const TABLE_NAME: &'static str = T::NAME; +impl<T: CountedItem> TableSchema for CounterTable<T> { + const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME; - type P = T::P; - type S = T::S; + type P = T::CP; + type S = T::CS; type E = CounterEntry<T>; type Filter = (DeletedFilter, Vec<Uuid>); @@ -131,14 +138,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { // ---- -pub struct IndexCounter<T: CounterSchema> { +pub struct IndexCounter<T: CountedItem> { this_node: Uuid, local_counter: db::Tree, - propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, + propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } -impl<T: CounterSchema> IndexCounter<T> { +impl<T: CountedItem> IndexCounter<T> { pub fn new( system: Arc<System>, replication: TableShardedReplication, @@ -151,7 +158,7 @@ impl<T: CounterSchema> IndexCounter<T> { let this = Arc::new(Self { this_node: system.id, local_counter: db - .open_tree(format!("local_counter:{}", T::NAME)) + .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME)) .expect("Unable to open local counter tree"), propagate_tx, table: Table::new( @@ -166,7 +173,7 @@ impl<T: CounterSchema> IndexCounter<T> { let this2 = this.clone(); background.spawn_worker( - format!("{} index counter propagator", T::NAME), + format!("{} index counter propagator", T::COUNTER_TABLE_NAME), move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit), ); this @@ -175,24 +182,45 @@ impl<T: CounterSchema> IndexCounter<T> { pub fn count( &self, tx: &mut db::Transaction, - pk: &T::P, - sk: &T::S, - counts: &[(&str, i64)], + old: Option<&T>, + new: Option<&T>, ) -> db::TxResult<(), Error> { + let pk = old + .map(|e| e.counter_partition_key()) + .unwrap_or_else(|| new.unwrap().counter_partition_key()); + let sk = old + .map(|e| e.counter_sort_key()) + .unwrap_or_else(|| new.unwrap().counter_sort_key()); + + // calculate counter differences + let mut counts = HashMap::new(); + for (k, v) in old.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) -= v; + } + for (k, v) in new.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) += v; + } + + // update local counter table let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { - Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(db::TxError::Abort)?, + Some(old_bytes) => { + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes) + .map_err(Error::RmpDecode) + .map_err(db::TxError::Abort)? + } None => LocalCounterEntry { + pk: pk.clone(), + sk: sk.clone(), values: BTreeMap::new(), }, }; + let now = now_msec(); for (s, inc) in counts.iter() { let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); - ent.0 += 1; + ent.0 = std::cmp::max(ent.0 + 1, now); ent.1 += *inc; } @@ -213,7 +241,7 @@ impl<T: CounterSchema> IndexCounter<T> { async fn propagate_loop( self: Arc<Self>, - mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>, + mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>, must_exit: watch::Receiver<bool>, ) { // This loop batches updates to counters to be sent all at once. @@ -236,7 +264,7 @@ impl<T: CounterSchema> IndexCounter<T> { if let Some((pk, sk, counters)) = ent { let tree_key = self.table.data.tree_key(&pk, &sk); - let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk); + let dist_entry = counters.into_counter_entry(self.this_node); match buf.entry(tree_key) { hash_map::Entry::Vacant(e) => { e.insert(dist_entry); @@ -255,10 +283,10 @@ impl<T: CounterSchema> IndexCounter<T> { if let Err(e) = self.table.insert_many(entries).await { errors += 1; if errors >= 2 && *must_exit.borrow() { - error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e); + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e); break; } - warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors); + warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors); tokio::time::sleep(Duration::from_secs(5)).await; continue; } @@ -272,23 +300,155 @@ impl<T: CounterSchema> IndexCounter<T> { } } } + + pub fn offline_recount_all<TS, TR>( + &self, + counted_table: &Arc<Table<TS, TR>>, + ) -> Result<(), Error> + where + TS: TableSchema<E = T>, + TR: TableReplication, + { + let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> { + let entry_k = self + .table + .data + .tree_key(entry.partition_key(), entry.sort_key()); + self.table + .data + .update_entry_with(&entry_k, |ent| match ent { + Some(mut ent) => { + ent.merge(&entry); + ent + } + None => entry.clone(), + })?; + Ok(()) + }; + + // 1. Set all old local counters to zero + let now = now_msec(); + let mut next_start: Option<Vec<u8>> = None; + loop { + let low_bound = match next_start.take() { + Some(v) => Bound::Excluded(v), + None => Bound::Unbounded, + }; + let mut batch = vec![]; + for item in self.local_counter.range((low_bound, Bound::Unbounded))? { + batch.push(item?); + if batch.len() > 1000 { + break; + } + } + + if batch.is_empty() { + break; + } + + info!("zeroing old counters... ({})", hex::encode(&batch[0].0)); + for (local_counter_k, local_counter) in batch { + let mut local_counter = + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?; + + for (_, tv) in local_counter.values.iter_mut() { + tv.0 = std::cmp::max(tv.0 + 1, now); + tv.1 = 0; + } + + let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + self.local_counter + .insert(&local_counter_k, &local_counter_bytes)?; + + let counter_entry = local_counter.into_counter_entry(self.this_node); + save_counter_entry(counter_entry)?; + + next_start = Some(local_counter_k); + } + } + + // 2. Recount all table entries + let now = now_msec(); + let mut next_start: Option<Vec<u8>> = None; + loop { + let low_bound = match next_start.take() { + Some(v) => Bound::Excluded(v), + None => Bound::Unbounded, + }; + let mut batch = vec![]; + for item in counted_table + .data + .store + .range((low_bound, Bound::Unbounded))? + { + batch.push(item?); + if batch.len() > 1000 { + break; + } + } + + if batch.is_empty() { + break; + } + + info!("counting entries... ({})", hex::encode(&batch[0].0)); + for (counted_entry_k, counted_entry) in batch { + let counted_entry = counted_table.data.decode_entry(&counted_entry)?; + + let pk = counted_entry.counter_partition_key(); + let sk = counted_entry.counter_sort_key(); + let counts = counted_entry.counts(); + + let local_counter_key = self.table.data.tree_key(pk, sk); + let mut local_counter = match self.local_counter.get(&local_counter_key)? { + Some(old_bytes) => { + let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>( + &old_bytes, + )?; + assert!(ent.pk == *pk); + assert!(ent.sk == *sk); + ent + } + None => LocalCounterEntry { + pk: pk.clone(), + sk: sk.clone(), + values: BTreeMap::new(), + }, + }; + for (s, v) in counts.iter() { + let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); + tv.0 = std::cmp::max(tv.0 + 1, now); + tv.1 += v; + } + + let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + self.local_counter + .insert(&local_counter_key, local_counter_bytes)?; + + let counter_entry = local_counter.into_counter_entry(self.this_node); + save_counter_entry(counter_entry)?; + + next_start = Some(counted_entry_k); + } + } + + // Done + Ok(()) + } } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -struct LocalCounterEntry { +struct LocalCounterEntry<T: CountedItem> { + pk: T::CP, + sk: T::CS, values: BTreeMap<String, (u64, i64)>, } -impl LocalCounterEntry { - fn into_counter_entry<T: CounterSchema>( - self, - this_node: Uuid, - pk: T::P, - sk: T::S, - ) -> CounterEntry<T> { +impl<T: CountedItem> LocalCounterEntry<T> { + fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> { CounterEntry { - pk, - sk, + pk: self.pk, + sk: self.sk, values: self .values .into_iter() diff --git a/src/model/k2v/counter_table.rs b/src/model/k2v/counter_table.rs deleted file mode 100644 index 4856eb2b..00000000 --- a/src/model/k2v/counter_table.rs +++ /dev/null @@ -1,20 +0,0 @@ -use garage_util::data::*; - -use crate::index_counter::*; - -pub const ENTRIES: &str = "entries"; -pub const CONFLICTS: &str = "conflicts"; -pub const VALUES: &str = "values"; -pub const BYTES: &str = "bytes"; - -#[derive(PartialEq, Clone)] -pub struct K2VCounterTable; - -impl CounterSchema for K2VCounterTable { - const NAME: &'static str = "k2v_index_counter"; - - // Partition key = bucket id - type P = Uuid; - // Sort key = K2V item's partition key - type S = String; -} diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 991fe66d..baa1db4b 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -10,9 +10,13 @@ use garage_table::*; use crate::index_counter::*; use crate::k2v::causality::*; -use crate::k2v::counter_table::*; use crate::k2v::poll::*; +pub const ENTRIES: &str = "entries"; +pub const CONFLICTS: &str = "conflicts"; +pub const VALUES: &str = "values"; +pub const BYTES: &str = "bytes"; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct K2VItem { pub partition: K2VItemPartition, @@ -112,27 +116,6 @@ impl K2VItem { ent.discard(); } } - - // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used) - fn stats(&self) -> (i64, i64, i64, i64) { - let values = self.values(); - - let n_entries = if self.is_tombstone() { 0 } else { 1 }; - let n_conflicts = if values.len() > 1 { 1 } else { 0 }; - let n_values = values - .iter() - .filter(|v| matches!(v, DvvsValue::Value(_))) - .count() as i64; - let n_bytes = values - .iter() - .map(|v| match v { - DvvsValue::Deleted => 0, - DvvsValue::Value(v) => v.len() as i64, - }) - .sum(); - - (n_entries, n_conflicts, n_values, n_bytes) - } } impl DvvsEntry { @@ -204,7 +187,7 @@ impl Entry<K2VItemPartition, String> for K2VItem { } pub struct K2VItemTable { - pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>, + pub(crate) counter_table: Arc<IndexCounter<K2VItem>>, pub(crate) subscriptions: Arc<SubscriptionManager>, } @@ -229,40 +212,14 @@ impl TableSchema for K2VItemTable { new: Option<&Self::E>, ) -> db::TxOpResult<()> { // 1. Count - let (old_entries, old_conflicts, old_values, old_bytes) = match old { - None => (0, 0, 0, 0), - Some(e) => e.stats(), - }; - let (new_entries, new_conflicts, new_values, new_bytes) = match new { - None => (0, 0, 0, 0), - Some(e) => e.stats(), - }; - - let count_pk = old - .map(|e| e.partition.bucket_id) - .unwrap_or_else(|| new.unwrap().partition.bucket_id); - let count_sk = old - .map(|e| &e.partition.partition_key) - .unwrap_or_else(|| &new.unwrap().partition.partition_key); - - let counter_res = self.counter_table.count( - tx, - &count_pk, - count_sk, - &[ - (ENTRIES, new_entries - old_entries), - (CONFLICTS, new_conflicts - old_conflicts), - (VALUES, new_values - old_values), - (BYTES, new_bytes - old_bytes), - ], - ); + let counter_res = self.counter_table.count(tx, old, new); if let Err(e) = db::unabort(counter_res)? { // This result can be returned by `counter_table.count()` for instance // if messagepack serialization or deserialization fails at some step. // Warn admin but ignore this error for now, that's all we can do. error!( - "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!", - count_pk, count_sk, e + "Unable to update K2V item counter: {}. Index values will be wrong!", + e ); } @@ -282,6 +239,47 @@ impl TableSchema for K2VItemTable { } } +impl CountedItem for K2VItem { + const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter_v2"; + + // Partition key = bucket id + type CP = Uuid; + // Sort key = K2V item's partition key + type CS = String; + + fn counter_partition_key(&self) -> &Uuid { + &self.partition.bucket_id + } + fn counter_sort_key(&self) -> &String { + &self.partition.partition_key + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let values = self.values(); + + let n_entries = if self.is_tombstone() { 0 } else { 1 }; + let n_conflicts = if values.len() > 1 { 1 } else { 0 }; + let n_values = values + .iter() + .filter(|v| matches!(v, DvvsValue::Value(_))) + .count() as i64; + let n_bytes = values + .iter() + .map(|v| match v { + DvvsValue::Deleted => 0, + DvvsValue::Value(v) => v.len() as i64, + }) + .sum(); + + vec![ + (ENTRIES, n_entries), + (CONFLICTS, n_conflicts), + (VALUES, n_values), + (BYTES, n_bytes), + ] + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index 664172a6..f6a96151 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,6 +1,5 @@ pub mod causality; -pub mod counter_table; pub mod item_table; pub mod poll; diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 25acb4b0..5fc67069 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -77,6 +77,7 @@ impl Migrate { local_aliases: LwwMap::new(), website_config: Lww::new(website), cors_config: Lww::new(None), + quotas: Lww::new(Default::default()), }), }) .await?; diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 62f5d8d9..a3914c36 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -11,10 +11,15 @@ use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; +use crate::index_counter::*; use crate::s3::version_table::*; use garage_model_050::object_table as old; +pub const OBJECTS: &str = "objects"; +pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; +pub const BYTES: &str = "bytes"; + /// An object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { @@ -218,6 +223,7 @@ impl Crdt for Object { pub struct ObjectTable { pub background: Arc<BackgroundRunner>, pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, + pub object_counter_table: Arc<IndexCounter<Object>>, } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -236,10 +242,20 @@ impl TableSchema for ObjectTable { fn updated( &self, - _tx: &mut db::Transaction, + tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.object_counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? { + error!( + "Unable to update object counter: {}. Index values will be wrong!", + e + ); + } + + // 2. Spawn threads that propagates deletions to version table let version_table = self.version_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -283,6 +299,49 @@ impl TableSchema for ObjectTable { } } +impl CountedItem for Object { + const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter"; + + // Partition key = bucket id + type CP = Uuid; + // Sort key = nothing + type CS = EmptyKey; + + fn counter_partition_key(&self) -> &Uuid { + &self.bucket_id + } + fn counter_sort_key(&self) -> &EmptyKey { + &EmptyKey + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let versions = self.versions(); + let n_objects = if versions.iter().any(|v| v.is_data()) { + 1 + } else { + 0 + }; + let n_unfinished_uploads = versions + .iter() + .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_))) + .count(); + let n_bytes = versions + .iter() + .map(|v| match &v.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) + | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size, + _ => 0, + }) + .sum::<u64>(); + + vec![ + (OBJECTS, n_objects), + (UNFINISHED_UPLOADS, n_unfinished_uploads as i64), + (BYTES, n_bytes as i64), + ] + } +} + // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv // (we just want to change bucket into bucket_id by hashing it) |