diff options
Diffstat (limited to 'src/api_server.rs')
-rw-r--r-- | src/api_server.rs | 70 |
1 files changed, 29 insertions, 41 deletions
diff --git a/src/api_server.rs b/src/api_server.rs index c1b4d81d..8acd15d8 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -9,21 +9,21 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode}; use futures::future::Future; use crate::error::Error; -use crate::membership::System; use crate::data::*; use crate::proto::*; use crate::rpc_client::*; +use crate::server::Garage; -pub async fn run_api_server(sys: Arc<System>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> { - let addr = ([0, 0, 0, 0], sys.config.api_port).into(); +pub async fn run_api_server(garage: Arc<Garage>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> { + let addr = ([0, 0, 0, 0], garage.system.config.api_port).into(); let service = make_service_fn(|conn: &AddrStream| { - let sys = sys.clone(); + let garage = garage.clone(); let client_addr = conn.remote_addr(); async move { Ok::<_, Error>(service_fn(move |req: Request<Body>| { - let sys = sys.clone(); - handler(sys, req, client_addr) + let garage = garage.clone(); + handler(garage, req, client_addr) })) } }); @@ -36,8 +36,8 @@ pub async fn run_api_server(sys: Arc<System>, shutdown_signal: impl Future<Outpu graceful.await } -async fn handler(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> { - match handler_inner(sys, req, addr).await { +async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> { + match handler_inner(garage, req, addr).await { Ok(x) => Ok(x), Err(Error::BadRequest(e)) => { let mut bad_request = Response::new(Body::from(format!("{}\n", e))); @@ -53,7 +53,7 @@ async fn handler(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) -> Resu } } -async fn handler_inner(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> { +async fn handler_inner(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> { eprintln!("{} {} {}", addr, req.method(), req.uri()); let bucket = req.headers() @@ -75,7 +75,7 @@ async fn handler_inner(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) - .map(|x| x.to_str()) .unwrap_or(Ok("blob"))? .to_string(); - let version_uuid = handle_put(sys, &mime_type, &bucket, &key, req.into_body()).await?; + let version_uuid = handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?; Ok(Response::new(Body::from( format!("Version UUID: {:?}", version_uuid), ))) @@ -84,22 +84,24 @@ async fn handler_inner(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) - } } -async fn handle_put(sys: Arc<System>, +async fn handle_put(garage: Arc<Garage>, mime_type: &str, bucket: &str, key: &str, body: Body) -> Result<UUID, Error> { let version_uuid = gen_uuid(); - let mut chunker = BodyChunker::new(body, sys.config.block_size); + let mut chunker = BodyChunker::new(body, garage.system.config.block_size); let first_block = match chunker.next().await? { Some(x) => x, None => return Err(Error::BadRequest(format!("Empty body"))), }; - let mut version = VersionMeta{ + let version_key = VersionMetaKey{ bucket: bucket.to_string(), key: key.to_string(), + }; + let mut version_value = VersionMetaValue { timestamp: now_msec(), uuid: version_uuid.clone(), mime_type: mime_type.to_string(), @@ -107,27 +109,17 @@ async fn handle_put(sys: Arc<System>, is_complete: false, data: VersionData::DeleteMarker, }; - let version_who = sys.members.read().await - .walk_ring(&version_uuid, sys.config.meta_replication_factor); if first_block.len() < INLINE_THRESHOLD { - version.data = VersionData::Inline(first_block); - version.is_complete = true; - rpc_try_call_many(sys.clone(), - &version_who[..], - &Message::AdvertiseVersion(version), - (sys.config.meta_replication_factor+1)/2, - DEFAULT_TIMEOUT).await?; + version_value.data = VersionData::Inline(first_block); + version_value.is_complete = true; + garage.version_table.insert(&version_key, &version_value).await?; return Ok(version_uuid) } let first_block_hash = hash(&first_block[..]); - version.data = VersionData::FirstBlock(first_block_hash); - rpc_try_call_many(sys.clone(), - &version_who[..], - &Message::AdvertiseVersion(version.clone()), - (sys.config.meta_replication_factor+1)/2, - DEFAULT_TIMEOUT).await?; + version_value.data = VersionData::FirstBlock(first_block_hash); + garage.version_table.insert(&version_key, &version_value).await?; let block_meta = BlockMeta{ version_uuid: version_uuid.clone(), @@ -135,7 +127,7 @@ async fn handle_put(sys: Arc<System>, hash: hash(&first_block[..]), }; let mut next_offset = first_block.len(); - let mut put_curr_block = put_block(sys.clone(), block_meta, first_block); + let mut put_curr_block = put_block(garage.clone(), block_meta, first_block); loop { let (_, next_block) = futures::try_join!(put_curr_block, chunker.next())?; if let Some(block) = next_block { @@ -145,7 +137,7 @@ async fn handle_put(sys: Arc<System>, hash: hash(&block[..]), }; next_offset += block.len(); - put_curr_block = put_block(sys.clone(), block_meta, block); + put_curr_block = put_block(garage.clone(), block_meta, block); } else { break; } @@ -153,25 +145,21 @@ async fn handle_put(sys: Arc<System>, // TODO: if at any step we have an error, we should undo everything we did - version.is_complete = true; - rpc_try_call_many(sys.clone(), - &version_who[..], - &Message::AdvertiseVersion(version), - (sys.config.meta_replication_factor+1)/2, - DEFAULT_TIMEOUT).await?; + version_value.is_complete = true; + garage.version_table.insert(&version_key, &version_value).await?; Ok(version_uuid) } -async fn put_block(sys: Arc<System>, meta: BlockMeta, data: Vec<u8>) -> Result<(), Error> { - let who = sys.members.read().await - .walk_ring(&meta.hash, sys.config.meta_replication_factor); - rpc_try_call_many(sys.clone(), +async fn put_block(garage: Arc<Garage>, meta: BlockMeta, data: Vec<u8>) -> Result<(), Error> { + let who = garage.system.members.read().await + .walk_ring(&meta.hash, garage.system.config.meta_replication_factor); + rpc_try_call_many(garage.system.clone(), &who[..], &Message::PutBlock(PutBlockMessage{ meta, data, }), - (sys.config.meta_replication_factor+1)/2, + (garage.system.config.meta_replication_factor+1)/2, DEFAULT_TIMEOUT).await?; Ok(()) } |