summaryrefslogblamecommitdiff
path: root/src/poolserver.ml
blob: 15a700c56428f19a436d477ffa1bca6f24b82236 (plain) (tree)

























































































































                                                                                                                   
open Unix
open Util
open Proto

(** TCP port the pool server listens on. Defaults to 9082 and can be
    overridden from the command line with [-port]. *)
let pool_port = ref 9082

(** A client connected to the pool server. *)
type client = {
	(* Identifier assigned by the server when the connection is accepted;
	   also the key of the client in the server's table. *)
	id: id;
	(* Peer address as returned by [accept]. *)
	addr: sockaddr;
	(* Socket descriptor the client's messages are read from. *)
	input: file_descr;
	(* Sends one message to the client. *)
	send: pool_message -> unit;
	(* Shuts down and closes the client's connection. *)
	disconnect: unit -> unit;
	(* Slots the client has offered with [PoolProvide] and that have not yet
	   been handed out to a [PoolRequest]. *)
	mutable slots: int;
}

(** State of the pool server. *)
type server = {
	(* Clients that completed the hello handshake, indexed by their id. *)
	clients: (id, client) Hashtbl.t;
	(* Listening socket on which new connections are accepted. *)
	sock: file_descr;
}

(** Create the pool server.

    Opens a TCP socket, binds it to every interface on port [!pool_port],
    starts listening, and installs the process-wide signal handlers:
    - SIGINT / SIGTERM close the listening socket and exit with status 0;
    - SIGPIPE is ignored, so that writing to a client that has gone away
      raises an exception in the writer instead of killing the process.

    @return the server state, with an empty client table.
    @raise Unix.Unix_error if the socket cannot be created, bound or put in
    listening mode. *)
let new_server () =
	let server = 
	{	clients = Hashtbl.create 12;
		sock = socket PF_INET SOCK_STREAM 0;
	} in
	(* Allow rebinding the port right after a restart, while connections of
	   the previous instance may still linger in TIME_WAIT. *)
	setsockopt server.sock SO_REUSEADDR true;
	Format.eprintf "Listening on port %d...@." !pool_port;
	bind server.sock (make_addr "0.0.0.0" !pool_port);
	listen server.sock 2;

	let stop_srv _ =
		Format.eprintf "Shutting down server...@.";
		(* shutdown on a listening socket is rejected on some platforms; it
		   must not prevent the handler from reaching [exit]. *)
		(try shutdown server.sock SHUTDOWN_ALL with Unix_error _ -> ());
		(try close server.sock with Unix_error _ -> ());
		exit 0
	in
	Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
	Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv);
	(* Without this, a single write to a disconnected client terminates the
	   whole server. The signal may be unavailable on some systems. *)
	(try Sys.set_signal Sys.sigpipe Sys.Signal_ignore
	 with Invalid_argument _ -> ());

	server

(** Perform the hello handshake with a freshly accepted client and register
    it in the server's client table.

    The first message read from the client must be [PoolHello]; the server
    then answers with [PoolHello] before storing the client under its id.

    @raise ProtocolError if the first message is anything but [PoolHello]. *)
let server_add_client server cli =
	(* Handshake: the client speaks first. *)
	begin match read_one_msg cli.input with
	| PoolHello -> ()
	| _ -> raise (ProtocolError "Client must say PoolHello first thing.")
	end;
	cli.send PoolHello;
	(* Handshake done: the client becomes visible to the event loop. *)
	Hashtbl.add server.clients cli.id cli

(** Look up the registered client whose input descriptor is [fd].

    The whole table is scanned. The descriptor is expected to belong to a
    registered client; the function fails with an assertion otherwise. *)
let client_of_fd server fd =
	let pick _ candidate found =
		if candidate.input = fd then Some candidate else found
	in
	match Hashtbl.fold pick server.clients None with
	| Some owner -> owner
	| None -> assert false

(** Drop a client: forget it in the server's table and close its connection.

    The client is removed from the table {e before} its connection is torn
    down, so that a failure while closing can never leave a dead descriptor
    registered in the event loop. Errors raised by the teardown itself
    (for instance when the peer already reset the connection) are not
    propagated; in that case the descriptor is closed directly so it does
    not leak. *)
let client_disconnect server cli =
	Hashtbl.remove server.clients cli.id;
	begin try cli.disconnect ()
	with Unix.Unix_error _ ->
		(* The teardown stopped half-way: make sure the descriptor is
		   released anyway. *)
		(try Unix.close cli.input with Unix.Unix_error _ -> ())
	end;
	Format.eprintf "Disconnected: %s@." (id_str cli.id)
		
(** Main event loop of the pool server; never returns.

    Each iteration waits, without timeout, until the listening socket or a
    client socket is readable or in exceptional state, then handles exactly
    one event:
    - a pending connection is accepted, wrapped in a [client] record and
      registered through [server_add_client]; a client that fails the
      handshake is rejected and its connection closed, without affecting
      the server;
    - a message from a registered client is processed: [PoolProvide n] adds
      [n] slots to the sender, [PoolRequest (task, addr, n)] is split among
      the clients that still have free slots; any error while reading or
      processing the message disconnects the sender;
    - a client socket reported as exceptional is disconnected.

    Note that the handshake read in the accept branch is blocking: the loop
    does not serve other clients until the new client has sent its first
    message. *)
let rec server_run server =
	let fds = Hashtbl.fold
		(fun _ c l -> c.input::l) 
		server.clients [server.sock]
	in
	let qi, _, qe = select fds [] fds (-1.0) in
	begin match qi, qe with
	| x::_, _ when x = server.sock ->
		let fd, cli_addr = accept server.sock in
		let oc = out_channel_of_descr fd in
		(* Makes [disconnect] idempotent: a second call must not touch a
		   descriptor number that may have been reused since. *)
		let closed = ref false in
		let cli =
			{	id = new_id();
				addr = cli_addr;
				input = fd;
				send = (fun msg -> Marshal.to_channel oc msg []; flush oc);
				disconnect = (fun () ->
					if not !closed then begin
						closed := true;
						(* shutdown fails if the peer is already gone; the
						   descriptor must be released regardless. *)
						(try shutdown fd SHUTDOWN_ALL with Unix_error _ -> ());
						(* Closes the channel and its underlying descriptor. *)
						close_out_noerr oc
					end);
				slots = 0;
			} in
		begin try
			server_add_client server cli;
			Format.eprintf "New client %s.@." (id_str cli.id)
		with e ->
			(* A client that breaks the handshake must not take the whole
			   server down. *)
			Format.eprintf "Rejected client %s: %s@."
				(id_str cli.id) (Printexc.to_string e);
			Hashtbl.remove server.clients cli.id;
			cli.disconnect ()
		end
	| x::_, _ ->
		let cli = client_of_fd server x in
		begin try match read_one_msg cli.input with
		| PoolProvide n ->
			Format.eprintf "%s provide %d@." (id_str cli.id) n;
			cli.slots <- cli.slots + n
		| PoolRequest(task, addr, n) ->
			Format.eprintf "%s request %d for %s@." (id_str cli.id) n task;
			(* Some client that still has free slots, if any. *)
			let find_worker () =
				Hashtbl.fold
					(fun _ c found -> if c.slots > 0 then Some c else found)
					server.clients None
			in
			(* Distribute tasks in the pool *)
			let rec aux n =
				if n > 0 then begin
					match find_worker () with
					| None -> () (* Sorry, pool is too small for your request. *)
					| Some c ->
						let k = min n c.slots in
						let sent =
							try c.send (PoolRequest(task, addr, k)); true
							with e ->
								Format.eprintf "Send to %s failed: %s@."
									(id_str c.id) (Printexc.to_string e);
								false
						in
						if sent then begin
							c.slots <- c.slots - k;
							aux (n-k)
						end else begin
							(* The worker is the faulty peer, not the requester:
							   drop it and retry with the remaining pool. *)
							client_disconnect server c;
							aux n
						end
				end
			in aux n
		| PoolHello -> raise (ProtocolError "Misplaced PoolHello.")
		with e ->
			Format.eprintf "Error with client %s: %s@."
				(id_str cli.id) (Printexc.to_string e);
			client_disconnect server cli
		end
	| [], x::_ ->
		let cli = client_of_fd server x in
		client_disconnect server cli
	| _ -> assert false
	end;
	server_run server	(* Infinite Loop ! *)


(* Program entry point: read the command-line options, then start the pool
   server and hand control to its event loop. Anonymous arguments are
   silently ignored. *)
let () =
	let spec = [
		"-port", Arg.Set_int pool_port, "Set port for pooling server.";
	] in
	let ignore_anonymous _ = () in
	Arg.parse spec ignore_anonymous "Usage: ./poolserver [options]";

	server_run (new_server ())