summaryrefslogtreecommitdiff
path: root/src/poolserver.ml
blob: 850db3c44aff335b08729ab843f7e0301e4f76e1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
open Unix
open Util
open Proto

let pool_port = ref 9082

type client = {
	id: id;
	addr: sockaddr;
	input: file_descr;
	send: pool_message -> unit;
	disconnect: unit -> unit;
	mutable slots: int;
}

type server = {
	clients: (id, client) Hashtbl.t;
	sock: file_descr;
}

let new_server () =
	let server = 
	{	clients = Hashtbl.create 12;
		sock = socket PF_INET SOCK_STREAM 0;
	} in
	Format.eprintf "Listening on port %d...@." !pool_port;
	bind server.sock (make_addr "0.0.0.0" !pool_port);
	listen server.sock 2;

	let stop_srv _ =
		Format.eprintf "Shutting down server...@.";
		Hashtbl.iter (fun _ c -> c.disconnect()) server.clients;
		shutdown server.sock SHUTDOWN_ALL;
		close server.sock;
		exit 0
	in
	Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
	Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv);

	server

let server_add_client server cli =
	(* Say hello *)
	let msg = read_one_msg cli.input in
	if msg <> PoolHello then raise (ProtocolError "Client must say PoolHello first thing.");
	cli.send PoolHello;
	(* Put client somewhere *)
	Hashtbl.add server.clients cli.id cli

let client_of_fd server fd =
	let cli = ref None in
	Hashtbl.iter (fun _ c -> if c.input = fd then cli := Some c) server.clients;
	match !cli with
	| None -> assert false
	| Some c -> c

let client_disconnect server cli =
	cli.disconnect ();
	Hashtbl.remove server.clients cli.id;
	Format.eprintf "Disconnected: %s@." (id_str cli.id)
		
let rec server_run server =
	let fds = Hashtbl.fold
		(fun _ c l -> c.input::l) 
		server.clients [server.sock]
	in
	let qi, _, qe = select fds [] fds (-1.0) in
	begin match qi, qe with
	| x::_, _ when x = server.sock ->
		let cli, cli_addr = accept server.sock in
		let oc = out_channel_of_descr cli in
		let cli =
			{	id = new_id();
				addr = cli_addr;
				input = cli;
				send = (fun msg -> Marshal.to_channel oc msg []; flush oc);
				disconnect = (fun () -> shutdown cli SHUTDOWN_ALL; close cli);
				slots = 0;
			} in
		server_add_client server cli;
		Format.eprintf "New client %s.@." (id_str cli.id)
	| x::_, _ ->
		let cli = client_of_fd server x in
		begin try match read_one_msg cli.input with
		| PoolProvide n ->
			Format.eprintf "%s provide %d@." (id_str cli.id) n;
			cli.slots <- cli.slots + n
		| PoolRequest(task, addr, n) ->
			Format.eprintf "%s request %d for %s@." (id_str cli.id) n task;
			(* Distribute tasks in the pool *)
			let rec aux n =
				if n > 0 then begin
					let cli = ref None in
					Hashtbl.iter (fun _ c -> if c.slots > 0 then cli := Some c) server.clients;
					match !cli with
					| None -> () (* Sorry, pool is to small for your request. *)
					| Some c ->
						let k = min n c.slots in
						c.slots <- c.slots - k;
						c.send (PoolRequest(task, addr, k));
						aux (n-k)
				end
			in aux n
		| PoolHello -> raise (ProtocolError "Misplaced PoolHello.")
		with _ ->
			client_disconnect server cli
		end
	| [], x::_ ->
		let cli = client_of_fd server x in
		client_disconnect server cli
	| _ -> assert false
	end;
	server_run server	(* Infinite Loop ! *)


let () =
	let usage = "Usage: ./poolserver [options]" in
	let options = [
		"-port", Arg.Set_int pool_port, "Set port for pooling server.";
	] in
	Arg.parse options (fun _ -> ()) usage;

	let server = new_server() in
	server_run server