diff options
Diffstat (limited to 'src/api_server.rs')
-rw-r--r-- | src/api_server.rs | 122 |
1 files changed, 92 insertions, 30 deletions
diff --git a/src/api_server.rs b/src/api_server.rs index a92fd36b..a5e6e322 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -2,17 +2,20 @@ use std::sync::Arc; use std::net::SocketAddr; use std::collections::VecDeque; -use futures::stream::StreamExt; +use futures::stream::*; use hyper::service::{make_service_fn, service_fn}; use hyper::server::conn::AddrStream; use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use hyper::body::Bytes; use futures::future::Future; use crate::error::Error; use crate::data::*; +use crate::data; use crate::proto::*; use crate::rpc_client::*; use crate::server::Garage; +use crate::table::EmptySortKey; pub async fn run_api_server(garage: Arc<Garage>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> { let addr = ([0, 0, 0, 0], garage.system.config.api_port).into(); @@ -69,7 +72,7 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr .to_string(); let version_uuid = handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?; Ok(Response::new(Body::from( - format!("Version UUID: {:?}", version_uuid), + format!("{:?}\n", version_uuid), ))) } _ => Err(Error::BadRequest(format!("Invalid method"))), @@ -94,43 +97,46 @@ async fn handle_put(garage: Arc<Garage>, key: key.into(), versions: Vec::new(), }; - object.versions.push(Box::new(Version{ + object.versions.push(Box::new(ObjectVersion{ uuid: version_uuid.clone(), timestamp: now_msec(), mime_type: mime_type.to_string(), size: first_block.len() as u64, is_complete: false, - data: VersionData::DeleteMarker, + data: ObjectVersionData::DeleteMarker, })); if first_block.len() < INLINE_THRESHOLD { - object.versions[0].data = VersionData::Inline(first_block); + object.versions[0].data = ObjectVersionData::Inline(first_block); object.versions[0].is_complete = true; garage.object_table.insert(&object).await?; return Ok(version_uuid) } + let version = Version { + version: version_uuid.clone(), + deleted: false, + blocks: Vec::new(), + bucket: bucket.into(), + key: key.into(), + }; + let first_block_hash = hash(&first_block[..]); - object.versions[0].data = VersionData::FirstBlock(first_block_hash); + object.versions[0].data = ObjectVersionData::FirstBlock(first_block_hash.clone()); garage.object_table.insert(&object).await?; - let block_meta = BlockMeta{ - version_uuid: version_uuid.clone(), - offset: 0, - hash: hash(&first_block[..]), - }; let mut next_offset = first_block.len(); - let mut put_curr_block = put_block(garage.clone(), block_meta, first_block); + let mut put_curr_version_block = put_version_block(garage.clone(), &version, 0, first_block_hash.clone()); + let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block); + loop { - let (_, next_block) = futures::try_join!(put_curr_block, chunker.next())?; + let (_, _, next_block) = futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; if let Some(block) = next_block { - let block_meta = BlockMeta{ - version_uuid: version_uuid.clone(), - offset: next_offset as u64, - hash: hash(&block[..]), - }; - next_offset += block.len(); - put_curr_block = put_block(garage.clone(), block_meta, block); + let block_hash = hash(&block[..]); + let block_len = block.len(); + put_curr_version_block = put_version_block(garage.clone(), &version, next_offset as u64, block_hash.clone()); + put_curr_block = put_block(garage.clone(), block_hash, block); + next_offset += block_len; } else { break; } @@ -144,13 +150,23 @@ async fn handle_put(garage: Arc<Garage>, Ok(version_uuid) } -async fn put_block(garage: Arc<Garage>, meta: BlockMeta, data: Vec<u8>) -> Result<(), Error> { +async fn put_version_block(garage: Arc<Garage>, version: &Version, offset: u64, hash: Hash) -> Result<(), Error> { + let mut version = version.clone(); + version.blocks.push(VersionBlock{ + offset, + hash, + }); + garage.version_table.insert(&version).await?; + Ok(()) +} + +async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), Error> { let who = garage.system.members.read().await - .walk_ring(&meta.hash, garage.system.config.meta_replication_factor); + .walk_ring(&hash, garage.system.config.meta_replication_factor); rpc_try_call_many(garage.system.clone(), &who[..], &Message::PutBlock(PutBlockMessage{ - meta, + hash, data, }), (garage.system.config.meta_replication_factor+1)/2, @@ -160,6 +176,7 @@ async fn put_block(garage: Arc<Garage>, meta: BlockMeta, data: Vec<u8>) -> Resul struct BodyChunker { body: Body, + read_all: bool, block_size: usize, buf: VecDeque<u8>, } @@ -168,17 +185,19 @@ impl BodyChunker { fn new(body: Body, block_size: usize) -> Self { Self{ body, + read_all: false, block_size, buf: VecDeque::new(), } } async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { - while self.buf.len() < self.block_size { + while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.body.next().await { let bytes = block?; + eprintln!("Body next: {} bytes", bytes.len()); self.buf.extend(&bytes[..]); } else { - break; + self.read_all = true; } } if self.buf.len() == 0 { @@ -213,13 +232,56 @@ async fn handle_get(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<Resp .status(StatusCode::OK); match last_v.data { - VersionData::DeleteMarker => Err(Error::NotFound), - VersionData::Inline(bytes) => { + ObjectVersionData::DeleteMarker => Err(Error::NotFound), + ObjectVersionData::Inline(bytes) => { Ok(resp_builder.body(bytes.into())?) } - VersionData::FirstBlock(hash) => { - // TODO - unimplemented!() + ObjectVersionData::FirstBlock(first_block_hash) => { + let read_first_block = get_block(garage.clone(), &first_block_hash); + let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptySortKey); + + let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; + let version = match version { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let mut blocks = version.blocks.iter() + .map(|vb| (vb.hash.clone(), None)) + .collect::<Vec<_>>(); + blocks[0].1 = Some(first_block); + + let block_futures = blocks.drain(..) + .map(move |(hash, data_opt)| async { + if let Some(data) = data_opt { + Ok(data) + } else { + get_block(garage.clone(), &hash).await + .map_err(|e| format!("{}", e)) + } + }); + let body_stream = futures::stream::iter(block_futures).buffered(2); + let body = Body::wrap_stream(body_stream); + Ok(resp_builder.body(body)?) + } + } +} + +async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> { + let who = garage.system.members.read().await + .walk_ring(&hash, garage.system.config.meta_replication_factor); + let resps = rpc_try_call_many(garage.system.clone(), + &who[..], + &Message::GetBlock(hash.clone()), + 1, + DEFAULT_TIMEOUT).await?; + + for resp in resps { + if let Message::PutBlock(pbm) = resp { + if data::hash(&pbm.data) == *hash { + return Ok(pbm.data) + } } } + Err(Error::Message(format!("No valid blocks returned"))) } |