diff options
-rw-r--r-- | src/garage/admin.rs | 2 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 3 | ||||
-rw-r--r-- | src/model/block.rs | 228 |
3 files changed, 182 insertions, 51 deletions
diff --git a/src/garage/admin.rs b/src/garage/admin.rs index b5fc9a7e..c3a83d02 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -446,7 +446,7 @@ impl AdminRpcHandler { if opt.detailed { writeln!( &mut ret, - " number of blocks: {}", + " number of RC entries (~= number of blocks): {}", self.garage.block_manager.rc_len() ) .unwrap(); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 0df6ef87..b930d8a8 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -8,8 +8,7 @@ pub enum Command { #[structopt(name = "server")] Server, - /// Print identifier (public key) of this garage node. - /// Generates a new keypair if necessary. + /// Print identifier (public key) of this Garage node #[structopt(name = "node-id")] NodeId(NodeIdOpt), diff --git a/src/model/block.rs b/src/model/block.rs index 406abf7b..3f40aaab 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -31,10 +31,20 @@ pub const INLINE_THRESHOLD: usize = 3072; pub const BACKGROUND_WORKERS: u64 = 1; pub const BACKGROUND_TRANQUILITY: u32 = 3; -const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); -const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); +// Timeout for RPCs that read and write blocks to remote nodes +const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30); +// Timeout for RPCs that ask other nodes whether they need a copy +// of a given block before we delete it locally const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); -const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); + +// The delay between the time where a resync operation fails +// and the time when it is retried. +const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60); + +// The delay between the moment when the reference counter +// drops to zero, and the moment where we allow ourselves +// to delete the block locally. +const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] @@ -180,7 +190,7 @@ impl BlockManager { /// that are required because of refcount > 0, and will try /// to fix any mismatch between the two. pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - // 1. Repair blocks from RC table + // 1. Repair blocks from RC table. let garage = self.garage.load_full().unwrap(); let mut last_hash = None; for (i, entry) in garage.block_ref_table.data.store.iter().enumerate() { @@ -245,40 +255,51 @@ impl BlockManager { /// Increment the number of time a block is used, putting it to resynchronization if it is /// required, but not known pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { - let old_rc = self.rc.fetch_and_update(&hash, |old| { - let old_v = old.map(u64_from_be_bytes).unwrap_or(0); - Some(u64::to_be_bytes(old_v + 1).to_vec()) - })?; - let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0); - if old_rc == 0 { - self.put_to_resync(hash, BLOCK_RW_TIMEOUT)?; + let old_rc = self + .rc + .fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?; + let old_rc = RcEntry::parse_opt(old_rc); + if old_rc.is_zero() { + // When the reference counter is incremented, there is + // normally a node that is responsible for sending us the + // data of the block. However that operation may fail, + // so in all cases we add the block here to the todo list + // to check later that it arrived correctly, and if not + // we will fecth it from someone. + self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; } Ok(()) } /// Decrement the number of time a block is used pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { - let new_rc = self.rc.update_and_fetch(&hash, |old| { - let old_v = old.map(u64_from_be_bytes).unwrap_or(0); - if old_v > 1 { - Some(u64::to_be_bytes(old_v - 1).to_vec()) - } else { - None - } - })?; - if new_rc.is_none() { - self.put_to_resync(hash, BLOCK_GC_TIMEOUT)?; + let new_rc = self + .rc + .update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?; + let new_rc = RcEntry::parse_opt(new_rc); + if let RcEntry::Deletable { .. } = new_rc { + self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?; } Ok(()) } /// Read a block's reference count - pub fn get_block_rc(&self, hash: &Hash) -> Result<u64, Error> { - Ok(self - .rc - .get(hash.as_ref())? - .map(u64_from_be_bytes) - .unwrap_or(0)) + fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> { + Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?)) + } + + /// Delete an entry in the RC table if it is deletable and the + /// deletion time has passed + fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { + let now = now_msec(); + self.rc.update_and_fetch(&hash, |rcval| { + let updated = match RcEntry::parse_opt(rcval) { + RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent, + v => v, + }; + updated.serialize() + })?; + Ok(()) } // ---- Reading and writing blocks locally ---- @@ -300,7 +321,7 @@ impl BlockManager { Ok(f) => f, Err(e) => { // Not found but maybe we should have had it ?? - self.put_to_resync(hash, Duration::from_millis(0))?; + self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; return Err(Into::into(e)); } }; @@ -314,6 +335,7 @@ impl BlockManager { .await .move_block_to_corrupted(hash, self) .await?; + self.put_to_resync(hash, Duration::from_millis(0))?; return Err(Error::CorruptData(*hash)); } @@ -328,7 +350,7 @@ impl BlockManager { .await .check_block_status(hash, self) .await?; - Ok(needed && !exists) + Ok(needed.is_nonzero() && !exists) } /// Utility: gives the path of the directory in which a block should be found @@ -349,7 +371,7 @@ impl BlockManager { // ---- Resync loop ---- pub fn spawn_background_worker(self: Arc<Self>) { - // Launch 2 simultaneous workers for background resync loop preprocessing + // Launch n simultaneous workers for background resync loop preprocessing for i in 0..BACKGROUND_WORKERS { let bm2 = self.clone(); let background = self.system.background.clone(); @@ -407,7 +429,7 @@ impl BlockManager { let res = self.resync_block(&hash).await; if let Err(e) = &res { warn!("Error when resyncing {:?}: {}", hash, e); - self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; + self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?; } Ok(true) } else { @@ -437,15 +459,18 @@ impl BlockManager { .check_block_status(hash, self) .await?; - if exists != needed { - info!( - "Resync block {:?}: exists {}, needed {}", - hash, exists, needed + if exists != needed.is_needed() || exists != needed.is_nonzero() { + debug!( + "Resync block {:?}: exists {}, nonzero rc {}, deletable {}", + hash, + exists, + needed.is_nonzero(), + needed.is_deletable(), ); } - if exists && !needed { - trace!("Offloading block {:?}", hash); + if exists && needed.is_deletable() { + info!("Resync block {:?}: offloading and deleting", hash); let mut who = self.replication.write_nodes(hash); if who.len() < self.replication.write_quorum() { @@ -488,7 +513,7 @@ impl BlockManager { need_nodes.len() ); - let put_block_message = self.read_block(hash).await.err_context("PutBlock RPC")?; + let put_block_message = self.read_block(hash).await?; self.system .rpc .try_call_many( @@ -499,10 +524,11 @@ impl BlockManager { .with_quorum(need_nodes.len()) .with_timeout(BLOCK_RW_TIMEOUT), ) - .await?; + .await + .err_context("PutBlock RPC")?; } info!( - "Deleting block {:?}, offload finished ({} / {})", + "Deleting unneeded block {:?}, offload finished ({} / {})", hash, need_nodes.len(), who.len() @@ -513,12 +539,16 @@ impl BlockManager { .await .delete_if_unneeded(hash, self) .await?; + + self.clear_deleted_block_rc(hash)?; } - if needed && !exists { - // TODO find a way to not do this if they are sending it to us - // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay - // between the RC being incremented and this part being called. + if needed.is_nonzero() && !exists { + info!( + "Resync block {:?}: fetching absent but needed block (refcount > 0)", + hash + ); + let block_data = self.rpc_get_block(hash).await?; self.write_block(hash, &block_data[..]).await?; } @@ -526,6 +556,8 @@ impl BlockManager { Ok(()) } + // ---- Utility: iteration on files in the data directory ---- + async fn for_each_file<F, Fut, State>( &self, state: State, @@ -608,7 +640,7 @@ impl EndpointHandler<BlockRpc> for BlockManager { struct BlockStatus { exists: bool, - needed: bool, + needed: RcEntry, } impl BlockManagerLocked { @@ -620,7 +652,7 @@ impl BlockManagerLocked { let path = mgr.block_path(hash); let exists = fs::metadata(&path).await.is_ok(); - let needed = mgr.get_block_rc(hash)? > 0; + let needed = mgr.get_block_rc(hash)?; Ok(BlockStatus { exists, needed }) } @@ -659,14 +691,13 @@ impl BlockManagerLocked { let mut path2 = path.clone(); path2.set_extension("corrupted"); fs::rename(path, path2).await?; - mgr.put_to_resync(hash, Duration::from_millis(0))?; Ok(()) } async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; - if exists && !needed { + if exists && needed.is_deletable() { let path = mgr.block_path(hash); fs::remove_file(path).await?; } @@ -680,3 +711,104 @@ fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 { x8.copy_from_slice(bytes.as_ref()); u64::from_be_bytes(x8) } + +/// Describes the state of the reference counter for a block +#[derive(Clone, Copy, Debug)] +enum RcEntry { + /// Present: the block has `count` references, with `count` > 0. + /// + /// This is stored as u64::to_be_bytes(count) + Present { count: u64 }, + + /// Deletable: the block has zero references, and can be deleted + /// once time (returned by now_msec) is larger than at_time + /// (in millis since Unix epoch) + /// + /// This is stored as [0u8; 8] followed by u64::to_be_bytes(at_time), + /// (this allows for the data format to be backwards compatible with + /// previous Garage versions that didn't have this intermediate state) + Deletable { at_time: u64 }, + + /// Absent: the block has zero references, and can be deleted + /// immediately + Absent, +} + +impl RcEntry { + fn parse(bytes: &[u8]) -> Self { + if bytes.len() == 8 { + RcEntry::Present { + count: u64::from_be_bytes(bytes.try_into().unwrap()), + } + } else if bytes.len() == 16 { + RcEntry::Deletable { + at_time: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), + } + } else { + panic!("Invalid RC entry: {:?}, database is corrupted. This is an error Garage is currently unable to recover from. Sorry, and also please report a bug.", + bytes + ) + } + } + + fn parse_opt<V: AsRef<[u8]>>(bytes: Option<V>) -> Self { + bytes + .map(|b| Self::parse(b.as_ref())) + .unwrap_or(Self::Absent) + } + + fn serialize(self) -> Option<Vec<u8>> { + match self { + RcEntry::Present { count } => Some(u64::to_be_bytes(count).to_vec()), + RcEntry::Deletable { at_time } => { + Some([u64::to_be_bytes(0), u64::to_be_bytes(at_time)].concat()) + } + RcEntry::Absent => None, + } + } + + fn increment(self) -> Self { + let old_count = match self { + RcEntry::Present { count } => count, + _ => 0, + }; + RcEntry::Present { + count: old_count + 1, + } + } + + fn decrement(self) -> Self { + match self { + RcEntry::Present { count } => { + if count > 1 { + RcEntry::Present { count: count - 1 } + } else { + RcEntry::Deletable { + at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64, + } + } + } + del => del, + } + } + + fn is_zero(&self) -> bool { + matches!(self, RcEntry::Deletable { .. } | RcEntry::Absent) + } + + fn is_nonzero(&self) -> bool { + !self.is_zero() + } + + fn is_deletable(&self) -> bool { + match self { + RcEntry::Present { .. } => false, + RcEntry::Deletable { at_time } => now_msec() > *at_time, + RcEntry::Absent => true, + } + } + + fn is_needed(&self) -> bool { + !self.is_deletable() + } +} |