From 4a5ca97b0970e191332fb6fb4684a397b93390f5 Mon Sep 17 00:00:00 2001 From: Alex AUVOLAT Date: Sun, 25 May 2014 01:00:44 +0200 Subject: ./manager -pool-addr mypool -pool-count 16 -my-addr my_addr ./example.native --- src/poolserver.ml | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 src/poolserver.ml (limited to 'src/poolserver.ml') diff --git a/src/poolserver.ml b/src/poolserver.ml new file mode 100644 index 0000000..15a700c --- /dev/null +++ b/src/poolserver.ml @@ -0,0 +1,122 @@ +open Unix +open Util +open Proto + +let pool_port = ref 9082 + +type client = { + id: id; + addr: sockaddr; + input: file_descr; + send: pool_message -> unit; + disconnect: unit -> unit; + mutable slots: int; +} + +type server = { + clients: (id, client) Hashtbl.t; + sock: file_descr; +} + +let new_server () = + let server = + { clients = Hashtbl.create 12; + sock = socket PF_INET SOCK_STREAM 0; + } in + Format.eprintf "Listening on port %d...@." !pool_port; + bind server.sock (make_addr "0.0.0.0" !pool_port); + listen server.sock 2; + + let stop_srv _ = + Format.eprintf "Shutting down server...@."; + shutdown server.sock SHUTDOWN_ALL; + exit 0 + in + Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv); + Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv); + + server + +let server_add_client server cli = + (* Say hello *) + let msg = read_one_msg cli.input in + if msg <> PoolHello then raise (ProtocolError "Client must say PoolHello first thing."); + cli.send PoolHello; + (* Put client somewhere *) + Hashtbl.add server.clients cli.id cli + +let client_of_fd server fd = + let cli = ref None in + Hashtbl.iter (fun _ c -> if c.input = fd then cli := Some c) server.clients; + match !cli with + | None -> assert false + | Some c -> c + +let client_disconnect server cli = + cli.disconnect (); + Hashtbl.remove server.clients cli.id; + Format.eprintf "Disconnected: %s@." (id_str cli.id) + +let rec server_run server = + let fds = Hashtbl.fold + (fun _ c l -> c.input::l) + server.clients [server.sock] + in + let qi, _, qe = select fds [] fds (-1.0) in + begin match qi, qe with + | x::_, _ when x = server.sock -> + let cli, cli_addr = accept server.sock in + let oc = out_channel_of_descr cli in + let cli = + { id = new_id(); + addr = cli_addr; + input = cli; + send = (fun msg -> Marshal.to_channel oc msg []; flush oc); + disconnect = (fun () -> shutdown cli SHUTDOWN_ALL; close cli); + slots = 0; + } in + server_add_client server cli; + Format.eprintf "New client %s.@." (id_str cli.id) + | x::_, _ -> + let cli = client_of_fd server x in + begin try match read_one_msg cli.input with + | PoolProvide n -> + Format.eprintf "%s provide %d@." (id_str cli.id) n; + cli.slots <- cli.slots + n + | PoolRequest(task, addr, n) -> + Format.eprintf "%s request %d for %s@." (id_str cli.id) n task; + (* Distribute tasks in the pool *) + let rec aux n = + if n > 0 then begin + let cli = ref None in + Hashtbl.iter (fun _ c -> if c.slots > 0 then cli := Some c) server.clients; + match !cli with + | None -> () (* Sorry, pool is to small for your request. *) + | Some c -> + let k = min n c.slots in + c.slots <- c.slots - k; + c.send (PoolRequest(task, addr, k)); + aux (n-k) + end + in aux n + | PoolHello -> raise (ProtocolError "Misplaced PoolHello.") + with _ -> + client_disconnect server cli + end + | [], x::_ -> + let cli = client_of_fd server x in + client_disconnect server cli + | _ -> assert false + end; + server_run server (* Infinite Loop ! *) + + +let () = + let usage = "Usage: ./poolserver [options]" in + let options = [ + "-port", Arg.Set_int pool_port, "Set port for pooling server."; + ] in + Arg.parse options (fun _ -> ()) usage; + + let server = new_server() in + server_run server -- cgit v1.2.3