aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/Cargo.toml34
-rw-r--r--src/core/block.rs506
-rw-r--r--src/core/block_ref_table.rs68
-rw-r--r--src/core/bucket_table.rs121
-rw-r--r--src/core/garage.rs166
-rw-r--r--src/core/key_table.rs154
-rw-r--r--src/core/lib.rs10
-rw-r--r--src/core/object_table.rs165
-rw-r--r--src/core/version_table.rs131
9 files changed, 1355 insertions, 0 deletions
diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml
new file mode 100644
index 00000000..1e482d87
--- /dev/null
+++ b/src/core/Cargo.toml
@@ -0,0 +1,34 @@
+[package]
+name = "garage_core"
+version = "0.1.0"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+
+[lib]
+path = "lib.rs"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+garage_util = { path = "../util" }
+garage_rpc = { path = "../rpc" }
+garage_table = { path = "../table" }
+
+bytes = "0.4"
+rand = "0.7"
+hex = "0.3"
+sha2 = "0.8"
+arc-swap = "0.4"
+log = "0.4"
+
+sled = "0.31"
+
+rmp-serde = "0.14.3"
+serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
+serde_bytes = "0.11"
+
+async-trait = "0.1.30"
+futures = "0.3"
+futures-util = "0.3"
+tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+
diff --git a/src/core/block.rs b/src/core/block.rs
new file mode 100644
index 00000000..af8b9efb
--- /dev/null
+++ b/src/core/block.rs
@@ -0,0 +1,506 @@
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
+
+use arc_swap::ArcSwapOption;
+use futures::future::*;
+use futures::select;
+use futures::stream::*;
+use serde::{Deserialize, Serialize};
+use tokio::fs;
+use tokio::prelude::*;
+use tokio::sync::{watch, Mutex, Notify};
+
+use garage_util::data;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_rpc::membership::System;
+use garage_rpc::rpc_client::*;
+use garage_rpc::rpc_server::*;
+
+use garage_table::table_sharded::TableShardedReplication;
+use garage_table::TableReplication;
+
+use crate::block_ref_table::*;
+
+use crate::garage::Garage;
+
+pub const INLINE_THRESHOLD: usize = 3072;
+
+const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
+const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
+const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// RPC messages exchanged between block managers (see `BlockManager::handle`).
+#[derive(Debug, Serialize, Deserialize)]
+pub enum Message {
+ // Generic acknowledgement; this is what `write_block` answers
+ Ok,
+ // Request for the content of the block with the given hash
+ GetBlock(Hash),
+ // A block together with its data: sent to store a block, and also the
+ // answer produced by `read_block` for a `GetBlock` request
+ PutBlock(PutBlockMessage),
+ // Asks the receiving node whether it needs the block with the given hash
+ NeedBlockQuery(Hash),
+ // Answer to `NeedBlockQuery`
+ NeedBlockReply(bool),
+}
+
+/// Payload of `Message::PutBlock`: a block hash and the block's bytes.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct PutBlockMessage {
+ // Hash identifying the block
+ pub hash: Hash,
+
+ // Content of the block; (de)serialized through `serde_bytes`
+ #[serde(with = "serde_bytes")]
+ pub data: Vec<u8>,
+}
+
+impl RpcMessage for Message {}
+
+/// Stores data blocks as files under `data_dir`, keeps a local reference
+/// count per block and runs the background resync queue.
+pub struct BlockManager {
+ // Replication parameters used to pick the nodes that read/write a block
+ pub replication: TableShardedReplication,
+ // Root directory under which block files are stored
+ pub data_dir: PathBuf,
+ // Taken while writing a block file and while deleting a corrupted one
+ pub data_dir_lock: Mutex<()>,
+
+ // sled tree "block_local_rc": block hash -> big-endian u64 reference count,
+ // updated through the `rc_merge` merge operator
+ pub rc: sled::Tree,
+
+ // sled tree "block_local_resync_queue": key is an 8-byte big-endian
+ // millisecond timestamp followed by the block hash, value is the hash
+ pub resync_queue: sled::Tree,
+ // Notified each time an entry is pushed to `resync_queue`
+ pub resync_notify: Notify,
+
+ pub system: Arc<System>,
+ rpc_client: Arc<RpcClient<Message>>,
+ // Back-reference to the owning `Garage`; `None` until `Garage::new` sets it
+ pub garage: ArcSwapOption<Garage>,
+}
+
+impl BlockManager {
+    /// Builds the block manager: opens its two sled trees, installs the
+    /// reference-count merge operator, creates the RPC client and registers
+    /// the RPC handlers under the `block_manager` path.
+    ///
+    /// Panics if either sled tree cannot be opened.
+    pub fn new(
+        db: &sled::Db,
+        data_dir: PathBuf,
+        replication: TableShardedReplication,
+        system: Arc<System>,
+        rpc_server: &mut RpcServer,
+    ) -> Arc<Self> {
+        const RPC_PATH: &str = "block_manager";
+
+        // Reference counters are only ever modified through `rc_merge`.
+        let rc = db
+            .open_tree("block_local_rc")
+            .expect("Unable to open block_local_rc tree");
+        rc.set_merge_operator(rc_merge);
+
+        let resync_queue = db
+            .open_tree("block_local_resync_queue")
+            .expect("Unable to open block_local_resync_queue tree");
+
+        let rpc_client = system.rpc_client::<Message>(RPC_PATH);
+
+        let manager = Arc::new(Self {
+            replication,
+            data_dir,
+            data_dir_lock: Mutex::new(()),
+            rc,
+            resync_queue,
+            resync_notify: Notify::new(),
+            system,
+            rpc_client,
+            // Filled in later, once the owning `Garage` exists.
+            garage: ArcSwapOption::from(None),
+        });
+        Arc::clone(&manager).register_handler(rpc_server, RPC_PATH.to_string());
+        manager
+    }
+
+    /// Routes incoming messages to `handle`, both for requests received by
+    /// the RPC server on `path` and for calls this node makes to itself.
+    fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+        // Handler for requests coming from other nodes.
+        let for_remote = Arc::clone(&self);
+        rpc_server.add_handler::<Message, _, _>(path, move |msg, _addr| {
+            let this = Arc::clone(&for_remote);
+            async move { this.handle(&msg).await }
+        });
+
+        // Handler for requests addressed to our own node id.
+        let for_local = Arc::clone(&self);
+        self.rpc_client
+            .set_local_handler(self.system.id, move |msg| {
+                let this = Arc::clone(&for_local);
+                async move { this.handle(&msg).await }
+            });
+    }
+
+    /// Dispatches one incoming RPC message.
+    ///
+    /// Only `GetBlock`, `PutBlock` and `NeedBlockQuery` are valid requests;
+    /// any other message is rejected with `Error::BadRequest`.
+    async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> {
+        match msg {
+            Message::GetBlock(hash) => self.read_block(hash).await,
+            Message::PutBlock(put) => self.write_block(&put.hash, &put.data).await,
+            Message::NeedBlockQuery(hash) => {
+                let needed = self.need_block(hash).await?;
+                Ok(Message::NeedBlockReply(needed))
+            }
+            _ => Err(Error::BadRequest("Unexpected RPC message".to_string())),
+        }
+    }
+
+    /// Starts the two background resync workers.
+    ///
+    /// Each worker is registered with the background runner from a spawned
+    /// task, after an initial 10 second delay.
+    pub async fn spawn_background_worker(self: Arc<Self>) {
+        for worker_id in 0..2usize {
+            let manager = Arc::clone(&self);
+            let runner = self.system.background.clone();
+            let startup = async move {
+                tokio::time::delay_for(Duration::from_secs(10)).await;
+                let worker_name = format!("block resync worker {}", worker_id);
+                runner
+                    .spawn_worker(worker_name, move |must_exit| manager.resync_loop(must_exit))
+                    .await;
+            };
+            tokio::spawn(startup);
+        }
+    }
+
+    /// Stores `data` as the block file for `hash` and answers `Message::Ok`.
+    ///
+    /// The data directory lock is held for the whole operation. If a file
+    /// already exists at the block's path it is left untouched.
+    pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
+        let _guard = self.data_dir_lock.lock().await;
+
+        let dir = self.block_dir(hash);
+        fs::create_dir_all(&dir).await?;
+
+        let file_path = dir.join(hex::encode(hash));
+        let already_stored = fs::metadata(&file_path).await.is_ok();
+        if !already_stored {
+            // The file handle is closed at the end of this scope.
+            let mut file = fs::File::create(file_path).await?;
+            file.write_all(data).await?;
+        }
+
+        Ok(Message::Ok)
+    }
+
+    /// Reads the block `hash` from disk and returns it as `Message::PutBlock`.
+    ///
+    /// If the file cannot be opened, the block is queued for resync and the
+    /// I/O error is returned. If the content does not match `hash`, the file
+    /// is deleted, the block is queued for resync and `Error::CorruptData`
+    /// is returned.
+    pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
+        let path = self.block_path(hash);
+
+        let mut data = vec![];
+        {
+            let mut file = match fs::File::open(&path).await {
+                Ok(file) => file,
+                Err(e) => {
+                    // Not found but maybe we should have had it ??
+                    self.put_to_resync(hash, 0)?;
+                    return Err(e.into());
+                }
+            };
+            file.read_to_end(&mut data).await?;
+        }
+
+        if data::hash(&data[..]) == *hash {
+            return Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }));
+        }
+
+        // Content does not match its hash: remove the file under the lock.
+        let _guard = self.data_dir_lock.lock().await;
+        warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
+        fs::remove_file(path).await?;
+        self.put_to_resync(hash, 0)?;
+        Err(Error::CorruptData(*hash))
+    }
+
+    /// Tells whether this node needs the block but does not have it: true
+    /// when the local reference count is positive and no file exists at the
+    /// block's path.
+    pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
+        let referenced = match self.rc.get(hash.as_ref())? {
+            Some(count) => u64_from_bytes(count.as_ref()) > 0,
+            None => false,
+        };
+        if !referenced {
+            return Ok(false);
+        }
+
+        let path = self.block_path(hash);
+        let on_disk = fs::metadata(&path).await.is_ok();
+        Ok(!on_disk)
+    }
+
+    /// Directory holding the block: `data_dir/<hex of byte 0>/<hex of byte 1>`.
+    fn block_dir(&self, hash: &Hash) -> PathBuf {
+        let bytes = hash.as_slice();
+        self.data_dir
+            .join(hex::encode(&bytes[0..1]))
+            .join(hex::encode(&bytes[1..2]))
+    }
+    /// Full path of the block file: its directory plus the hex-encoded hash.
+    fn block_path(&self, hash: &Hash) -> PathBuf {
+        self.block_dir(hash).join(hex::encode(hash.as_ref()))
+    }
+
+    /// Increments the reference count of `hash`. If the block was not
+    /// referenced before, a resync is scheduled `BLOCK_RW_TIMEOUT` from now.
+    pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
+        let previous = self.rc.get(&hash)?;
+        self.rc.merge(&hash, vec![1])?;
+
+        let was_unreferenced = match previous {
+            Some(count) => u64_from_bytes(&count[..]) == 0,
+            None => true,
+        };
+        if was_unreferenced {
+            self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?;
+        }
+        Ok(())
+    }
+
+    /// Decrements the reference count of `hash`. If the value returned by
+    /// the merge is absent or zero, an immediate resync is scheduled.
+    pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
+        let merged = self.rc.merge(&hash, vec![0])?;
+
+        let now_unreferenced = match merged {
+            Some(count) => u64_from_bytes(&count[..]) == 0,
+            None => true,
+        };
+        if now_unreferenced {
+            self.put_to_resync(&hash, 0)?;
+        }
+        Ok(())
+    }
+
+    /// Queues `hash` for resync `delay_millis` milliseconds from now and
+    /// notifies the resync workers.
+    ///
+    /// The queue key is the big-endian due time followed by the hash, so
+    /// entries sort by due time.
+    fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> {
+        let due_msec = now_msec() + delay_millis;
+        trace!("Put resync_queue: {} {:?}", due_msec, hash);
+
+        let mut queue_key = due_msec.to_be_bytes().to_vec();
+        queue_key.extend(hash.as_ref());
+
+        self.resync_queue.insert(queue_key, hash.as_ref())?;
+        self.resync_notify.notify();
+        Ok(())
+    }
+
+ // Body of one background resync worker: repeatedly takes the entry with
+ // the smallest key from `resync_queue` and processes it once it is due.
+ // Runs until `must_exit` becomes true; queue (sled) errors abort the loop.
+ async fn resync_loop(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ // Number of consecutive failed resync attempts
+ let mut n_failures = 0usize;
+ while !*must_exit.borrow() {
+ if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
+ // The first 8 bytes of the key are the due time (see put_to_resync)
+ let time_msec = u64_from_bytes(&time_bytes[0..8]);
+ let now = now_msec();
+ if now >= time_msec {
+ // copy_from_slice panics if the stored value is not exactly 32 bytes
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(hash_bytes.as_ref());
+ let hash = Hash::from(hash);
+
+ if let Err(e) = self.resync_iter(&hash).await {
+ warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
+ // Re-queue the block for a later retry
+ self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?;
+ n_failures += 1;
+ // From the 10th consecutive failure on, pause 1s after each failure
+ if n_failures >= 10 {
+ warn!("Too many resync failures, throttling.");
+ tokio::time::delay_for(Duration::from_secs(1)).await;
+ }
+ } else {
+ n_failures = 0;
+ }
+ } else {
+ // Not due yet: put the entry back, then wait until it is due, until
+ // something new is queued, or until shutdown is requested
+ self.resync_queue.insert(time_bytes, hash_bytes)?;
+ let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
+ select! {
+ _ = delay.fuse() => (),
+ _ = self.resync_notify.notified().fuse() => (),
+ _ = must_exit.recv().fuse() => (),
+ }
+ }
+ } else {
+ // Queue is empty: wait for a new entry or for shutdown
+ select! {
+ _ = self.resync_notify.notified().fuse() => (),
+ _ = must_exit.recv().fuse() => (),
+ }
+ }
+ }
+ Ok(())
+ }
+
+ // Reconciles the on-disk state of one block with its local reference count:
+ // - file present but not needed locally: offer it to the nodes that still
+ //   need it, then delete the file;
+ // - needed locally but file absent: fetch it from other nodes and store it.
+ // Any error is returned to the caller (resync_loop re-queues the block).
+ async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> {
+ let path = self.block_path(hash);
+
+ let exists = fs::metadata(&path).await.is_ok();
+ // Needed locally means the reference count is strictly positive
+ let needed = self
+ .rc
+ .get(hash.as_ref())?
+ .map(|x| u64_from_bytes(x.as_ref()) > 0)
+ .unwrap_or(false);
+
+ if exists != needed {
+ info!(
+ "Resync block {:?}: exists {}, needed {}",
+ hash, exists, needed
+ );
+ }
+
+ if exists && !needed {
+ // Panics if the back-reference to Garage has not been set yet
+ let garage = self.garage.load_full().unwrap();
+ // Fetch at most one block reference matching the table filter
+ // (the filter only accepts non-deleted references)
+ let active_refs = garage
+ .block_ref_table
+ .get_range(&hash, None, Some(()), 1)
+ .await?;
+ let needed_by_others = !active_refs.is_empty();
+ if needed_by_others {
+ let ring = self.system.ring.borrow().clone();
+ let who = self.replication.replication_nodes(&hash, &ring);
+ let msg = Arc::new(Message::NeedBlockQuery(*hash));
+ // Ask every replication node whether it needs the block
+ let who_needs_fut = who.iter().map(|to| {
+ self.rpc_client
+ .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
+ });
+ let who_needs = join_all(who_needs_fut).await;
+
+ let mut need_nodes = vec![];
+ for (node, needed) in who.into_iter().zip(who_needs.iter()) {
+ match needed {
+ Ok(Message::NeedBlockReply(needed)) => {
+ if *needed {
+ need_nodes.push(node);
+ }
+ }
+ // A failed or unexpected answer aborts before anything is deleted
+ Err(e) => {
+ return Err(Error::Message(format!(
+ "Should delete block, but unable to confirm that all other nodes that need it have it: {}",
+ e
+ )));
+ }
+ Ok(_) => {
+ return Err(Error::Message(format!(
+ "Unexpected response to NeedBlockQuery RPC"
+ )));
+ }
+ }
+ }
+
+ if need_nodes.len() > 0 {
+ // Send our copy to every node that asked for it; the quorum is
+ // the number of such nodes, so all of them must succeed
+ let put_block_message = self.read_block(hash).await?;
+ self.rpc_client
+ .try_call_many(
+ &need_nodes[..],
+ put_block_message,
+ RequestStrategy::with_quorum(need_nodes.len())
+ .with_timeout(BLOCK_RW_TIMEOUT),
+ )
+ .await?;
+ }
+ }
+ fs::remove_file(path).await?;
+ // NOTE(review): queue keys are built as timestamp + hash in put_to_resync,
+ // so removing the bare hash presumably matches no entry -- confirm
+ self.resync_queue.remove(&hash)?;
+ }
+
+ if needed && !exists {
+ // TODO find a way to not do this if they are sending it to us
+ // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
+ // between the RC being incremented and this part being called.
+ let block_data = self.rpc_get_block(&hash).await?;
+ self.write_block(hash, &block_data[..]).await?;
+ }
+
+ Ok(())
+ }
+
+    /// Fetches the content of block `hash` from the nodes selected for
+    /// reading it.
+    ///
+    /// A single successful answer is enough (quorum of 1, interrupted once
+    /// reached). Returns the data of the first `PutBlock` answer, or
+    /// `Error::Message` if no answer carries a block.
+    pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
+        let candidates = self.replication.read_nodes(&hash, &self.system);
+        let strategy = RequestStrategy::with_quorum(1)
+            .with_timeout(BLOCK_RW_TIMEOUT)
+            .interrupt_after_quorum(true);
+
+        let replies = self
+            .rpc_client
+            .try_call_many(&candidates[..], Message::GetBlock(*hash), strategy)
+            .await?;
+
+        let first_block = replies.into_iter().find_map(|reply| match reply {
+            Message::PutBlock(msg) => Some(msg.data),
+            _ => None,
+        });
+        first_block.ok_or_else(|| {
+            Error::Message(format!(
+                "Unable to read block {:?}: no valid blocks returned",
+                hash
+            ))
+        })
+    }
+
+    /// Sends the block to the nodes selected for writing it, and succeeds
+    /// once the replication write quorum has been reached.
+    pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
+        let targets = self.replication.write_nodes(&hash, &self.system);
+        let request = Message::PutBlock(PutBlockMessage { hash, data });
+        let strategy = RequestStrategy::with_quorum(self.replication.write_quorum())
+            .with_timeout(BLOCK_RW_TIMEOUT);
+
+        self.rpc_client
+            .try_call_many(&targets[..], request, strategy)
+            .await?;
+        Ok(())
+    }
+
+ // Queues for immediate resync (1) every block referenced by a non-deleted
+ // entry of the local block_ref table store and (2) every block file found
+ // in the data directory. Returns early with Ok(()) when `must_exit` is set.
+ pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+ // 1. Repair blocks from RC table
+ // Panics if the back-reference to Garage has not been set yet
+ let garage = self.garage.load_full().unwrap();
+ // Last hash queued, used to skip consecutive entries for the same block
+ let mut last_hash = None;
+ let mut i = 0usize;
+ for entry in garage.block_ref_table.store.iter() {
+ let (_k, v_bytes) = entry?;
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
+ if Some(&block_ref.block) == last_hash.as_ref() {
+ continue;
+ }
+ if !block_ref.deleted {
+ last_hash = Some(block_ref.block);
+ self.put_to_resync(&block_ref.block, 0)?;
+ }
+ i += 1;
+ // Only check the exit flag once every 256 processed entries
+ if i & 0xFF == 0 && *must_exit.borrow() {
+ return Ok(());
+ }
+ }
+
+ // 2. Repair blocks actually on disk
+ let mut ls_data_dir = fs::read_dir(&self.data_dir).await?;
+ while let Some(data_dir_ent) = ls_data_dir.next().await {
+ let data_dir_ent = data_dir_ent?;
+ let dir_name = data_dir_ent.file_name();
+ let dir_name = match dir_name.into_string() {
+ Ok(x) => x,
+ Err(_) => continue,
+ };
+ // Only entries named with two hex digits are considered
+ if dir_name.len() != 2 || hex::decode(&dir_name).is_err() {
+ continue;
+ }
+
+ // NOTE(review): block_dir creates two directory levels below data_dir,
+ // while this listing treats the entries of the first level as block
+ // files -- confirm whether block files are actually reached here
+ let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await {
+ Err(e) => {
+ warn!(
+ "Warning: could not list dir {:?}: {}",
+ data_dir_ent.path().to_str(),
+ e
+ );
+ continue;
+ }
+ Ok(x) => x,
+ };
+ while let Some(file) = ls_data_dir_2.next().await {
+ let file = file?;
+ let file_name = file.file_name();
+ let file_name = match file_name.into_string() {
+ Ok(x) => x,
+ Err(_) => continue,
+ };
+ // A block file name is the 64-character hex encoding of a 32-byte hash
+ if file_name.len() != 64 {
+ continue;
+ }
+ let hash_bytes = match hex::decode(&file_name) {
+ Ok(h) => h,
+ Err(_) => continue,
+ };
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&hash_bytes[..]);
+ self.put_to_resync(&hash.into(), 0)?;
+
+ if *must_exit.borrow() {
+ return Ok(());
+ }
+ }
+ }
+ Ok(())
+ }
+}
+
+/// Decodes a big-endian `u64` from exactly 8 bytes.
+///
+/// Panics if `bytes` is not 8 bytes long.
+fn u64_from_bytes(bytes: &[u8]) -> u64 {
+    assert!(bytes.len() == 8);
+    bytes
+        .iter()
+        .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte))
+}
+
+/// Merge operator for the reference-count tree.
+///
+/// `new` is a single byte: `1` increments the counter, `0` decrements it
+/// without going below zero. A missing old value counts as zero. Returns
+/// `None` when the resulting count is zero, otherwise the count encoded as
+/// 8 big-endian bytes.
+///
+/// Panics if `new` is not exactly one byte, or is neither `0` nor `1`.
+fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
+    let current = match old {
+        Some(bytes) => u64_from_bytes(bytes),
+        None => 0,
+    };
+    assert!(new.len() == 1);
+    let updated = match new[0] {
+        0 => current.saturating_sub(1),
+        1 => current + 1,
+        _ => unreachable!(),
+    };
+    if updated > 0 {
+        Some(updated.to_be_bytes().to_vec())
+    } else {
+        None
+    }
+}
diff --git a/src/core/block_ref_table.rs b/src/core/block_ref_table.rs
new file mode 100644
index 00000000..a00438c0
--- /dev/null
+++ b/src/core/block_ref_table.rs
@@ -0,0 +1,68 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_table::*;
+
+use crate::block::*;
+
+/// A reference from an object version to a data block.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct BlockRef {
+    // Primary key: hash of the referenced block
+    pub block: Hash,
+
+    // Sort key: version holding the reference
+    pub version: UUID,
+
+    // Keep track of deleted status
+    pub deleted: bool,
+}
+
+impl Entry<Hash, UUID> for BlockRef {
+    fn partition_key(&self) -> &Hash {
+        &self.block
+    }
+    fn sort_key(&self) -> &UUID {
+        &self.version
+    }
+
+    /// Deletion wins: once either copy is deleted, the merged entry is too.
+    fn merge(&mut self, other: &Self) {
+        self.deleted = self.deleted || other.deleted;
+    }
+}
+
+/// Table schema for block references; keeps the block manager's local
+/// reference counts in sync with the table content.
+pub struct BlockRefTable {
+    pub background: Arc<BackgroundRunner>,
+    pub block_manager: Arc<BlockManager>,
+}
+
+#[async_trait]
+impl TableSchema for BlockRefTable {
+    type P = Hash;
+    type S = UUID;
+    type E = BlockRef;
+    type Filter = ();
+
+    /// Increments the block's reference count when a live (non-deleted)
+    /// reference appears, and decrements it when a live reference goes away.
+    ///
+    /// Panics if both `old` and `new` are `None`.
+    async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
+        let block = &old.as_ref().or(new.as_ref()).unwrap().block;
+        let live_before = old.as_ref().map_or(false, |r| !r.deleted);
+        let live_after = new.as_ref().map_or(false, |r| !r.deleted);
+
+        match (live_before, live_after) {
+            (false, true) => self.block_manager.block_incref(block)?,
+            (true, false) => self.block_manager.block_decref(block)?,
+            _ => (),
+        }
+        Ok(())
+    }
+
+    /// Only non-deleted references match.
+    fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+        !entry.deleted
+    }
+}
diff --git a/src/core/bucket_table.rs b/src/core/bucket_table.rs
new file mode 100644
index 00000000..28234d82
--- /dev/null
+++ b/src/core/bucket_table.rs
@@ -0,0 +1,121 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+
+use garage_table::*;
+use garage_util::error::Error;
+
+/// A bucket together with the access keys authorized on it.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Bucket {
+ // Primary key (used as the table sort key; the partition key is EmptyKey)
+ pub name: String,
+
+ // Timestamp and deletion
+ // Upon version increment, all info is replaced
+ pub timestamp: u64,
+ pub deleted: bool,
+
+ // Authorized keys, kept sorted by key_id (see add_key)
+ authorized_keys: Vec<AllowedKey>,
+}
+
+impl Bucket {
+    /// Builds a bucket from its parts, inserting the keys one by one so
+    /// that they end up sorted by `key_id`.
+    ///
+    /// Panics if `authorized_keys` contains the same `key_id` twice.
+    pub fn new(
+        name: String,
+        timestamp: u64,
+        deleted: bool,
+        authorized_keys: Vec<AllowedKey>,
+    ) -> Self {
+        let mut bucket = Bucket {
+            name,
+            timestamp,
+            deleted,
+            authorized_keys: Vec::new(),
+        };
+        authorized_keys.into_iter().for_each(|key| {
+            bucket
+                .add_key(key)
+                .expect("Duplicate AllowedKey in Bucket constructor")
+        });
+        bucket
+    }
+    /// Add a key only if it is not already present
+    ///
+    /// Returns `Err(())` when a key with the same `key_id` already exists.
+    pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> {
+        let lookup = self
+            .authorized_keys
+            .binary_search_by(|existing| existing.key_id.cmp(&key.key_id));
+        if let Err(insert_at) = lookup {
+            self.authorized_keys.insert(insert_at, key);
+            Ok(())
+        } else {
+            Err(())
+        }
+    }
+    /// The authorized keys, sorted by `key_id`.
+    pub fn authorized_keys(&self) -> &[AllowedKey] {
+        self.authorized_keys.as_slice()
+    }
+    /// Removes every authorized key.
+    pub fn clear_keys(&mut self) {
+        self.authorized_keys.clear();
+    }
+}
+
+/// Permissions granted to one access key on a bucket.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct AllowedKey {
+ // Identifier of the access key
+ pub key_id: String,
+ // Timestamp of this permission entry; on merge the entry with the higher
+ // timestamp wins
+ pub timestamp: u64,
+ pub allow_read: bool,
+ pub allow_write: bool,
+}
+
+impl Entry<EmptyKey, String> for Bucket {
+    /// All buckets live in the same (empty) partition.
+    fn partition_key(&self) -> &EmptyKey {
+        &EmptyKey
+    }
+    /// Buckets are sorted by name.
+    fn sort_key(&self) -> &String {
+        &self.name
+    }
+
+    /// Merges `other` into `self`.
+    ///
+    /// - If `other` has a strictly newer timestamp, it replaces `self`
+    ///   entirely.
+    /// - If `self` is strictly newer, or `self` is deleted, nothing changes.
+    /// - Otherwise (same timestamp, `self` not deleted) the authorized keys
+    ///   are merged one by one; for a key present on both sides, the entry
+    ///   with the higher timestamp wins.
+    fn merge(&mut self, other: &Self) {
+        // The newer version wins. The comparison was previously inverted,
+        // which let an older copy overwrite a newer one and made the guard
+        // below unreachable.
+        if other.timestamp > self.timestamp {
+            *self = other.clone();
+            return;
+        }
+        if self.timestamp > other.timestamp || self.deleted {
+            return;
+        }
+
+        for ak in other.authorized_keys.iter() {
+            match self
+                .authorized_keys
+                .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id))
+            {
+                Ok(i) => {
+                    let our_ak = &mut self.authorized_keys[i];
+                    if ak.timestamp > our_ak.timestamp {
+                        *our_ak = ak.clone();
+                    }
+                }
+                Err(i) => {
+                    // Insert at the position that keeps the list sorted.
+                    self.authorized_keys.insert(i, ak.clone());
+                }
+            }
+        }
+    }
+}
+
+/// Table schema for buckets.
+pub struct BucketTable;
+
+#[async_trait]
+impl TableSchema for BucketTable {
+ type P = EmptyKey;
+ type S = String;
+ type E = Bucket;
+ type Filter = ();
+
+ // Nothing to propagate when a bucket entry changes
+ async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
+ Ok(())
+ }
+
+ // Only non-deleted buckets match
+ fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+ !entry.deleted
+ }
+}
diff --git a/src/core/garage.rs b/src/core/garage.rs
new file mode 100644
index 00000000..d77b0acd
--- /dev/null
+++ b/src/core/garage.rs
@@ -0,0 +1,166 @@
+use std::sync::Arc;
+
+use garage_util::background::*;
+use garage_util::config::*;
+
+use garage_rpc::membership::System;
+use garage_rpc::rpc_client::RpcHttpClient;
+use garage_rpc::rpc_server::RpcServer;
+
+use garage_table::table_fullcopy::*;
+use garage_table::table_sharded::*;
+use garage_table::*;
+
+use crate::block::*;
+use crate::block_ref_table::*;
+use crate::bucket_table::*;
+use crate::key_table::*;
+use crate::object_table::*;
+use crate::version_table::*;
+
+/// Top-level handle grouping the configuration, the database, the
+/// membership system, the block manager and all the tables.
+pub struct Garage {
+ pub config: Config,
+
+ pub db: sled::Db,
+ pub background: Arc<BackgroundRunner>,
+ pub system: Arc<System>,
+ pub block_manager: Arc<BlockManager>,
+
+ // Tables using TableFullReplication
+ pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
+ pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
+
+ // Tables using TableShardedReplication
+ pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
+ pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+}
+
+impl Garage {
+ // Builds the whole node: membership system, block manager and the five
+ // tables, all registered on `rpc_server`; then starts the block manager's
+ // background workers. Panics if the RPC client cannot be created.
+ pub async fn new(
+ config: Config,
+ db: sled::Db,
+ background: Arc<BackgroundRunner>,
+ rpc_server: &mut RpcServer,
+ ) -> Arc<Self> {
+ info!("Initialize membership management system...");
+ let rpc_http_client = Arc::new(
+ RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls)
+ .expect("Could not create RPC client"),
+ );
+ let system = System::new(
+ config.metadata_dir.clone(),
+ rpc_http_client,
+ background.clone(),
+ rpc_server,
+ );
+
+ // Data blocks and block refs: writes need (factor + 1) / 2 nodes, reads need 1
+ let data_rep_param = TableShardedReplication {
+ replication_factor: config.data_replication_factor,
+ write_quorum: (config.data_replication_factor + 1) / 2,
+ read_quorum: 1,
+ };
+
+ // Object and version metadata: reads and writes both need (factor + 1) / 2 nodes
+ let meta_rep_param = TableShardedReplication {
+ replication_factor: config.meta_replication_factor,
+ write_quorum: (config.meta_replication_factor + 1) / 2,
+ read_quorum: (config.meta_replication_factor + 1) / 2,
+ };
+
+ // Bucket and key tables use full replication
+ let control_rep_param = TableFullReplication::new(
+ config.meta_epidemic_factor,
+ (config.meta_epidemic_factor + 1) / 2,
+ );
+
+ info!("Initialize block manager...");
+ let block_manager = BlockManager::new(
+ &db,
+ config.data_dir.clone(),
+ data_rep_param.clone(),
+ system.clone(),
+ rpc_server,
+ );
+
+ // Tables are created bottom-up: each schema holds a handle to the table
+ // it propagates changes to (object -> version -> block_ref -> block manager)
+ info!("Initialize block_ref_table...");
+ let block_ref_table = Table::new(
+ BlockRefTable {
+ background: background.clone(),
+ block_manager: block_manager.clone(),
+ },
+ data_rep_param.clone(),
+ system.clone(),
+ &db,
+ "block_ref".to_string(),
+ rpc_server,
+ )
+ .await;
+
+ info!("Initialize version_table...");
+ let version_table = Table::new(
+ VersionTable {
+ background: background.clone(),
+ block_ref_table: block_ref_table.clone(),
+ },
+ meta_rep_param.clone(),
+ system.clone(),
+ &db,
+ "version".to_string(),
+ rpc_server,
+ )
+ .await;
+
+ info!("Initialize object_table...");
+ let object_table = Table::new(
+ ObjectTable {
+ background: background.clone(),
+ version_table: version_table.clone(),
+ },
+ meta_rep_param.clone(),
+ system.clone(),
+ &db,
+ "object".to_string(),
+ rpc_server,
+ )
+ .await;
+
+ info!("Initialize bucket_table...");
+ let bucket_table = Table::new(
+ BucketTable,
+ control_rep_param.clone(),
+ system.clone(),
+ &db,
+ "bucket".to_string(),
+ rpc_server,
+ )
+ .await;
+
+ info!("Initialize key_table_table...");
+ let key_table = Table::new(
+ KeyTable,
+ control_rep_param.clone(),
+ system.clone(),
+ &db,
+ "key".to_string(),
+ rpc_server,
+ )
+ .await;
+
+ info!("Initialize Garage...");
+ let garage = Arc::new(Self {
+ config,
+ db,
+ system: system.clone(),
+ block_manager,
+ background,
+ bucket_table,
+ key_table,
+ object_table,
+ version_table,
+ block_ref_table,
+ });
+
+ info!("Start block manager background thread...");
+ // Give the block manager its back-reference before starting its workers
+ garage.block_manager.garage.swap(Some(garage.clone()));
+ garage.block_manager.clone().spawn_background_worker().await;
+
+ garage
+ }
+}
diff --git a/src/core/key_table.rs b/src/core/key_table.rs
new file mode 100644
index 00000000..76d163b5
--- /dev/null
+++ b/src/core/key_table.rs
@@ -0,0 +1,154 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+
+use garage_table::*;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+/// An access key together with the buckets it is authorized on.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Key {
+ // Primary key (used as the table sort key; the partition key is EmptyKey)
+ pub key_id: String,
+
+ // Associated secret key (immutable)
+ pub secret_key: String,
+
+ // Name, and the timestamp of its last change (the newest name wins on merge)
+ pub name: String,
+ pub name_timestamp: u64,
+
+ // Deletion
+ pub deleted: bool,
+
+ // Authorized buckets, kept sorted by bucket name (see add_bucket)
+ authorized_buckets: Vec<AllowedBucket>,
+}
+
+impl Key {
+    /// Creates a new, non-deleted key with a random identifier
+    /// (`GK` + 12 random bytes in hex) and a random secret (32 random bytes
+    /// in hex).
+    ///
+    /// Panics if `buckets` contains the same bucket name twice.
+    pub fn new(name: String, buckets: Vec<AllowedBucket>) -> Self {
+        let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
+        let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
+        let mut key = Self {
+            key_id,
+            secret_key,
+            name,
+            name_timestamp: now_msec(),
+            deleted: false,
+            authorized_buckets: Vec::new(),
+        };
+        buckets.into_iter().for_each(|bucket| {
+            key.add_bucket(bucket)
+                .expect("Duplicate AllowedBucket in Key constructor")
+        });
+        key
+    }
+    /// Builds the deletion marker for `key_id`: empty secret and name,
+    /// `deleted` set, no authorized bucket.
+    pub fn delete(key_id: String) -> Self {
+        Self {
+            key_id,
+            secret_key: String::from(""),
+            name: String::from(""),
+            name_timestamp: now_msec(),
+            deleted: true,
+            authorized_buckets: Vec::new(),
+        }
+    }
+    /// Add an authorized bucket, only if it wasn't there before
+    ///
+    /// Returns `Err(())` when the bucket is already listed.
+    pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> {
+        let lookup = self
+            .authorized_buckets
+            .binary_search_by(|existing| existing.bucket.cmp(&new.bucket));
+        if let Err(insert_at) = lookup {
+            self.authorized_buckets.insert(insert_at, new);
+            Ok(())
+        } else {
+            Err(())
+        }
+    }
+    /// The authorized buckets, sorted by bucket name.
+    pub fn authorized_buckets(&self) -> &[AllowedBucket] {
+        self.authorized_buckets.as_slice()
+    }
+    /// Removes every authorized bucket.
+    pub fn clear_buckets(&mut self) {
+        self.authorized_buckets.clear();
+    }
+    /// Whether this key may read `bucket`; false when the bucket is not listed.
+    pub fn allow_read(&self, bucket: &str) -> bool {
+        let entry = self
+            .authorized_buckets
+            .iter()
+            .find(|b| b.bucket.as_str() == bucket);
+        match entry {
+            Some(b) => b.allow_read,
+            None => false,
+        }
+    }
+    /// Whether this key may write `bucket`; false when the bucket is not listed.
+    pub fn allow_write(&self, bucket: &str) -> bool {
+        let entry = self
+            .authorized_buckets
+            .iter()
+            .find(|b| b.bucket.as_str() == bucket);
+        match entry {
+            Some(b) => b.allow_write,
+            None => false,
+        }
+    }
+}
+
+/// Permissions granted to a key on one bucket.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct AllowedBucket {
+ // Name of the bucket
+ pub bucket: String,
+ // Timestamp of this permission entry; on merge the entry with the higher
+ // timestamp wins
+ pub timestamp: u64,
+ pub allow_read: bool,
+ pub allow_write: bool,
+}
+
+impl Entry<EmptyKey, String> for Key {
+    fn partition_key(&self) -> &EmptyKey {
+        &EmptyKey
+    }
+    fn sort_key(&self) -> &String {
+        &self.key_id
+    }
+
+    /// Merges `other` into `self`.
+    ///
+    /// Deletion is sticky: if either side is deleted the result is deleted
+    /// and its bucket list is emptied. Otherwise the most recent name wins,
+    /// and bucket permissions are merged entry by entry, keeping for each
+    /// bucket the entry with the higher timestamp.
+    fn merge(&mut self, other: &Self) {
+        self.deleted = self.deleted || other.deleted;
+        if self.deleted {
+            self.authorized_buckets.clear();
+            return;
+        }
+
+        if other.name_timestamp > self.name_timestamp {
+            self.name_timestamp = other.name_timestamp;
+            self.name = other.name.clone();
+        }
+
+        for theirs in &other.authorized_buckets {
+            let lookup = self
+                .authorized_buckets
+                .binary_search_by(|ours| ours.bucket.cmp(&theirs.bucket));
+            match lookup {
+                Ok(i) => {
+                    if theirs.timestamp > self.authorized_buckets[i].timestamp {
+                        self.authorized_buckets[i] = theirs.clone();
+                    }
+                }
+                // Unknown bucket: insert where it keeps the list sorted.
+                Err(i) => self.authorized_buckets.insert(i, theirs.clone()),
+            }
+        }
+    }
+}
+
+/// Table schema for access keys.
+pub struct KeyTable;
+
+#[async_trait]
+impl TableSchema for KeyTable {
+ type P = EmptyKey;
+ type S = String;
+ type E = Key;
+ type Filter = ();
+
+ // Nothing to propagate when a key entry changes
+ async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
+ Ok(())
+ }
+
+ // Only non-deleted keys match
+ fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+ !entry.deleted
+ }
+}
diff --git a/src/core/lib.rs b/src/core/lib.rs
new file mode 100644
index 00000000..b4a8ddb7
--- /dev/null
+++ b/src/core/lib.rs
@@ -0,0 +1,10 @@
+#[macro_use]
+extern crate log;
+
+pub mod block;
+pub mod block_ref_table;
+pub mod bucket_table;
+pub mod garage;
+pub mod key_table;
+pub mod object_table;
+pub mod version_table;
diff --git a/src/core/object_table.rs b/src/core/object_table.rs
new file mode 100644
index 00000000..1fe2b3d4
--- /dev/null
+++ b/src/core/object_table.rs
@@ -0,0 +1,165 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+
+use garage_util::background::BackgroundRunner;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_table::table_sharded::*;
+use garage_table::*;
+
+use crate::version_table::*;
+
+/// An object (bucket + key) and the versions currently known for it.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Object {
+ // Primary key
+ pub bucket: String,
+
+ // Sort key
+ pub key: String,
+
+ // Data: versions kept sorted by (timestamp, uuid) (see add_version)
+ versions: Vec<ObjectVersion>,
+}
+
+impl Object {
+    /// Builds an object, inserting the versions one by one so that they end
+    /// up sorted by `(timestamp, uuid)`.
+    ///
+    /// Panics if the same version appears twice in `versions`.
+    pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
+        let mut object = Self {
+            bucket,
+            key,
+            versions: Vec::new(),
+        };
+        versions.into_iter().for_each(|version| {
+            object
+                .add_version(version)
+                .expect("Twice the same ObjectVersion in Object constructor")
+        });
+        object
+    }
+    /// Adds a version if it wasn't already present
+    ///
+    /// Returns `Err(())` when a version with the same `(timestamp, uuid)`
+    /// already exists.
+    pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
+        let lookup = self
+            .versions
+            .binary_search_by(|existing| existing.cmp_key().cmp(&new.cmp_key()));
+        if let Err(insert_at) = lookup {
+            self.versions.insert(insert_at, new);
+            Ok(())
+        } else {
+            Err(())
+        }
+    }
+    /// The known versions, sorted by `(timestamp, uuid)`.
+    pub fn versions(&self) -> &[ObjectVersion] {
+        self.versions.as_slice()
+    }
+}
+
+/// One version of an object.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct ObjectVersion {
+ // Identifier of the version; together with `timestamp` it forms the
+ // ordering key (see cmp_key)
+ pub uuid: UUID,
+ pub timestamp: u64,
+
+ pub mime_type: String,
+ // On merge, the larger size is kept
+ pub size: u64,
+ // On merge, a version complete on either side becomes complete
+ pub is_complete: bool,
+
+ pub data: ObjectVersionData,
+}
+
+/// Content of an object version.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub enum ObjectVersionData {
+ DeleteMarker,
+ // Data stored directly in the entry; (de)serialized through `serde_bytes`
+ Inline(#[serde(with = "serde_bytes")] Vec<u8>),
+ // Hash of a block; presumably the first data block of the version -- confirm
+ FirstBlock(Hash),
+}
+
+impl ObjectVersion {
+ // Key used to order and look up versions: (timestamp, uuid)
+ fn cmp_key(&self) -> (u64, &UUID) {
+ (self.timestamp, &self.uuid)
+ }
+}
+
+impl Entry<String, String> for Object {
+    fn partition_key(&self) -> &String {
+        &self.bucket
+    }
+    fn sort_key(&self) -> &String {
+        &self.key
+    }
+
+    /// Merges the versions of `other` into `self`.
+    ///
+    /// A version known on both sides keeps the larger size and becomes
+    /// complete if either side is complete; an unknown version is inserted
+    /// in sorted position. Afterwards, every version older than the last
+    /// complete one is dropped.
+    fn merge(&mut self, other: &Self) {
+        for theirs in &other.versions {
+            let lookup = self
+                .versions
+                .binary_search_by(|ours| ours.cmp_key().cmp(&theirs.cmp_key()));
+            match lookup {
+                Ok(i) => {
+                    let ours = &mut self.versions[i];
+                    ours.size = ours.size.max(theirs.size);
+                    ours.is_complete = ours.is_complete || theirs.is_complete;
+                }
+                Err(i) => self.versions.insert(i, theirs.clone()),
+            }
+        }
+
+        // Keep only the last complete version and whatever comes after it.
+        if let Some(last_vi) = self.versions.iter().rposition(|v| v.is_complete) {
+            self.versions = self.versions.split_off(last_vi);
+        }
+    }
+}
+
+/// Table schema for objects; propagates dropped versions to the version
+/// table.
+pub struct ObjectTable {
+    pub background: Arc<BackgroundRunner>,
+    pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+}
+
+#[async_trait]
+impl TableSchema for ObjectTable {
+    type P = String;
+    type S = String;
+    type E = Object;
+    type Filter = ();
+
+    /// When an existing object is updated, every version that was present
+    /// in the old entry but is missing from the new one is marked deleted
+    /// in the version table. Nothing happens on creation or removal of the
+    /// entry.
+    async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
+        let version_table = self.version_table.clone();
+        let (old_v, new_v) = match (old, new) {
+            (Some(old_v), Some(new_v)) => (old_v, new_v),
+            _ => return Ok(()),
+        };
+
+        // Propagate deletion of old versions
+        for v in old_v.versions.iter() {
+            let still_present = new_v
+                .versions
+                .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
+                .is_ok();
+            if still_present {
+                continue;
+            }
+            let deleted_version = Version::new(
+                v.uuid,
+                old_v.bucket.clone(),
+                old_v.key.clone(),
+                true,
+                vec![],
+            );
+            version_table.insert(&deleted_version).await?;
+        }
+        Ok(())
+    }
+
+    /// Every entry matches for now.
+    fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
+        // TODO
+        true
+    }
+}
diff --git a/src/core/version_table.rs b/src/core/version_table.rs
new file mode 100644
index 00000000..ae32e5cb
--- /dev/null
+++ b/src/core/version_table.rs
@@ -0,0 +1,131 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+
+use garage_util::background::BackgroundRunner;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_table::table_sharded::*;
+use garage_table::*;
+
+use crate::block_ref_table::*;
+
+/// The list of data blocks making up one object version.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Version {
+ // Primary key
+ pub uuid: UUID,
+
+ // Actual data: the blocks for this version
+ // (kept sorted by offset, see add_block; emptied when the version is deleted)
+ pub deleted: bool,
+ blocks: Vec<VersionBlock>,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ pub bucket: String,
+ pub key: String,
+}
+
+impl Version {
+    /// Builds a version, inserting the blocks one by one so that they end
+    /// up sorted by offset.
+    ///
+    /// Panics if two blocks in `blocks` share the same offset.
+    pub fn new(
+        uuid: UUID,
+        bucket: String,
+        key: String,
+        deleted: bool,
+        blocks: Vec<VersionBlock>,
+    ) -> Self {
+        let mut version = Self {
+            uuid,
+            deleted,
+            blocks: Vec::new(),
+            bucket,
+            key,
+        };
+        blocks.into_iter().for_each(|block| {
+            version
+                .add_block(block)
+                .expect("Twice the same VersionBlock in Version constructor")
+        });
+        version
+    }
+    /// Adds a block if it wasn't already present
+    ///
+    /// Returns `Err(())` when a block with the same offset already exists.
+    pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> {
+        let lookup = self
+            .blocks
+            .binary_search_by(|existing| existing.offset.cmp(&new.offset));
+        if let Err(insert_at) = lookup {
+            self.blocks.insert(insert_at, new);
+            Ok(())
+        } else {
+            Err(())
+        }
+    }
+    /// The blocks of this version, sorted by offset.
+    pub fn blocks(&self) -> &[VersionBlock] {
+        self.blocks.as_slice()
+    }
+}
+
+/// One block of a version.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct VersionBlock {
+ // Offset of the block; blocks of a version are ordered and deduplicated by it
+ pub offset: u64,
+ // Hash of the block
+ pub hash: Hash,
+}
+
+impl Entry<Hash, EmptyKey> for Version {
+    fn partition_key(&self) -> &Hash {
+        &self.uuid
+    }
+    fn sort_key(&self) -> &EmptyKey {
+        &EmptyKey
+    }
+
+    /// Merges `other` into `self`.
+    ///
+    /// A deleted `other` deletes `self` and empties its block list. A
+    /// deleted `self` is left unchanged. Otherwise the blocks of `other`
+    /// whose offset is not yet known are inserted in sorted position.
+    fn merge(&mut self, other: &Self) {
+        if other.deleted {
+            self.deleted = true;
+            self.blocks.clear();
+            return;
+        }
+        if self.deleted {
+            return;
+        }
+        for theirs in &other.blocks {
+            let lookup = self
+                .blocks
+                .binary_search_by(|ours| ours.offset.cmp(&theirs.offset));
+            if let Err(pos) = lookup {
+                self.blocks.insert(pos, theirs.clone());
+            }
+        }
+    }
+}
+
+/// Table schema for versions; propagates version deletion to the block
+/// reference table.
+pub struct VersionTable {
+    pub background: Arc<BackgroundRunner>,
+    pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+}
+
+#[async_trait]
+impl TableSchema for VersionTable {
+    type P = Hash;
+    type S = EmptyKey;
+    type E = Version;
+    type Filter = ();
+
+    /// When an existing version goes from live to deleted, inserts a
+    /// deleted block reference for each block the old entry listed.
+    /// Nothing happens on creation or removal of the entry.
+    async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
+        let block_ref_table = self.block_ref_table.clone();
+        let (old_v, new_v) = match (old, new) {
+            (Some(old_v), Some(new_v)) => (old_v, new_v),
+            _ => return Ok(()),
+        };
+
+        // Propagate deletion of version blocks
+        let newly_deleted = new_v.deleted && !old_v.deleted;
+        if newly_deleted {
+            let mut deleted_block_refs = Vec::with_capacity(old_v.blocks.len());
+            for vb in &old_v.blocks {
+                deleted_block_refs.push(BlockRef {
+                    block: vb.hash,
+                    version: old_v.uuid,
+                    deleted: true,
+                });
+            }
+            block_ref_table.insert_many(&deleted_block_refs[..]).await?;
+        }
+        Ok(())
+    }
+
+    /// Only non-deleted versions match.
+    fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+        !entry.deleted
+    }
+}