summaryrefslogtreecommitdiff
path: root/src/manager.ml
diff options
context:
space:
mode:
Diffstat (limited to 'src/manager.ml')
-rw-r--r--src/manager.ml84
1 files changed, 74 insertions, 10 deletions
diff --git a/src/manager.ml b/src/manager.ml
index 39b1ee9..0da7df2 100644
--- a/src/manager.ml
+++ b/src/manager.ml
@@ -7,6 +7,16 @@ let dbg x = if !dbg_out then Format.eprintf "(srv) %s@." x
let dbg1 a x = if !dbg_out then Format.eprintf "(srv) %s %s@." (id_str a) x
let dbg2 a b x = if !dbg_out then Format.eprintf "(srv) %s %s %s@." (id_str a) (id_str b) x
+(* Program options *)
+let program = ref ""
+let local_proc = ref 1
+
+let my_addr = ref ""
+let my_port = ref 9011
+let pool_addr = ref ""
+let pool_port = ref 9082
+let pool_count = ref 0
+
(* Server data structures *)
@@ -32,15 +42,33 @@ type server = {
msg_chan: (id, string Queue.t) Hashtbl.t;
mutable final_result: string option;
clients: (id, client) Hashtbl.t;
+ sock: file_descr;
}
let new_server () =
+ let server =
{ tasks = Queue.create ();
tsk_chan = Hashtbl.create 12;
msg_chan = Hashtbl.create 12;
final_result = None;
clients = Hashtbl.create 4;
- }
+ sock = socket PF_INET SOCK_STREAM 0;
+ } in
+ (* Setup networking *)
+ if !my_addr <> "" then begin
+ dbg @@ Format.sprintf "Listening on port %d" !my_port;
+ bind server.sock (make_addr "0.0.0.0" !my_port);
+ listen server.sock (min 1 !pool_count);
+
+ let stop_srv _ =
+ dbg "Shutting down server...";
+ shutdown server.sock SHUTDOWN_ALL;
+ exit 0
+ in
+ Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
+ Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv)
+ end;
+ server
let push_task server task =
let cli = ref None in
@@ -102,6 +130,21 @@ let client_of_fd server fd =
| None -> assert false
| Some c -> c
+let server_accept_client server =
+ let cli, cli_addr = accept server.sock in
+ let oc = out_channel_of_descr cli in
+ let cli =
+ { id = new_id();
+ input = cli;
+ send = (fun msg -> Marshal.to_channel oc msg []; flush oc);
+ disconnect = (fun () -> shutdown cli SHUTDOWN_ALL; close cli);
+ status = Busy;
+ } in
+ server_add_client server cli
+
+let client_disconnect server cli =
+ cli.disconnect ();
+ Hashtbl.remove server.clients cli.id
let rec server_run server =
let fds = Hashtbl.fold
@@ -110,10 +153,13 @@ let rec server_run server =
then c.input::l
else l)
server.clients [] in
- if not (fds = []) then begin
+ if List.length fds > 0 then begin
+ let fds = if !my_addr = "" then fds else server.sock :: fds in
dbg "selecting...";
let qi, _, qe = select fds [] fds (-1.0) in
begin match qi, qe with
+ | x::_, _ when x = server.sock ->
+ server_accept_client server
| x::_, _ ->
let cli = client_of_fd server x in
dbg1 cli.id "reading...";
@@ -129,8 +175,7 @@ let rec server_run server =
| Task(a, b) -> GiveTask(a,b))
| Some r ->
cli.send(FinalResult r);
- cli.disconnect();
- Hashtbl.remove server.clients cli.id
+ client_disconnect server cli
end;
| Get(chan, td) ->
dbg2 cli.id chan "got GET";
@@ -150,8 +195,7 @@ let rec server_run server =
List.iter
(fun c ->
c.send(FinalResult x);
- c.disconnect();
- Hashtbl.remove server.clients c.id)
+ client_disconnect server c)
!p
| GiveTask(a, b) ->
dbg "got Task";
@@ -159,12 +203,12 @@ let rec server_run server =
| GiveMsgTask(a, b) ->
dbg "got MsgTask";
push_task server (MsgTask(a, b))
+ | Output s -> Format.printf "%s@?" s
| Hello -> raise (ProtocolError "Unexpected Hello.")
end
| [], x::_ ->
let cli = client_of_fd server x in
- cli.disconnect();
- Hashtbl.remove server.clients cli.id
+ client_disconnect server cli
| _ -> assert false
end;
server_run server
@@ -178,14 +222,17 @@ let rec server_run server =
(* Main function *)
-let program = ref ""
-let local_proc = ref 1
let parse_args () =
let usage = "Usage: ./manager [options] program" in
let options = [
"-dbg", Arg.Set dbg_out, "Show debug output";
"-local-proc", Arg.Set_int local_proc, "Set number of local processes. Default: 1";
+ "-my-addr", Arg.Set_string my_addr, "Address (name) of the computer this program is running on.";
+ "-my-port", Arg.Set_int my_port, "Port for me to listen";
+ "-pool-addr", Arg.Set_string pool_addr, "Pool server to use";
+ "-pool-port", Arg.Set_int pool_port, "Port on which to connect to pool";
+ "-pool-count", Arg.Set_int pool_count, "Number of processes to ask to pool";
] in
Arg.parse options (fun n -> program := n) usage
@@ -235,6 +282,23 @@ let () =
pids := pid :: (!pids)
done;
+ if !pool_addr <> "" && !pool_count > 0 then begin
+ let sock = socket PF_INET SOCK_STREAM 0 in
+ connect sock (make_addr !pool_addr !pool_port);
+ let outc = out_channel_of_descr sock in
+ let send m = Marshal.to_channel outc m []; flush outc in
+
+ send PoolHello;
+ if read_one_msg sock <> PoolHello then
+ raise (ProtocolError "Expected PoolHello reply.");
+
+ send (PoolRequest(!program, (!my_addr, !my_port), !pool_count));
+
+ shutdown sock SHUTDOWN_ALL;
+ close sock
+ end;
+
server_run server;
+ shutdown server.sock SHUTDOWN_ALL;
List.iter (fun pid -> ignore (waitpid [] pid)) !pids