From 4a5ca97b0970e191332fb6fb4684a397b93390f5 Mon Sep 17 00:00:00 2001 From: Alex AUVOLAT Date: Sun, 25 May 2014 01:00:44 +0200 Subject: ./manager -pool-addr mypool -pool-count 16 -my-addr my_addr ./example.native --- src/manager.ml | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 74 insertions(+), 10 deletions(-) (limited to 'src/manager.ml') diff --git a/src/manager.ml b/src/manager.ml index 39b1ee9..0da7df2 100644 --- a/src/manager.ml +++ b/src/manager.ml @@ -7,6 +7,16 @@ let dbg x = if !dbg_out then Format.eprintf "(srv) %s@." x let dbg1 a x = if !dbg_out then Format.eprintf "(srv) %s %s@." (id_str a) x let dbg2 a b x = if !dbg_out then Format.eprintf "(srv) %s %s %s@." (id_str a) (id_str b) x +(* Program options *) +let program = ref "" +let local_proc = ref 1 + +let my_addr = ref "" +let my_port = ref 9011 +let pool_addr = ref "" +let pool_port = ref 9082 +let pool_count = ref 0 + (* Server data structures *) @@ -32,15 +42,33 @@ type server = { msg_chan: (id, string Queue.t) Hashtbl.t; mutable final_result: string option; clients: (id, client) Hashtbl.t; + sock: file_descr; } let new_server () = + let server = { tasks = Queue.create (); tsk_chan = Hashtbl.create 12; msg_chan = Hashtbl.create 12; final_result = None; clients = Hashtbl.create 4; - } + sock = socket PF_INET SOCK_STREAM 0; + } in + (* Setup networking *) + if !my_addr <> "" then begin + dbg @@ Format.sprintf "Listening on port %d" !my_port; + bind server.sock (make_addr "0.0.0.0" !my_port); + listen server.sock (min 1 !pool_count); + + let stop_srv _ = + dbg "Shutting down server..."; + shutdown server.sock SHUTDOWN_ALL; + exit 0 + in + Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv); + Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv) + end; + server let push_task server task = let cli = ref None in @@ -102,6 +130,21 @@ let client_of_fd server fd = | None -> assert false | Some c -> c +let server_accept_client server = + let cli, cli_addr = accept server.sock in + let oc = out_channel_of_descr cli in + let cli = + { id = new_id(); + input = cli; + send = (fun msg -> Marshal.to_channel oc msg []; flush oc); + disconnect = (fun () -> shutdown cli SHUTDOWN_ALL; close cli); + status = Busy; + } in + server_add_client server cli + +let client_disconnect server cli = + cli.disconnect (); + Hashtbl.remove server.clients cli.id let rec server_run server = let fds = Hashtbl.fold @@ -110,10 +153,13 @@ let rec server_run server = then c.input::l else l) server.clients [] in - if not (fds = []) then begin + if List.length fds > 0 then begin + let fds = if !my_addr = "" then fds else server.sock :: fds in dbg "selecting..."; let qi, _, qe = select fds [] fds (-1.0) in begin match qi, qe with + | x::_, _ when x = server.sock -> + server_accept_client server | x::_, _ -> let cli = client_of_fd server x in dbg1 cli.id "reading..."; @@ -129,8 +175,7 @@ let rec server_run server = | Task(a, b) -> GiveTask(a,b)) | Some r -> cli.send(FinalResult r); - cli.disconnect(); - Hashtbl.remove server.clients cli.id + client_disconnect server cli end; | Get(chan, td) -> dbg2 cli.id chan "got GET"; @@ -150,8 +195,7 @@ let rec server_run server = List.iter (fun c -> c.send(FinalResult x); - c.disconnect(); - Hashtbl.remove server.clients c.id) + client_disconnect server c) !p | GiveTask(a, b) -> dbg "got Task"; @@ -159,12 +203,12 @@ let rec server_run server = | GiveMsgTask(a, b) -> dbg "got MsgTask"; push_task server (MsgTask(a, b)) + | Output s -> Format.printf "%s@?" s | Hello -> raise (ProtocolError "Unexpected Hello.") end | [], x::_ -> let cli = client_of_fd server x in - cli.disconnect(); - Hashtbl.remove server.clients cli.id + client_disconnect server cli | _ -> assert false end; server_run server @@ -178,14 +222,17 @@ let rec server_run server = (* Main function *) -let program = ref "" -let local_proc = ref 1 let parse_args () = let usage = "Usage: ./manager [options] program" in let options = [ "-dbg", Arg.Set dbg_out, "Show debug output"; "-local-proc", Arg.Set_int local_proc, "Set number of local processes. Default: 1"; + "-my-addr", Arg.Set_string my_addr, "Address (name) of the computer this program is running on."; + "-my-port", Arg.Set_int my_port, "Port for me to listen"; + "-pool-addr", Arg.Set_string pool_addr, "Pool server to use"; + "-pool-port", Arg.Set_int pool_port, "Port on which to connect to pool"; + "-pool-count", Arg.Set_int pool_count, "Number of processes to ask to pool"; ] in Arg.parse options (fun n -> program := n) usage @@ -235,6 +282,23 @@ let () = pids := pid :: (!pids) done; + if !pool_addr <> "" && !pool_count > 0 then begin + let sock = socket PF_INET SOCK_STREAM 0 in + connect sock (make_addr !pool_addr !pool_port); + let outc = out_channel_of_descr sock in + let send m = Marshal.to_channel outc m []; flush outc in + + send PoolHello; + if read_one_msg sock <> PoolHello then + raise (ProtocolError "Expected PoolHello reply."); + + send (PoolRequest(!program, (!my_addr, !my_port), !pool_count)); + + shutdown sock SHUTDOWN_ALL; + close sock + end; + server_run server; + shutdown server.sock SHUTDOWN_ALL; List.iter (fun pid -> ignore (waitpid [] pid)) !pids -- cgit v1.2.3