summaryrefslogtreecommitdiff
path: root/src/poolserver.ml
diff options
context:
space:
mode:
Diffstat (limited to 'src/poolserver.ml')
-rw-r--r--src/poolserver.ml122
1 files changed, 122 insertions, 0 deletions
diff --git a/src/poolserver.ml b/src/poolserver.ml
new file mode 100644
index 0000000..15a700c
--- /dev/null
+++ b/src/poolserver.ml
@@ -0,0 +1,122 @@
+open Unix
+open Util
+open Proto
+
+let pool_port = ref 9082
+
+type client = {
+ id: id;
+ addr: sockaddr;
+ input: file_descr;
+ send: pool_message -> unit;
+ disconnect: unit -> unit;
+ mutable slots: int;
+}
+
+type server = {
+ clients: (id, client) Hashtbl.t;
+ sock: file_descr;
+}
+
+let new_server () =
+ let server =
+ { clients = Hashtbl.create 12;
+ sock = socket PF_INET SOCK_STREAM 0;
+ } in
+ Format.eprintf "Listening on port %d...@." !pool_port;
+ bind server.sock (make_addr "0.0.0.0" !pool_port);
+ listen server.sock 2;
+
+ let stop_srv _ =
+ Format.eprintf "Shutting down server...@.";
+ shutdown server.sock SHUTDOWN_ALL;
+ exit 0
+ in
+ Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
+ Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv);
+
+ server
+
+let server_add_client server cli =
+ (* Say hello *)
+ let msg = read_one_msg cli.input in
+ if msg <> PoolHello then raise (ProtocolError "Client must say PoolHello first thing.");
+ cli.send PoolHello;
+ (* Put client somewhere *)
+ Hashtbl.add server.clients cli.id cli
+
+let client_of_fd server fd =
+ let cli = ref None in
+ Hashtbl.iter (fun _ c -> if c.input = fd then cli := Some c) server.clients;
+ match !cli with
+ | None -> assert false
+ | Some c -> c
+
+let client_disconnect server cli =
+ cli.disconnect ();
+ Hashtbl.remove server.clients cli.id;
+ Format.eprintf "Disconnected: %s@." (id_str cli.id)
+
+let rec server_run server =
+ let fds = Hashtbl.fold
+ (fun _ c l -> c.input::l)
+ server.clients [server.sock]
+ in
+ let qi, _, qe = select fds [] fds (-1.0) in
+ begin match qi, qe with
+ | x::_, _ when x = server.sock ->
+ let cli, cli_addr = accept server.sock in
+ let oc = out_channel_of_descr cli in
+ let cli =
+ { id = new_id();
+ addr = cli_addr;
+ input = cli;
+ send = (fun msg -> Marshal.to_channel oc msg []; flush oc);
+ disconnect = (fun () -> shutdown cli SHUTDOWN_ALL; close cli);
+ slots = 0;
+ } in
+ server_add_client server cli;
+ Format.eprintf "New client %s.@." (id_str cli.id)
+ | x::_, _ ->
+ let cli = client_of_fd server x in
+ begin try match read_one_msg cli.input with
+ | PoolProvide n ->
+ Format.eprintf "%s provide %d@." (id_str cli.id) n;
+ cli.slots <- cli.slots + n
+ | PoolRequest(task, addr, n) ->
+ Format.eprintf "%s request %d for %s@." (id_str cli.id) n task;
+ (* Distribute tasks in the pool *)
+ let rec aux n =
+ if n > 0 then begin
+ let cli = ref None in
+ Hashtbl.iter (fun _ c -> if c.slots > 0 then cli := Some c) server.clients;
+ match !cli with
+ | None -> () (* Sorry, pool is to small for your request. *)
+ | Some c ->
+ let k = min n c.slots in
+ c.slots <- c.slots - k;
+ c.send (PoolRequest(task, addr, k));
+ aux (n-k)
+ end
+ in aux n
+ | PoolHello -> raise (ProtocolError "Misplaced PoolHello.")
+ with _ ->
+ client_disconnect server cli
+ end
+ | [], x::_ ->
+ let cli = client_of_fd server x in
+ client_disconnect server cli
+ | _ -> assert false
+ end;
+ server_run server (* Infinite Loop ! *)
+
+
+let () =
+ let usage = "Usage: ./poolserver [options]" in
+ let options = [
+ "-port", Arg.Set_int pool_port, "Set port for pooling server.";
+ ] in
+ Arg.parse options (fun _ -> ()) usage;
+
+ let server = new_server() in
+ server_run server