1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
open Unix
open Util
open Proto
let pool_port = ref 9082
type client = {
id: id;
addr: sockaddr;
input: file_descr;
send: pool_message -> unit;
disconnect: unit -> unit;
mutable slots: int;
}
type server = {
clients: (id, client) Hashtbl.t;
sock: file_descr;
}
let new_server () =
let server =
{ clients = Hashtbl.create 12;
sock = socket PF_INET SOCK_STREAM 0;
} in
Format.eprintf "Listening on port %d...@." !pool_port;
bind server.sock (make_addr "0.0.0.0" !pool_port);
listen server.sock 2;
let stop_srv _ =
Format.eprintf "Shutting down server...@.";
Hashtbl.iter (fun _ c -> c.disconnect()) server.clients;
shutdown server.sock SHUTDOWN_ALL;
close server.sock;
exit 0
in
Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv);
server
let server_add_client server cli =
(* Say hello *)
let msg = read_one_msg cli.input in
if msg <> PoolHello then raise (ProtocolError "Client must say PoolHello first thing.");
cli.send PoolHello;
(* Put client somewhere *)
Hashtbl.add server.clients cli.id cli
let client_of_fd server fd =
let cli = ref None in
Hashtbl.iter (fun _ c -> if c.input = fd then cli := Some c) server.clients;
match !cli with
| None -> assert false
| Some c -> c
let client_disconnect server cli =
cli.disconnect ();
Hashtbl.remove server.clients cli.id;
Format.eprintf "Disconnected: %s@." (id_str cli.id)
let rec server_run server =
let fds = Hashtbl.fold
(fun _ c l -> c.input::l)
server.clients [server.sock]
in
let qi, _, qe = select fds [] fds (-1.0) in
begin match qi, qe with
| x::_, _ when x = server.sock ->
let cli, cli_addr = accept server.sock in
let oc = out_channel_of_descr cli in
let cli =
{ id = new_id();
addr = cli_addr;
input = cli;
send = (fun msg -> Marshal.to_channel oc msg []; flush oc);
disconnect = (fun () -> shutdown cli SHUTDOWN_ALL; close cli);
slots = 0;
} in
server_add_client server cli;
Format.eprintf "New client %s.@." (id_str cli.id)
| x::_, _ ->
let cli = client_of_fd server x in
begin try match read_one_msg cli.input with
| PoolProvide n ->
Format.eprintf "%s provide %d@." (id_str cli.id) n;
cli.slots <- cli.slots + n
| PoolRequest(task, addr, n) ->
Format.eprintf "%s request %d for %s@." (id_str cli.id) n task;
(* Distribute tasks in the pool *)
let rec aux n =
if n > 0 then begin
let cli = ref None in
Hashtbl.iter (fun _ c -> if c.slots > 0 then cli := Some c) server.clients;
match !cli with
| None -> () (* Sorry, pool is to small for your request. *)
| Some c ->
let k = min n c.slots in
c.slots <- c.slots - k;
c.send (PoolRequest(task, addr, k));
aux (n-k)
end
in aux n
| PoolHello -> raise (ProtocolError "Misplaced PoolHello.")
with _ ->
client_disconnect server cli
end
| [], x::_ ->
let cli = client_of_fd server x in
client_disconnect server cli
| _ -> assert false
end;
server_run server (* Infinite Loop ! *)
let () =
let usage = "Usage: ./poolserver [options]" in
let options = [
"-port", Arg.Set_int pool_port, "Set port for pooling server.";
] in
Arg.parse options (fun _ -> ()) usage;
let server = new_server() in
server_run server
|