diff options
-rw-r--r-- | src/api/s3_copy.rs | 10 | ||||
-rw-r--r-- | src/api/s3_delete.rs | 10 | ||||
-rw-r--r-- | src/api/s3_get.rs | 53 | ||||
-rw-r--r-- | src/api/s3_list.rs | 14 | ||||
-rw-r--r-- | src/api/s3_put.rs | 95 | ||||
-rw-r--r-- | src/model/object_table.rs | 177 | ||||
-rw-r--r-- | src/table/lib.rs | 2 | ||||
-rw-r--r-- | src/table/schema.rs | 11 | ||||
-rw-r--r-- | src/table/table.rs | 23 |
9 files changed, 209 insertions, 186 deletions
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index b54701e2..db790d95 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -39,16 +39,16 @@ pub async fn handle_copy( Some(v) => v, None => return Err(Error::NotFound), }; - let source_last_state = match &source_last_v.state { - ObjectVersionState::Complete(x) => x, - _ => unreachable!(), - }; + let source_last_state = match &source_last_v.state { + ObjectVersionState::Complete(x) => x, + _ => unreachable!(), + }; let new_uuid = gen_uuid(); let dest_object_version = ObjectVersion { uuid: new_uuid, timestamp: now_msec(), - state: ObjectVersionState::Complete(source_last_state.clone()), + state: ObjectVersionState::Complete(source_last_state.clone()), }; match &source_last_state { diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index c5cd5970..42216f51 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -28,12 +28,10 @@ async fn handle_delete_internal( Some(o) => o, }; - let interesting_versions = object.versions().iter().filter(|v| { - match v.state { - ObjectVersionState::Aborted => false, - ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, - _ => true, - } + let interesting_versions = object.versions().iter().filter(|v| match v.state { + ObjectVersionState::Aborted => false, + ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, + _ => true, }); let mut must_delete = None; diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index 25b3d3e3..a3a20d49 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -12,12 +12,19 @@ use garage_table::EmptyKey; use garage_model::garage::Garage; use garage_model::object_table::*; -fn object_headers(version: &ObjectVersion, version_meta: &ObjectVersionMeta) -> http::response::Builder { +fn object_headers( + version: &ObjectVersion, + version_meta: &ObjectVersionMeta, +) -> http::response::Builder { let date = UNIX_EPOCH + Duration::from_millis(version.timestamp); let date_str = httpdate::fmt_http_date(date); Response::builder() - .header("Content-Type", version_meta.headers.content_type.to_string()) + .header( + "Content-Type", + version_meta.headers.content_type.to_string(), + ) + // TODO: other headers .header("Content-Length", format!("{}", version_meta.size)) .header("ETag", version_meta.etag.to_string()) .header("Last-Modified", date_str) @@ -48,11 +55,11 @@ pub async fn handle_head( Some(v) => v, None => return Err(Error::NotFound), }; - let version_meta = match &version.state { - ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, - ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta, - _ => unreachable!(), - }; + let version_meta = match &version.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, + ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta, + _ => unreachable!(), + }; let body: Body = Body::from(vec![]); let response = object_headers(&version, version_meta) @@ -87,15 +94,15 @@ pub async fn handle_get( Some(v) => v, None => return Err(Error::NotFound), }; - let last_v_data = match &last_v.state { - ObjectVersionState::Complete(x) => x, - _ => unreachable!(), - }; - let last_v_meta = match last_v_data { - ObjectVersionData::DeleteMarker => return Err(Error::NotFound), - ObjectVersionData::Inline(meta, _) => meta, - ObjectVersionData::FirstBlock(meta, _) => meta, - }; + let last_v_data = match &last_v.state { + ObjectVersionState::Complete(x) => x, + _ => unreachable!(), + }; + let last_v_meta = match last_v_data { + ObjectVersionData::DeleteMarker => return Err(Error::NotFound), + ObjectVersionData::Inline(meta, _) => meta, + ObjectVersionData::FirstBlock(meta, _) => meta, + }; let range = match req.headers().get("range") { Some(range) => { @@ -113,7 +120,15 @@ pub async fn handle_get( None => None, }; if let Some(range) = range { - return handle_get_range(garage, last_v, last_v_data, last_v_meta, range.start, range.start + range.length).await; + return handle_get_range( + garage, + last_v, + last_v_data, + last_v_meta, + range.start, + range.start + range.length, + ) + .await; } let resp_builder = object_headers(&last_v, last_v_meta).status(StatusCode::OK); @@ -167,8 +182,8 @@ pub async fn handle_get( pub async fn handle_get_range( garage: Arc<Garage>, version: &ObjectVersion, - version_data: &ObjectVersionData, - version_meta: &ObjectVersionMeta, + version_data: &ObjectVersionData, + version_meta: &ObjectVersionMeta, begin: u64, end: u64, ) -> Result<Response<Body>, Error> { diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 3fca1348..0a3b62ec 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -74,11 +74,15 @@ pub async fn handle_list( if let Some(pfx) = common_prefix { result_common_prefixes.insert(pfx.to_string()); } else { - let size = match &version.state { - ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta.size, - ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size, - _ => unreachable!(), - }; + let size = match &version.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => { + meta.size + } + ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => { + meta.size + } + _ => unreachable!(), + }; let info = match result_keys.get(&object.key) { None => ListResultInfo { last_modified: version.timestamp, diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 75622168..9ac7dafd 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,4 +1,4 @@ -use std::collections::{VecDeque, BTreeMap}; +use std::collections::{BTreeMap, VecDeque}; use std::fmt::Write; use std::sync::Arc; @@ -24,10 +24,10 @@ pub async fn handle_put( key: &str, ) -> Result<Response<Body>, Error> { let version_uuid = gen_uuid(); - let headers = ObjectVersionHeaders{ - content_type: get_mime_type(&req)?, - other: BTreeMap::new(), // TODO - }; + let headers = ObjectVersionHeaders { + content_type: get_mime_type(&req)?, + other: BTreeMap::new(), // TODO + }; let body = req.into_body(); @@ -44,13 +44,14 @@ pub async fn handle_put( }; if first_block.len() < INLINE_THRESHOLD { - object_version.state = ObjectVersionState::Complete(ObjectVersionData::Inline( - ObjectVersionMeta{ - headers, - size: first_block.len() as u64, - etag: "".to_string(), // TODO - }, - first_block)); + object_version.state = ObjectVersionState::Complete(ObjectVersionData::Inline( + ObjectVersionMeta { + headers, + size: first_block.len() as u64, + etag: "".to_string(), // TODO + }, + first_block, + )); let object = Object::new(bucket.into(), key.into(), vec![object_version]); garage.object_table.insert(&object).await?; @@ -76,12 +77,13 @@ pub async fn handle_put( // TODO: if at any step we have an error, we should undo everything we did object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( - ObjectVersionMeta{ - headers, - size: total_size, - etag: "".to_string(), // TODO - }, - first_block_hash)); + ObjectVersionMeta { + headers, + size: total_size, + etag: "".to_string(), // TODO + }, + first_block_hash, + )); let object = Object::new(bucket.into(), key.into(), vec![object_version]); garage.object_table.insert(&object).await?; @@ -207,7 +209,7 @@ impl BodyChunker { pub fn put_response(version_uuid: UUID) -> Response<Body> { Response::builder() .header("x-amz-version-id", hex::encode(version_uuid)) - // TODO ETag + // TODO ETag .body(Body::from(vec![])) .unwrap() } @@ -219,10 +221,10 @@ pub async fn handle_create_multipart_upload( key: &str, ) -> Result<Response<Body>, Error> { let version_uuid = gen_uuid(); - let headers = ObjectVersionHeaders{ - content_type: get_mime_type(&req)?, - other: BTreeMap::new(), // TODO - }; + let headers = ObjectVersionHeaders { + content_type: get_mime_type(&req)?, + other: BTreeMap::new(), // TODO + }; let object_version = ObjectVersion { uuid: version_uuid, @@ -286,9 +288,11 @@ pub async fn handle_put_part( None => return Err(Error::BadRequest(format!("Object not found"))), Some(x) => x, }; - if !object.versions().iter().any(|v| { - v.uuid == version_uuid && v.is_uploading() - }) { + if !object + .versions() + .iter() + .any(|v| v.uuid == version_uuid && v.is_uploading()) + { return Err(Error::BadRequest(format!( "Multipart upload does not exist or is otherwise invalid" ))); @@ -330,9 +334,10 @@ pub async fn handle_complete_multipart_upload( None => return Err(Error::BadRequest(format!("Object not found"))), Some(x) => x, }; - let object_version = object.versions().iter().find(|v| { - v.uuid == version_uuid && v.is_uploading() - }); + let object_version = object + .versions() + .iter() + .find(|v| v.uuid == version_uuid && v.is_uploading()); let mut object_version = match object_version { None => { return Err(Error::BadRequest(format!( @@ -348,10 +353,10 @@ pub async fn handle_complete_multipart_upload( if version.blocks().len() == 0 { return Err(Error::BadRequest(format!("No data was uploaded"))); } - let headers = match object_version.state { - ObjectVersionState::Uploading(headers) => headers.clone(), - _ => unreachable!(), - }; + let headers = match object_version.state { + ObjectVersionState::Uploading(headers) => headers.clone(), + _ => unreachable!(), + }; // TODO: check that all the parts that they pretend they gave us are indeed there // TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere... @@ -361,15 +366,14 @@ pub async fn handle_complete_multipart_upload( .iter() .map(|x| x.size) .fold(0, |x, y| x + y); - object_version.state = ObjectVersionState::Complete( - ObjectVersionData::FirstBlock( - ObjectVersionMeta{ - headers, - size: total_size, - etag: "".to_string(),// TODO - }, - version.blocks()[0].hash) - ); + object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( + ObjectVersionMeta { + headers, + size: total_size, + etag: "".to_string(), // TODO + }, + version.blocks()[0].hash, + )); let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; @@ -411,9 +415,10 @@ pub async fn handle_abort_multipart_upload( None => return Err(Error::BadRequest(format!("Object not found"))), Some(x) => x, }; - let object_version = object.versions().iter().find(|v| { - v.uuid == version_uuid && v.is_uploading() - }); + let object_version = object + .versions() + .iter() + .find(|v| v.uuid == version_uuid && v.is_uploading()); let mut object_version = match object_version { None => { return Err(Error::BadRequest(format!( diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 039e985e..719a222c 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use std::sync::Arc; use std::collections::BTreeMap; +use std::sync::Arc; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -75,23 +75,21 @@ pub enum ObjectVersionState { impl ObjectVersionState { fn merge(&mut self, other: &Self) { use ObjectVersionState::*; - match other { - Aborted => { - *self = Aborted; - } - Complete(b) => { - match self { - Aborted => {}, - Complete(a) => { - a.merge(b); - } - Uploading(_) => { - *self = Complete(b.clone()); - } - } - } - Uploading(_) => {} - } + match other { + Aborted => { + *self = Aborted; + } + Complete(b) => match self { + Aborted => {} + Complete(a) => { + a.merge(b); + } + Uploading(_) => { + *self = Complete(b.clone()); + } + }, + Uploading(_) => {} + } } } @@ -104,47 +102,50 @@ pub enum ObjectVersionData { #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionMeta { - pub headers: ObjectVersionHeaders, + pub headers: ObjectVersionHeaders, pub size: u64, - pub etag: String, + pub etag: String, } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionHeaders { - pub content_type: String, - pub other: BTreeMap<String, String>, + pub content_type: String, + pub other: BTreeMap<String, String>, } impl ObjectVersionData { - fn merge(&mut self, b: &Self) { - if *self != *b { - warn!("Inconsistent object version data: {:?} (local) vs {:?} (remote)", self, b); - } - } + fn merge(&mut self, b: &Self) { + if *self != *b { + warn!( + "Inconsistent object version data: {:?} (local) vs {:?} (remote)", + self, b + ); + } + } } impl ObjectVersion { fn cmp_key(&self) -> (u64, UUID) { (self.timestamp, self.uuid) } - pub fn is_uploading(&self) -> bool { - match self.state { - ObjectVersionState::Uploading(_) => true, - _ => false, - } - } + pub fn is_uploading(&self) -> bool { + match self.state { + ObjectVersionState::Uploading(_) => true, + _ => false, + } + } pub fn is_complete(&self) -> bool { - match self.state { - ObjectVersionState::Complete(_) => true, - _ => false, - } + match self.state { + ObjectVersionState::Complete(_) => true, + _ => false, + } } pub fn is_data(&self) -> bool { - match self.state { - ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, - ObjectVersionState::Complete(_) => true, - _ => false, - } + match self.state { + ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, + ObjectVersionState::Complete(_) => true, + _ => false, + } } } @@ -163,7 +164,7 @@ impl Entry<String, String> for Object { .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) { Ok(i) => { - self.versions[i].state.merge(&other_v.state); + self.versions[i].state.merge(&other_v.state); } Err(i) => { self.versions.insert(i, other_v.clone()); @@ -231,50 +232,54 @@ impl TableSchema for ObjectTable { entry.versions.iter().any(|v| v.is_data()) } - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old = match rmp_serde::decode::from_read_ref::<_, prev::Object>(bytes) { - Ok(x) => x, - Err(_) => return None, - }; - let new_v = old.versions().iter() - .map(migrate_version) - .collect::<Vec<_>>(); - let new = Object::new(old.bucket.clone(), old.key.clone(), new_v); - Some(new) - } + fn try_migrate(bytes: &[u8]) -> Option<Self::E> { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Object>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + let new_v = old + .versions() + .iter() + .map(migrate_version) + .collect::<Vec<_>>(); + let new = Object::new(old.bucket.clone(), old.key.clone(), new_v); + Some(new) + } } fn migrate_version(old: &prev::ObjectVersion) -> ObjectVersion { - let headers = ObjectVersionHeaders{ - content_type: old.mime_type.clone(), - other: BTreeMap::new(), - }; - let meta = ObjectVersionMeta{ - headers: headers.clone(), - size: old.size, - etag: "".to_string(), - }; - let state = match old.state { - prev::ObjectVersionState::Uploading => ObjectVersionState::Uploading(headers), - prev::ObjectVersionState::Aborted => ObjectVersionState::Aborted, - prev::ObjectVersionState::Complete => { - match &old.data { - prev::ObjectVersionData::Uploading => ObjectVersionState::Uploading(headers), - prev::ObjectVersionData::DeleteMarker => ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), - prev::ObjectVersionData::Inline(x) => ObjectVersionState::Complete(ObjectVersionData::Inline(meta, x.clone())), - prev::ObjectVersionData::FirstBlock(h) => { - let mut hash = [0u8; 32]; - hash.copy_from_slice(h.as_ref()); - ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, Hash::from(hash))) - } - } - } - }; - let mut uuid = [0u8; 32]; - uuid.copy_from_slice(old.uuid.as_ref()); - ObjectVersion{ - uuid: UUID::from(uuid), - timestamp: old.timestamp, - state, - } + let headers = ObjectVersionHeaders { + content_type: old.mime_type.clone(), + other: BTreeMap::new(), + }; + let meta = ObjectVersionMeta { + headers: headers.clone(), + size: old.size, + etag: "".to_string(), + }; + let state = match old.state { + prev::ObjectVersionState::Uploading => ObjectVersionState::Uploading(headers), + prev::ObjectVersionState::Aborted => ObjectVersionState::Aborted, + prev::ObjectVersionState::Complete => match &old.data { + prev::ObjectVersionData::Uploading => ObjectVersionState::Uploading(headers), + prev::ObjectVersionData::DeleteMarker => { + ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) + } + prev::ObjectVersionData::Inline(x) => { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, x.clone())) + } + prev::ObjectVersionData::FirstBlock(h) => { + let mut hash = [0u8; 32]; + hash.copy_from_slice(h.as_ref()); + ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, Hash::from(hash))) + } + }, + }; + let mut uuid = [0u8; 32]; + uuid.copy_from_slice(old.uuid.as_ref()); + ObjectVersion { + uuid: UUID::from(uuid), + timestamp: old.timestamp, + state, + } } diff --git a/src/table/lib.rs b/src/table/lib.rs index e30a6665..ac129146 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -9,5 +9,5 @@ pub mod table_fullcopy; pub mod table_sharded; pub mod table_sync; -pub use table::*; pub use schema::*; +pub use table::*; diff --git a/src/table/schema.rs b/src/table/schema.rs index cedaacac..1914320e 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -4,7 +4,6 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; use garage_util::error::Error; - pub trait PartitionKey { fn hash(&self) -> Hash; } @@ -64,11 +63,11 @@ pub trait TableSchema: Send + Sync { type E: Entry<Self::P, Self::S>; type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; - // Action to take if not able to decode current version: - // try loading from an older version - fn try_migrate(_bytes: &[u8]) -> Option<Self::E> { - None - } + // Action to take if not able to decode current version: + // try loading from an older version + fn try_migrate(_bytes: &[u8]) -> Option<Self::E> { + None + } async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>; fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { diff --git a/src/table/table.rs b/src/table/table.rs index 7a5caf4f..9d43a475 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -14,8 +14,8 @@ use garage_rpc::membership::{Ring, System}; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; -use crate::table_sync::*; use crate::schema::*; +use crate::table_sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); @@ -48,7 +48,6 @@ pub enum TableRPC<F: TableSchema> { impl<F: TableSchema> RpcMessage for TableRPC<F> {} - pub trait TableReplication: Send + Sync { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods @@ -456,15 +455,13 @@ where ret } - fn decode_entry(bytes: &[u8]) -> Result<F::E, Error> { - match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { - Ok(x) => Ok(x), - Err(e) => { - match F::try_migrate(bytes) { - Some(x) => Ok(x), - None => Err(e.into()), - } - } - } - } + fn decode_entry(bytes: &[u8]) -> Result<F::E, Error> { + match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { + Ok(x) => Ok(x), + Err(e) => match F::try_migrate(bytes) { + Some(x) => Ok(x), + None => Err(e.into()), + }, + } + } } |