aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api_server.rs69
-rw-r--r--src/data.rs2
-rw-r--r--src/error.rs18
-rw-r--r--src/main.rs2
-rw-r--r--src/object_table.rs88
-rw-r--r--src/server.rs16
-rw-r--r--src/table.rs28
-rw-r--r--src/version_table.rs59
8 files changed, 169 insertions, 113 deletions
diff --git a/src/api_server.rs b/src/api_server.rs
index ff7c536c..a92fd36b 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -39,16 +39,10 @@ pub async fn run_api_server(garage: Arc<Garage>, shutdown_signal: impl Future<Ou
async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> {
match handler_inner(garage, req, addr).await {
Ok(x) => Ok(x),
- Err(Error::BadRequest(e)) => {
- let mut bad_request = Response::new(Body::from(format!("{}\n", e)));
- *bad_request.status_mut() = StatusCode::BAD_REQUEST;
- Ok(bad_request)
- }
Err(e) => {
- let mut ise = Response::new(Body::from(
- format!("Internal server error: {}\n", e)));
- *ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
- Ok(ise)
+ let mut http_error = Response::new(Body::from(format!("{}\n", e)));
+ *http_error.status_mut() = e.http_status_code();
+ Ok(http_error)
}
}
}
@@ -65,9 +59,7 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr
match req.method() {
&Method::GET => {
- Ok(Response::new(Body::from(
- "TODO: implement GET object",
- )))
+ Ok(handle_get(garage, &bucket, &key).await?)
}
&Method::PUT => {
let mime_type = req.headers()
@@ -97,27 +89,30 @@ async fn handle_put(garage: Arc<Garage>,
None => return Err(Error::BadRequest(format!("Empty body"))),
};
- let mut version = VersionMeta{
+ let mut object = Object {
bucket: bucket.into(),
key: key.into(),
- timestamp: now_msec(),
+ versions: Vec::new(),
+ };
+ object.versions.push(Box::new(Version{
uuid: version_uuid.clone(),
+ timestamp: now_msec(),
mime_type: mime_type.to_string(),
size: first_block.len() as u64,
is_complete: false,
data: VersionData::DeleteMarker,
- };
+ }));
if first_block.len() < INLINE_THRESHOLD {
- version.data = VersionData::Inline(first_block);
- version.is_complete = true;
- garage.version_table.insert(&version).await?;
+ object.versions[0].data = VersionData::Inline(first_block);
+ object.versions[0].is_complete = true;
+ garage.object_table.insert(&object).await?;
return Ok(version_uuid)
}
let first_block_hash = hash(&first_block[..]);
- version.data = VersionData::FirstBlock(first_block_hash);
- garage.version_table.insert(&version).await?;
+ object.versions[0].data = VersionData::FirstBlock(first_block_hash);
+ garage.object_table.insert(&object).await?;
let block_meta = BlockMeta{
version_uuid: version_uuid.clone(),
@@ -143,8 +138,9 @@ async fn handle_put(garage: Arc<Garage>,
// TODO: if at any step we have an error, we should undo everything we did
- version.is_complete = true;
- garage.version_table.insert(&version).await?;
+ object.versions[0].is_complete = true;
+ object.versions[0].size = next_offset as u64;
+ garage.object_table.insert(&object).await?;
Ok(version_uuid)
}
@@ -198,3 +194,32 @@ impl BodyChunker {
}
}
}
+
+async fn handle_get(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<Response<Body>, Error> {
+ let mut object = match garage.object_table.get(&bucket.to_string(), &key.to_string()).await? {
+ None => return Err(Error::NotFound),
+ Some(o) => o
+ };
+
+ let last_v = match object.versions.drain(..)
+ .rev().filter(|v| v.is_complete)
+ .next() {
+ Some(v) => v,
+ None => return Err(Error::NotFound),
+ };
+
+ let resp_builder = Response::builder()
+ .header("Content-Type", last_v.mime_type)
+ .status(StatusCode::OK);
+
+ match last_v.data {
+ VersionData::DeleteMarker => Err(Error::NotFound),
+ VersionData::Inline(bytes) => {
+ Ok(resp_builder.body(bytes.into())?)
+ }
+ VersionData::FirstBlock(hash) => {
+ // TODO
+ unimplemented!()
+ }
+ }
+}
diff --git a/src/data.rs b/src/data.rs
index f01d5394..14846fe2 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -121,7 +121,7 @@ pub struct SplitpointMeta {
pub deleted: bool,
}
-pub use crate::version_table::*;
+pub use crate::object_table::*;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlockMeta {
diff --git a/src/error.rs b/src/error.rs
index 1481234f..578f73e9 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,5 +1,6 @@
-use err_derive::Error;
use std::io;
+use err_derive::Error;
+use hyper::StatusCode;
#[derive(Debug, Error)]
pub enum Error {
@@ -32,9 +33,22 @@ pub enum Error {
#[error(display = "RPC error: {}", _0)]
RPCError(String),
- #[error(display = "{}", _0)]
+ #[error(display = "Bad request: {}", _0)]
BadRequest(String),
+ #[error(display = "Not found")]
+ NotFound,
+
#[error(display = "{}", _0)]
Message(String),
}
+
+impl Error {
+ pub fn http_status_code(&self) -> StatusCode {
+ match self {
+ Error::BadRequest(_) => StatusCode::BAD_REQUEST,
+ Error::NotFound => StatusCode::NOT_FOUND,
+ _ => StatusCode::INTERNAL_SERVER_ERROR,
+ }
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index aa0f23dc..2303e7a9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,7 +5,7 @@ mod proto;
mod membership;
mod table;
-mod version_table;
+mod object_table;
mod server;
mod rpc_server;
diff --git a/src/object_table.rs b/src/object_table.rs
new file mode 100644
index 00000000..37c02225
--- /dev/null
+++ b/src/object_table.rs
@@ -0,0 +1,88 @@
+use std::sync::Arc;
+use serde::{Serialize, Deserialize};
+use async_trait::async_trait;
+use tokio::sync::RwLock;
+
+use crate::data::*;
+use crate::table::*;
+use crate::server::Garage;
+
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Object {
+ pub bucket: String,
+ pub key: String,
+
+ pub versions: Vec<Box<Version>>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Version {
+ pub uuid: UUID,
+ pub timestamp: u64,
+
+ pub mime_type: String,
+ pub size: u64,
+ pub is_complete: bool,
+
+ pub data: VersionData,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum VersionData {
+ DeleteMarker,
+ Inline(#[serde(with="serde_bytes")] Vec<u8>),
+ FirstBlock(Hash),
+}
+
+pub struct ObjectTable {
+ pub garage: RwLock<Option<Arc<Garage>>>,
+}
+
+impl Entry<String, String> for Object {
+ fn partition_key(&self) -> &String {
+ &self.bucket
+ }
+ fn sort_key(&self) -> &String {
+ &self.key
+ }
+
+ fn merge(&mut self, other: &Self) {
+ for other_v in other.versions.iter() {
+ match self.versions.binary_search_by(|v| (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid))) {
+ Ok(i) => {
+ let mut v = &mut self.versions[i];
+ if other_v.size > v.size {
+ v.size = other_v.size;
+ }
+ if other_v.is_complete {
+ v.is_complete = true;
+ }
+ }
+ Err(i) => {
+ self.versions.insert(i, other_v.clone());
+ }
+ }
+ }
+ let last_complete = self.versions
+ .iter().enumerate().rev()
+ .filter(|(_, v)| v.is_complete)
+ .next()
+ .map(|(vi, _)| vi);
+
+ if let Some(last_vi) = last_complete {
+ self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
+ }
+ }
+}
+
+#[async_trait]
+impl TableFormat for ObjectTable {
+ type P = String;
+ type S = String;
+ type E = Object;
+
+ async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
+ unimplemented!()
+ }
+}
diff --git a/src/server.rs b/src/server.rs
index 1f1ac2af..c41ee9b7 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -21,7 +21,7 @@ pub struct Garage {
pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>,
- pub version_table: Arc<Table<VersionTable>>,
+ pub object_table: Arc<Table<ObjectTable>>,
}
impl Garage {
@@ -35,25 +35,25 @@ impl Garage {
timeout: DEFAULT_TIMEOUT,
};
- let version_table = Arc::new(Table::new(
- VersionTable{garage: RwLock::new(None)},
+ let object_table = Arc::new(Table::new(
+ ObjectTable{garage: RwLock::new(None)},
system.clone(),
&db,
- "version".to_string(),
+ "object".to_string(),
meta_rep_param.clone()));
let mut garage = Self{
db,
system: system.clone(),
table_rpc_handlers: HashMap::new(),
- version_table,
+ object_table,
};
garage.table_rpc_handlers.insert(
- garage.version_table.name.clone(),
- garage.version_table.clone().rpc_handler());
+ garage.object_table.name.clone(),
+ garage.object_table.clone().rpc_handler());
let garage = Arc::new(garage);
- *garage.version_table.instance.garage.write().await = Some(garage.clone());
+ *garage.object_table.instance.garage.write().await = Some(garage.clone());
garage
}
}
diff --git a/src/table.rs b/src/table.rs
index df82e9c7..55ae9229 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -64,11 +64,11 @@ pub struct Partition {
pub other_nodes: Vec<UUID>,
}
-pub trait PartitionKey: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync {
+pub trait PartitionKey {
fn hash(&self) -> Hash;
}
-pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync {
+pub trait SortKey {
fn sort_key(&self) -> &[u8];
}
@@ -87,33 +87,21 @@ impl SortKey for EmptySortKey {
}
}
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub struct StringKey(String);
-impl PartitionKey for StringKey {
+impl<T: AsRef<str>> PartitionKey for T {
fn hash(&self) -> Hash {
- hash(self.0.as_bytes())
+ hash(self.as_ref().as_bytes())
}
}
-impl SortKey for StringKey {
+impl<T: AsRef<str>> SortKey for T {
fn sort_key(&self) -> &[u8] {
- self.0.as_bytes()
- }
-}
-impl AsRef<str> for StringKey {
- fn as_ref(&self) -> &str {
- &self.0
- }
-}
-impl From<&str> for StringKey {
- fn from(s: &str) -> StringKey {
- StringKey(s.to_string())
+ self.as_ref().as_bytes()
}
}
#[async_trait]
pub trait TableFormat: Send + Sync {
- type P: PartitionKey;
- type S: SortKey;
+ type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type E: Entry<Self::P, Self::S>;
async fn updated(&self, old: Option<&Self::E>, new: &Self::E);
diff --git a/src/version_table.rs b/src/version_table.rs
deleted file mode 100644
index 1542dc42..00000000
--- a/src/version_table.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-use std::sync::Arc;
-use serde::{Serialize, Deserialize};
-use async_trait::async_trait;
-use tokio::sync::RwLock;
-
-use crate::data::*;
-use crate::table::*;
-use crate::server::Garage;
-
-
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct VersionMeta {
- pub bucket: StringKey,
- pub key: StringKey,
-
- pub timestamp: u64,
- pub uuid: UUID,
-
- pub mime_type: String,
- pub size: u64,
- pub is_complete: bool,
-
- pub data: VersionData,
-}
-
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub enum VersionData {
- DeleteMarker,
- Inline(#[serde(with="serde_bytes")] Vec<u8>),
- FirstBlock(Hash),
-}
-
-pub struct VersionTable {
- pub garage: RwLock<Option<Arc<Garage>>>,
-}
-
-impl Entry<StringKey, StringKey> for VersionMeta {
- fn partition_key(&self) -> &StringKey {
- &self.bucket
- }
- fn sort_key(&self) -> &StringKey {
- &self.key
- }
-
- fn merge(&mut self, other: &Self) {
- unimplemented!()
- }
-}
-
-#[async_trait]
-impl TableFormat for VersionTable {
- type P = StringKey;
- type S = StringKey;
- type E = VersionMeta;
-
- async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
- unimplemented!()
- }
-}