aboutsummaryrefslogblamecommitdiff
path: root/src/table/table.rs
blob: 7d1ff31c911b1289848d3e248896c960e8312829 (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                    
                   
 
                             
                       

                                    
 




                                                    

                    
                                              

                              
                                         
                                  
 

                               
 
                      
                   
                 
                     
                                    
                          

                     
                   
 
                                                       


                                                     
                                           
                               
                                                   

 
                                 
                                          
           
 
                              
                                           
 
                                                                                                   






                                                    
 
                                  

 

                                                   
 
 
                                                       

                                                                                             
                                                                                                

                                     
                                                                                          
 
                                                                                     
 
                                                                      
 

                                                                                                    
 

                                                               
                                           
                               
                             
                                       
                           
                               
                                 
                   
 
                                                          
 
                     

         




                                                                       

         
                                                                   













                                                                                       
                                                    
                                                                  
 
                                                                 
                                                             
 
                           
                                     
                                             
                                               
                                             
                                    
                                                                           
                                                                                           
                         
                                
 

                      
 




                                                                                                   
                                                                                          



                                                         
                                                                           
                                                                                  
 

                                                                                       
                                                                       
                                
 
                                                             
 

                      
 
                                                                                               



                                                         



























                                                                                            

                                                   
                                                                
                                                                                 
                                                                             


























                                                                                                   


                         




















                                                                                                                
                   

                                                                                                
                                                                              

                                                                      

                                        

































                                                                                                

                         









                                                                                         

         




                                          


                                                                           

                                                              
                                                                                       
                                                                       
                                
 
                                                             
 

                       
 




                                          
                                                
                                                                  
 
                                                                                            
                                
                               
                                     
                                       
                                               
                                     
                                    
                                                                           
                                                                                          
                         

                                
                                   
                                             
                                   
                                                                          
                                                              
                                                                                            


                                                                
                                                                   
                                                                                    
                                                                            
                                                         


                                                               
                                 
                                
                                                                                                       
                         
                 

                                               

                                                             




                                                                                                    

                         
 
                       

         


                                     
                                             

                                          
                                                    
                                       


                                                                                
                              






                                                  
                                                                                       
                                                                       
                                
 
                                                             
 

                       
 





                                             
                                                    
                                       
                                                
                                                                  
 






                                                         
 
                                
                               
                                     
                                       
                                               
                                     
                                    
                                                                           
                                                                                          
                         

                                

                                                                       
                                   
                                                                 
                                                                   

                                                                                                                    




                                                                                                    
                                                         
                                                 


                                                                                     

                                         

                                                                                

                         
 

                                                 



                                                                      
                                                 
                                                    


                                                                                                 
                                 
                           
                 





















                                                                                                




                                                                                 
                                                                                       
                                                                       
                           
                                     
                                       
                                               
                                    
                                                                      
                                                                                                   
                         

                                
         
 
 
              
                                                                                        




                                         
                           
                                                               
                                                                                 
                                                                      
                         













                                                                  
                                                            
                         
                                                    
                                                              
                                                
                         
                                                                   

                 
 
use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;

use opentelemetry::{
	trace::{FutureExt, TraceContextExt, Tracer},
	Context,
};

use garage_db as db;

use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
use garage_util::migrate::Migrate;

use garage_rpc::system::System;
use garage_rpc::*;

use crate::crdt::Crdt;
use crate::data::*;
use crate::gc::*;
use crate::merkle::*;
use crate::queue::InsertQueueWorker;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
use crate::util::*;

pub struct Table<F: TableSchema, R: TableReplication> {
	pub system: Arc<System>,
	pub data: Arc<TableData<F, R>>,
	pub merkle_updater: Arc<MerkleUpdater<F, R>>,
	pub syncer: Arc<TableSyncer<F, R>>,
	gc: Arc<TableGc<F, R>>,
	endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
}

#[derive(Serialize, Deserialize)]
pub(crate) enum TableRpc<F: TableSchema> {
	Ok,

	ReadEntry(F::P, F::S),
	ReadEntryResponse(Option<ByteBuf>),

	// Read range: read all keys in partition P, possibly starting at a certain sort key offset
	ReadRange {
		partition: F::P,
		begin_sort_key: Option<F::S>,
		filter: Option<F::Filter>,
		limit: usize,
		enumeration_order: EnumerationOrder,
	},

	Update(Vec<Arc<ByteBuf>>),
}

impl<F: TableSchema> Rpc for TableRpc<F> {
	type Response = Result<TableRpc<F>, Error>;
}

impl<F: TableSchema, R: TableReplication> Table<F, R> {
	// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============

	pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
		let endpoint = system
			.netapp
			.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));

		let data = TableData::new(system.clone(), instance, replication, db);

		let merkle_updater = MerkleUpdater::new(data.clone());

		let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
		let gc = TableGc::new(system.clone(), data.clone());

		system.layout_manager.add_table(F::TABLE_NAME);

		let table = Arc::new(Self {
			system,
			data,
			merkle_updater,
			gc,
			syncer,
			endpoint,
		});

		table.endpoint.set_handler(table.clone());

		table
	}

	pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
		self.merkle_updater.spawn_workers(bg);
		self.syncer.spawn_workers(bg);
		self.gc.spawn_workers(bg);
		bg.spawn_worker(InsertQueueWorker(self.clone()));
	}

	pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
		let tracer = opentelemetry::global::tracer("garage_table");
		let span = tracer.start(format!("{} insert", F::TABLE_NAME));

		self.insert_internal(e)
			.bound_record_duration(&self.data.metrics.put_request_duration)
			.with_context(Context::current_with_span(span))
			.await?;

		self.data.metrics.put_request_counter.add(1);

		Ok(())
	}

	async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
		let hash = e.partition_key().hash();
		let who = self.data.replication.write_sets(&hash);

		let e_enc = Arc::new(ByteBuf::from(e.encode()?));
		let rpc = TableRpc::<F>::Update(vec![e_enc]);

		self.system
			.rpc_helper()
			.try_write_many_sets(
				&self.endpoint,
				who.as_ref(),
				rpc,
				RequestStrategy::with_priority(PRIO_NORMAL)
					.with_quorum(self.data.replication.write_quorum()),
			)
			.await?;

		Ok(())
	}

	/// Insert item locally
	pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> {
		self.data.queue_insert(tx, e)
	}

	pub async fn insert_many<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
	where
		I: IntoIterator<Item = IE> + Send + Sync,
		IE: Borrow<F::E> + Send + Sync,
	{
		let tracer = opentelemetry::global::tracer("garage_table");
		let span = tracer.start(format!("{} insert_many", F::TABLE_NAME));

		self.insert_many_internal(entries)
			.bound_record_duration(&self.data.metrics.put_request_duration)
			.with_context(Context::current_with_span(span))
			.await?;

		self.data.metrics.put_request_counter.add(1);

		Ok(())
	}

	async fn insert_many_internal<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
	where
		I: IntoIterator<Item = IE> + Send + Sync,
		IE: Borrow<F::E> + Send + Sync,
	{
		// The different items will have to be stored on possibly different nodes.
		// We will here batch all items into a single request for each concerned
		// node, with all of the entries it must store within that request.
		// Each entry has to be saved to a specific list of "write sets", i.e. a set
		// of node within wich a quorum must be achieved. In normal operation, there
		// is a single write set which corresponds to the quorum in the current
		// cluster layout, but when the layout is updated, multiple write sets might
		// have to be handled at once. Here, since we are sending many entries, we
		// will have to handle many write sets in all cases. The algorihtm is thus
		// to send one request to each node with all the items it must save,
		// and keep track of the OK responses within each write set: if for all sets
		// a quorum of nodes has answered OK, then the insert has succeeded and
		// consistency properties (read-after-write) are preserved.

		// Some code here might feel redundant with RpcHelper::try_write_many_sets,
		// but I think deduplicating could lead to more spaghetti instead of
		// improving the readability, so I'm leaving as is.

		let quorum = self.data.replication.write_quorum();

		// Serialize all entries and compute the write sets for each of them.
		// In the case of sharded table replication, this also takes an "ack lock"
		// to the layout manager to avoid ack'ing newer versions which are not
		// taken into account by writes in progress (the ack can happen later, once
		// all writes that didn't take the new layout into account are finished).
		// These locks are released when entries_vec is dropped, i.e. when this
		// function returns.
		let mut entries_vec = Vec::new();
		for entry in entries.into_iter() {
			let entry = entry.borrow();
			let hash = entry.partition_key().hash();
			let write_sets = self.data.replication.write_sets(&hash);
			let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
			entries_vec.push((write_sets, e_enc));
		}

		// Compute a deduplicated list of all of the write sets,
		// and compute an index from each node to the position of the sets in which
		// it takes part, to optimize the detection of a quorum.
		let mut write_sets = entries_vec
			.iter()
			.map(|(wss, _)| wss.as_ref().iter().map(|ws| ws.as_slice()))
			.flatten()
			.collect::<Vec<&[Uuid]>>();
		write_sets.sort();
		write_sets.dedup();
		let mut write_set_index = HashMap::<&Uuid, Vec<usize>>::new();
		for (i, write_set) in write_sets.iter().enumerate() {
			for node in write_set.iter() {
				write_set_index.entry(node).or_default().push(i);
			}
		}

		// Build a map of all nodes to the entries that must be sent to that node.
		let mut call_list: HashMap<Uuid, Vec<_>> = HashMap::new();
		for (write_sets, entry_enc) in entries_vec.iter() {
			for write_set in write_sets.as_ref().iter() {
				for node in write_set.iter() {
					call_list.entry(*node).or_default().push(entry_enc.clone())
				}
			}
		}

		// Build futures to actually perform each of the corresponding RPC calls
		let call_count = call_list.len();
		let call_futures = call_list.into_iter().map(|(node, entries)| {
			let this = self.clone();
			let tracer = opentelemetry::global::tracer("garage");
			let span = tracer.start(format!("RPC to {:?}", node));
			let fut = async move {
				let rpc = TableRpc::<F>::Update(entries);
				let resp = this
					.system
					.rpc_helper()
					.call(
						&this.endpoint,
						node,
						rpc,
						RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(quorum),
					)
					.await;
				(node, resp)
			};
			fut.with_context(Context::current_with_span(span))
		});

		// Run all requests in parallel thanks to FuturesUnordered, and collect results.
		let mut resps = call_futures.collect::<FuturesUnordered<_>>();
		let mut set_counters = vec![(0, 0); write_sets.len()];
		let mut successes = 0;
		let mut errors = vec![];

		while let Some((node, resp)) = resps.next().await {
			match resp {
				Ok(_) => {
					successes += 1;
					for set in write_set_index.get(&node).unwrap().iter() {
						set_counters[*set].0 += 1;
					}
				}
				Err(e) => {
					errors.push(e);
					for set in write_set_index.get(&node).unwrap().iter() {
						set_counters[*set].1 += 1;
					}
				}
			}

			if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) {
				// Success

				// Continue all other requests in background
				tokio::spawn(async move {
					resps.collect::<Vec<(Uuid, Result<_, _>)>>().await;
				});

				return Ok(());
			}

			if set_counters
				.iter()
				.enumerate()
				.any(|(i, (_, err_cnt))| err_cnt + quorum > write_sets[i].len())
			{
				// Too many errors in this set, we know we won't get a quorum
				break;
			}
		}

		// Failure, could not get quorum within at least one set
		let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
		Err(Error::Quorum(
			quorum,
			Some(write_sets.len()),
			successes,
			call_count,
			errors,
		))
	}

	pub async fn get(
		self: &Arc<Self>,
		partition_key: &F::P,
		sort_key: &F::S,
	) -> Result<Option<F::E>, Error> {
		let tracer = opentelemetry::global::tracer("garage_table");
		let span = tracer.start(format!("{} get", F::TABLE_NAME));

		let res = self
			.get_internal(partition_key, sort_key)
			.bound_record_duration(&self.data.metrics.get_request_duration)
			.with_context(Context::current_with_span(span))
			.await?;

		self.data.metrics.get_request_counter.add(1);

		Ok(res)
	}

	async fn get_internal(
		self: &Arc<Self>,
		partition_key: &F::P,
		sort_key: &F::S,
	) -> Result<Option<F::E>, Error> {
		let hash = partition_key.hash();
		let who = self.data.replication.read_nodes(&hash);

		let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
		let resps = self
			.system
			.rpc_helper()
			.try_call_many(
				&self.endpoint,
				&who,
				rpc,
				RequestStrategy::with_priority(PRIO_NORMAL)
					.with_quorum(self.data.replication.read_quorum()),
			)
			.await?;

		let mut ret = None;
		let mut not_all_same = false;
		for resp in resps {
			if let TableRpc::ReadEntryResponse(value) = resp {
				if let Some(v_bytes) = value {
					let v = self.data.decode_entry(v_bytes.as_slice())?;
					ret = match ret {
						None => Some(v),
						Some(mut x) => {
							if x != v {
								not_all_same = true;
								x.merge(&v);
							}
							Some(x)
						}
					}
				}
			} else {
				return Err(Error::Message("Invalid return value to read".to_string()));
			}
		}
		if let Some(ret_entry) = &ret {
			if not_all_same {
				let self2 = self.clone();
				let ent2 = ret_entry.clone();
				tokio::spawn(async move {
					if let Err(e) = self2.repair_on_read(&who[..], ent2).await {
						warn!("Error doing repair on read: {}", e);
					}
				});
			}
		}

		Ok(ret)
	}

	pub async fn get_range(
		self: &Arc<Self>,
		partition_key: &F::P,
		begin_sort_key: Option<F::S>,
		filter: Option<F::Filter>,
		limit: usize,
		enumeration_order: EnumerationOrder,
	) -> Result<Vec<F::E>, Error> {
		let tracer = opentelemetry::global::tracer("garage_table");
		let span = tracer.start(format!("{} get_range", F::TABLE_NAME));

		let res = self
			.get_range_internal(
				partition_key,
				begin_sort_key,
				filter,
				limit,
				enumeration_order,
			)
			.bound_record_duration(&self.data.metrics.get_request_duration)
			.with_context(Context::current_with_span(span))
			.await?;

		self.data.metrics.get_request_counter.add(1);

		Ok(res)
	}

	async fn get_range_internal(
		self: &Arc<Self>,
		partition_key: &F::P,
		begin_sort_key: Option<F::S>,
		filter: Option<F::Filter>,
		limit: usize,
		enumeration_order: EnumerationOrder,
	) -> Result<Vec<F::E>, Error> {
		let hash = partition_key.hash();
		let who = self.data.replication.read_nodes(&hash);

		let rpc = TableRpc::<F>::ReadRange {
			partition: partition_key.clone(),
			begin_sort_key,
			filter,
			limit,
			enumeration_order,
		};

		let resps = self
			.system
			.rpc_helper()
			.try_call_many(
				&self.endpoint,
				&who,
				rpc,
				RequestStrategy::with_priority(PRIO_NORMAL)
					.with_quorum(self.data.replication.read_quorum()),
			)
			.await?;

		let mut ret: BTreeMap<Vec<u8>, F::E> = BTreeMap::new();
		let mut to_repair = BTreeSet::new();
		for resp in resps {
			if let TableRpc::Update(entries) = resp {
				for entry_bytes in entries.iter() {
					let entry = self.data.decode_entry(entry_bytes.as_slice())?;
					let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
					match ret.get_mut(&entry_key) {
						Some(e) => {
							if *e != entry {
								e.merge(&entry);
								to_repair.insert(entry_key.clone());
							}
						}
						None => {
							ret.insert(entry_key, entry);
						}
					}
				}
			} else {
				return Err(Error::unexpected_rpc_message(resp));
			}
		}

		if !to_repair.is_empty() {
			let self2 = self.clone();
			let to_repair = to_repair
				.into_iter()
				.map(|k| ret.get(&k).unwrap().clone())
				.collect::<Vec<_>>();
			tokio::spawn(async move {
				for v in to_repair {
					if let Err(e) = self2.repair_on_read(&who[..], v).await {
						warn!("Error doing repair on read: {}", e);
					}
				}
			});
		}

		// At this point, the `ret` btreemap might contain more than `limit`
		// items, because nodes might have returned us each `limit` items
		// but for different keys. We have to take only the first `limit` items
		// in this map, in the specified enumeration order, for two reasons:
		// 1. To return to the user no more than the number of items that they requested
		// 2. To return only items for which we have a read quorum: we do not know
		//    that we have a read quorum for the items after the first `limit`
		//    of them
		let ret_vec = match enumeration_order {
			EnumerationOrder::Forward => ret
				.into_iter()
				.take(limit)
				.map(|(_k, v)| v)
				.collect::<Vec<_>>(),
			EnumerationOrder::Reverse => ret
				.into_iter()
				.rev()
				.take(limit)
				.map(|(_k, v)| v)
				.collect::<Vec<_>>(),
		};
		Ok(ret_vec)
	}

	// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============

	async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
		let what_enc = Arc::new(ByteBuf::from(what.encode()?));
		self.system
			.rpc_helper()
			.try_call_many(
				&self.endpoint,
				who,
				TableRpc::<F>::Update(vec![what_enc]),
				RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(who.len()),
			)
			.await?;
		Ok(())
	}
}

#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
	async fn handle(
		self: &Arc<Self>,
		msg: &TableRpc<F>,
		_from: NodeID,
	) -> Result<TableRpc<F>, Error> {
		match msg {
			TableRpc::ReadEntry(key, sort_key) => {
				let value = self.data.read_entry(key, sort_key)?;
				Ok(TableRpc::ReadEntryResponse(value))
			}
			TableRpc::ReadRange {
				partition,
				begin_sort_key,
				filter,
				limit,
				enumeration_order,
			} => {
				let values = self.data.read_range(
					partition,
					begin_sort_key,
					filter,
					*limit,
					*enumeration_order,
				)?;
				Ok(TableRpc::Update(values))
			}
			TableRpc::Update(pairs) => {
				self.data.update_many(pairs)?;
				Ok(TableRpc::Ok)
			}
			m => Err(Error::unexpected_rpc_message(m)),
		}
	}
}