use std::collections::HashMap; use std::sync::Arc; use futures::prelude::*; use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; use garage_table::*; use garage_util::data::*; use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::encryption::EncryptionParams; use crate::s3::error::*; use crate::s3::put::*; use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; // ---- pub async fn handle_create_multipart_upload( garage: Arc, req: &Request, bucket_name: &str, bucket_id: Uuid, key: &String, ) -> Result, Error> { let existing_object = garage.object_table.get(&bucket_id, &key).await?; let upload_id = gen_uuid(); let timestamp = next_timestamp(existing_object.as_ref()); let headers = get_headers(req.headers())?; // Determine whether object should be encrypted, and if so the key let encryption = EncryptionParams::new_from_req(&garage, &req)?; if encryption.is_encrypted() { // TODO unimplemented!("multipart upload encryption"); } let object_encryption = encryption.encrypt_headers(headers)?; // Create object in object table let object_version = ObjectVersion { uuid: upload_id, timestamp, state: ObjectVersionState::Uploading { multipart: true, encryption: object_encryption, }, }; let object = Object::new(bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&object).await?; // Create multipart upload in mpu table // This multipart upload will hold references to uploaded parts // (which are entries in the Version table) let mpu = MultipartUpload::new(upload_id, timestamp, bucket_id, key.into(), false); garage.mpu_table.insert(&mpu).await?; // Send success response let result = s3_xml::InitiateMultipartUploadResult { xmlns: (), bucket: s3_xml::Value(bucket_name.to_string()), key: s3_xml::Value(key.to_string()), upload_id: s3_xml::Value(hex::encode(upload_id)), }; let xml = s3_xml::to_xml_with_header(&result)?; Ok(Response::new(string_body(xml))) } pub async fn handle_put_part( garage: Arc, req: Request, bucket_id: Uuid, key: &str, part_number: u64, upload_id: &str, content_sha256: Option, ) -> Result, Error> { let upload_id = decode_upload_id(upload_id)?; let content_md5 = match req.headers().get("content-md5") { Some(x) => Some(x.to_str()?.to_string()), None => None, }; // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); let (req_head, req_body) = req.into_parts(); let stream = body_stream(req_body); let mut chunker = StreamChunker::new(stream, garage.config.block_size); let ((_, object_version, mut mpu), first_block) = futures::try_join!( get_upload(&garage, &bucket_id, &key, &upload_id), chunker.next(), )?; // Check encryption params let object_encryption = match object_version.state { ObjectVersionState::Uploading { encryption, .. } => encryption, _ => unreachable!(), }; let (encryption, _) = EncryptionParams::check_decrypt_for_get( &garage, &Request::from_parts(req_head, empty_body::()), &object_encryption, )?; // Check object is valid and part can be accepted let first_block = first_block.ok_or_bad_request("Empty body")?; // Calculate part identity: timestamp, version id let version_uuid = gen_uuid(); let mpu_part_key = MpuPartKey { part_number, timestamp: mpu.next_timestamp(part_number), }; // The following consists in many steps that can each fail. // Keep track that some cleanup will be needed if things fail // before everything is finished (cleanup is done using the Drop trait). let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { garage: garage.clone(), upload_id, version_uuid, })); // Create version and link version from MPU mpu.parts.clear(); mpu.parts.put( mpu_part_key, MpuPart { version: version_uuid, etag: None, size: None, }, ); garage.mpu_table.insert(&mpu).await?; let version = Version::new( version_uuid, VersionBacklink::MultipartUpload { upload_id }, false, ); garage.version_table.insert(&version).await?; // Copy data to version let (total_size, data_md5sum, data_sha256sum, _) = read_and_put_blocks( &garage, &version, encryption, part_number, first_block, &mut chunker, ) .await?; // Verify that checksums map ensure_checksum_matches( data_md5sum.as_slice(), data_sha256sum, content_md5.as_deref(), content_sha256, )?; // Store part etag in version let data_md5sum_hex = hex::encode(data_md5sum); mpu.parts.put( mpu_part_key, MpuPart { version: version_uuid, etag: Some(data_md5sum_hex.clone()), size: Some(total_size), }, ); garage.mpu_table.insert(&mpu).await?; // We were not interrupted, everything went fine. // We won't have to clean up on drop. interrupted_cleanup.cancel(); let response = Response::builder() .header("ETag", format!("\"{}\"", data_md5sum_hex)) .body(empty_body()) .unwrap(); Ok(response) } struct InterruptedCleanup(Option); struct InterruptedCleanupInner { garage: Arc, upload_id: Uuid, version_uuid: Uuid, } impl InterruptedCleanup { fn cancel(&mut self) { drop(self.0.take()); } } impl Drop for InterruptedCleanup { fn drop(&mut self) { if let Some(info) = self.0.take() { tokio::spawn(async move { let version = Version::new( info.version_uuid, VersionBacklink::MultipartUpload { upload_id: info.upload_id, }, true, ); if let Err(e) = info.garage.version_table.insert(&version).await { warn!("Cannot cleanup after aborted UploadPart: {}", e); } }); } } } pub async fn handle_complete_multipart_upload( garage: Arc, req: Request, bucket_name: &str, bucket: &Bucket, key: &str, upload_id: &str, content_sha256: Option, ) -> Result, Error> { let body = http_body_util::BodyExt::collect(req.into_body()) .await? .to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; } let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml) .ok_or_bad_request("Invalid CompleteMultipartUpload XML")?; debug!( "CompleteMultipartUpload list of parts: {:?}", body_list_of_parts ); let upload_id = decode_upload_id(upload_id)?; // Get object and multipart upload let key = key.to_string(); let (object, mut object_version, mpu) = get_upload(&garage, &bucket.id, &key, &upload_id).await?; if mpu.parts.is_empty() { return Err(Error::bad_request("No data was uploaded")); } let object_encryption = match object_version.state { ObjectVersionState::Uploading { encryption, .. } => encryption, _ => unreachable!(), }; // Check that part numbers are an increasing sequence. // (it doesn't need to start at 1 nor to be a continuous sequence, // see discussion in #192) if body_list_of_parts.is_empty() { return Err(Error::EntityTooSmall); } if !body_list_of_parts .iter() .zip(body_list_of_parts.iter().skip(1)) .all(|(p1, p2)| p1.part_number < p2.part_number) { return Err(Error::InvalidPartOrder); } // Check that the list of parts they gave us corresponds to parts we have here debug!("Parts stored in multipart upload: {:?}", mpu.parts.items()); let mut have_parts = HashMap::new(); for (pk, pv) in mpu.parts.items().iter() { have_parts.insert(pk.part_number, pv); } let mut parts = vec![]; for req_part in body_list_of_parts.iter() { match have_parts.get(&req_part.part_number) { Some(part) if part.etag.as_ref() == Some(&req_part.etag) && part.size.is_some() => { parts.push(*part) } _ => return Err(Error::InvalidPart), } } let grg = &garage; let parts_versions = futures::future::try_join_all(parts.iter().map(|p| async move { grg.version_table .get(&p.version, &EmptyKey) .await? .ok_or_internal_error("Part version missing from version table") })) .await?; // Create final version and block refs let mut final_version = Version::new( upload_id, VersionBacklink::Object { bucket_id: bucket.id, key: key.to_string(), }, false, ); for (part_number, part_version) in parts_versions.iter().enumerate() { if part_version.deleted.get() { return Err(Error::InvalidPart); } for (vbk, vb) in part_version.blocks.items().iter() { final_version.blocks.put( VersionBlockKey { part_number: (part_number + 1) as u64, offset: vbk.offset, }, *vb, ); } } garage.version_table.insert(&final_version).await?; let block_refs = final_version.blocks.items().iter().map(|(_, b)| BlockRef { block: b.hash, version: upload_id, deleted: false.into(), }); garage.block_ref_table.insert_many(block_refs).await?; // Calculate etag of final object // To understand how etags are calculated, read more here: // https://teppen.io/2018/06/23/aws_s3_etags/ let mut etag_md5_hasher = Md5::new(); for part in parts.iter() { etag_md5_hasher.update(part.etag.as_ref().unwrap().as_bytes()); } let etag = format!( "{}-{}", hex::encode(etag_md5_hasher.finalize()), parts.len() ); // Calculate total size of final object let total_size = parts.iter().map(|x| x.size.unwrap()).sum(); if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await { object_version.state = ObjectVersionState::Aborted; let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; return Err(e); } // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { encryption: object_encryption, size: total_size, etag: etag.clone(), }, final_version.blocks.items()[0].1.hash, )); let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; // Send response saying ok we're done let result = s3_xml::CompleteMultipartUploadResult { xmlns: (), location: None, bucket: s3_xml::Value(bucket_name.to_string()), key: s3_xml::Value(key), etag: s3_xml::Value(format!("\"{}\"", etag)), }; let xml = s3_xml::to_xml_with_header(&result)?; Ok(Response::new(string_body(xml))) } pub async fn handle_abort_multipart_upload( garage: Arc, bucket_id: Uuid, key: &str, upload_id: &str, ) -> Result, Error> { let upload_id = decode_upload_id(upload_id)?; let (_, mut object_version, _) = get_upload(&garage, &bucket_id, &key.to_string(), &upload_id).await?; object_version.state = ObjectVersionState::Aborted; let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&final_object).await?; Ok(Response::new(empty_body())) } // ======== helpers ============ #[allow(clippy::ptr_arg)] pub(crate) async fn get_upload( garage: &Garage, bucket_id: &Uuid, key: &String, upload_id: &Uuid, ) -> Result<(Object, ObjectVersion, MultipartUpload), Error> { let (object, mpu) = futures::try_join!( garage.object_table.get(bucket_id, key).map_err(Error::from), garage .mpu_table .get(upload_id, &EmptyKey) .map_err(Error::from), )?; let object = object.ok_or(Error::NoSuchUpload)?; let mpu = mpu.ok_or(Error::NoSuchUpload)?; let object_version = object .versions() .iter() .find(|v| v.uuid == *upload_id && v.is_uploading(Some(true))) .ok_or(Error::NoSuchUpload)? .clone(); Ok((object, object_version, mpu)) } pub fn decode_upload_id(id: &str) -> Result { let id_bin = hex::decode(id).map_err(|_| Error::NoSuchUpload)?; if id_bin.len() != 32 { return Err(Error::NoSuchUpload); } let mut uuid = [0u8; 32]; uuid.copy_from_slice(&id_bin[..]); Ok(Uuid::from(uuid)) } #[derive(Debug)] struct CompleteMultipartUploadPart { etag: String, part_number: u64, } fn parse_complete_multipart_upload_body( xml: &roxmltree::Document, ) -> Option> { let mut parts = vec![]; let root = xml.root(); let cmu = root.first_child()?; if !cmu.has_tag_name("CompleteMultipartUpload") { return None; } for item in cmu.children() { // Only parse nodes if !item.is_element() { continue; } if item.has_tag_name("Part") { let etag = item.children().find(|e| e.has_tag_name("ETag"))?.text()?; let part_number = item .children() .find(|e| e.has_tag_name("PartNumber"))? .text()?; parts.push(CompleteMultipartUploadPart { etag: etag.trim_matches('"').to_string(), part_number: part_number.parse().ok()?, }); } else { return None; } } Some(parts) }