From 4a5ca97b0970e191332fb6fb4684a397b93390f5 Mon Sep 17 00:00:00 2001 From: Alex AUVOLAT Date: Sun, 25 May 2014 01:00:44 +0200 Subject: ./manager -pool-addr mypool -pool-count 16 -my-addr my_addr ./example.native --- src/Makefile | 2 +- src/example.ml | 12 +++--- src/kahn.ml | 3 +- src/kahn_pipe.ml | 3 +- src/kahn_seq.ml | 4 +- src/kahn_stdio.ml | 10 +---- src/manager.ml | 84 ++++++++++++++++++++++++++++++++----- src/poolclient.ml | 72 ++++++++++++++++++++++++++++++++ src/poolserver.ml | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/primes.ml | 2 +- src/proto.ml | 8 ++++ 11 files changed, 291 insertions(+), 31 deletions(-) create mode 100644 src/poolclient.ml create mode 100644 src/poolserver.ml (limited to 'src') diff --git a/src/Makefile b/src/Makefile index ebd2516..dca72f6 100644 --- a/src/Makefile +++ b/src/Makefile @@ -2,7 +2,7 @@ OCAMLBUILD=ocamlbuild -classic-display \ -tags annot,debug,thread \ -libs unix -all: primes.native example.native manager.native +all: primes.native example.native manager.native poolserver.native poolclient.native %.native: %.ml kahn_pipe.ml kahn_seq.ml kahn_stdio.ml proto.ml util.ml $(OCAMLBUILD) $@ diff --git a/src/example.ml b/src/example.ml index 43e5327..1b0f274 100644 --- a/src/example.ml +++ b/src/example.ml @@ -28,11 +28,10 @@ module Example (K : Kahn.S) = struct let output (qi : (int * int) K.in_port) : unit K.process = let rec loop () = - K.bind_io - (K.get qi) + (K.get qi) >>= (fun (v, s) -> if v <> -1 then - begin Format.eprintf "f(%d) = %d@." v s; loop () end + begin K.output @@ Format.sprintf "f(%d) = %d@." v s; loop () end else K.return ()) in loop () @@ -64,10 +63,9 @@ module Example (K : Kahn.S) = struct fib_rec (n-2) (r-1) q_out ; fib_rec (n-1) (r-1) q_out2 ; K.get q_in >>= (fun x -> - K.bind_io - (K.get q_in2) + (K.get q_in2) >>= (fun y -> - Format.eprintf "f(%d) = %d@." n (x+y); + K.output @@ Format.sprintf "f(%d) = %d@." n (x+y); K.put (x+y) qo)) ] ))) @@ -76,7 +74,7 @@ module Example (K : Kahn.S) = struct let main2 : int K.process = (delay K.new_channel()) >>= (fun (qi, qo) -> - (fib_rec 47 7 qo) >>= + (fib_rec 50 7 qo) >>= (fun () -> K.get qi)) end diff --git a/src/kahn.ml b/src/kahn.ml index 08eac19..a458724 100644 --- a/src/kahn.ml +++ b/src/kahn.ml @@ -7,6 +7,8 @@ module type S = sig val put: 'a -> 'a out_port -> unit process val get: 'a in_port -> 'a process + val output: string -> unit + val select: ('a in_port * ('a -> 'b)) list -> 'b process val select_default: ('a in_port * ('a -> 'b)) list -> (unit -> 'b) -> 'b process @@ -14,7 +16,6 @@ module type S = sig val return: 'a -> 'a process val bind: 'a process -> ('a -> 'b process) -> 'b process - val bind_io: 'a process -> ('a -> 'b process) -> 'b process val run: 'a process -> 'a end diff --git a/src/kahn_pipe.ml b/src/kahn_pipe.ml index 9f3da0a..2df8bc5 100644 --- a/src/kahn_pipe.ml +++ b/src/kahn_pipe.ml @@ -18,6 +18,8 @@ module Pipe: S = struct fun () -> Marshal.to_channel c x []; flush c + + let output s = Format.printf "%s@?" s let try_get block prt_list = let fds = List.map fst prt_list in @@ -52,7 +54,6 @@ module Pipe: S = struct let bind e f = fun () -> f (e ()) () - let bind_io = bind let run p = p() diff --git a/src/kahn_seq.ml b/src/kahn_seq.ml index ce82117..177d6dd 100644 --- a/src/kahn_seq.ml +++ b/src/kahn_seq.ml @@ -19,6 +19,8 @@ module Seq: S = struct let new_channel () = let q = Queue.create () in q, q + + let output s = Format.printf "%s@?" s let put x c = fun cont -> @@ -64,8 +66,6 @@ module Seq: S = struct fun cont -> e (Some (fun (r : 'a) -> f r cont)) - let bind_io = bind - let run e = let ret = ref None in e (Some (fun v -> ret := Some v)); diff --git a/src/kahn_stdio.ml b/src/kahn_stdio.ml index a149742..badf338 100644 --- a/src/kahn_stdio.ml +++ b/src/kahn_stdio.ml @@ -45,6 +45,8 @@ module ProtoKahn: S = struct ) ) + let output s = send (Output s) + let select pl = fun cont -> assert false let select_default = fun cont -> assert false @@ -74,14 +76,6 @@ module ProtoKahn: S = struct a (Some (fun va -> let b = (f va) in b cont)) - let bind_io a f = - fun cont -> - a (Some (fun va -> - send_task - (fun () -> - let b = f va in - send_task (fun () -> b cont) false) - true)) let origin = ref false let dbg_out = ref false diff --git a/src/manager.ml b/src/manager.ml index 39b1ee9..0da7df2 100644 --- a/src/manager.ml +++ b/src/manager.ml @@ -7,6 +7,16 @@ let dbg x = if !dbg_out then Format.eprintf "(srv) %s@." x let dbg1 a x = if !dbg_out then Format.eprintf "(srv) %s %s@." (id_str a) x let dbg2 a b x = if !dbg_out then Format.eprintf "(srv) %s %s %s@." (id_str a) (id_str b) x +(* Program options *) +let program = ref "" +let local_proc = ref 1 + +let my_addr = ref "" +let my_port = ref 9011 +let pool_addr = ref "" +let pool_port = ref 9082 +let pool_count = ref 0 + (* Server data structures *) @@ -32,15 +42,33 @@ type server = { msg_chan: (id, string Queue.t) Hashtbl.t; mutable final_result: string option; clients: (id, client) Hashtbl.t; + sock: file_descr; } let new_server () = + let server = { tasks = Queue.create (); tsk_chan = Hashtbl.create 12; msg_chan = Hashtbl.create 12; final_result = None; clients = Hashtbl.create 4; - } + sock = socket PF_INET SOCK_STREAM 0; + } in + (* Setup networking *) + if !my_addr <> "" then begin + dbg @@ Format.sprintf "Listening on port %d" !my_port; + bind server.sock (make_addr "0.0.0.0" !my_port); + listen server.sock (min 1 !pool_count); + + let stop_srv _ = + dbg "Shutting down server..."; + shutdown server.sock SHUTDOWN_ALL; + exit 0 + in + Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv); + Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv) + end; + server let push_task server task = let cli = ref None in @@ -102,6 +130,21 @@ let client_of_fd server fd = | None -> assert false | Some c -> c +let server_accept_client server = + let cli, cli_addr = accept server.sock in + let oc = out_channel_of_descr cli in + let cli = + { id = new_id(); + input = cli; + send = (fun msg -> Marshal.to_channel oc msg []; flush oc); + disconnect = (fun () -> shutdown cli SHUTDOWN_ALL; close cli); + status = Busy; + } in + server_add_client server cli + +let client_disconnect server cli = + cli.disconnect (); + Hashtbl.remove server.clients cli.id let rec server_run server = let fds = Hashtbl.fold @@ -110,10 +153,13 @@ let rec server_run server = then c.input::l else l) server.clients [] in - if not (fds = []) then begin + if List.length fds > 0 then begin + let fds = if !my_addr = "" then fds else server.sock :: fds in dbg "selecting..."; let qi, _, qe = select fds [] fds (-1.0) in begin match qi, qe with + | x::_, _ when x = server.sock -> + server_accept_client server | x::_, _ -> let cli = client_of_fd server x in dbg1 cli.id "reading..."; @@ -129,8 +175,7 @@ let rec server_run server = | Task(a, b) -> GiveTask(a,b)) | Some r -> cli.send(FinalResult r); - cli.disconnect(); - Hashtbl.remove server.clients cli.id + client_disconnect server cli end; | Get(chan, td) -> dbg2 cli.id chan "got GET"; @@ -150,8 +195,7 @@ let rec server_run server = List.iter (fun c -> c.send(FinalResult x); - c.disconnect(); - Hashtbl.remove server.clients c.id) + client_disconnect server c) !p | GiveTask(a, b) -> dbg "got Task"; @@ -159,12 +203,12 @@ let rec server_run server = | GiveMsgTask(a, b) -> dbg "got MsgTask"; push_task server (MsgTask(a, b)) + | Output s -> Format.printf "%s@?" s | Hello -> raise (ProtocolError "Unexpected Hello.") end | [], x::_ -> let cli = client_of_fd server x in - cli.disconnect(); - Hashtbl.remove server.clients cli.id + client_disconnect server cli | _ -> assert false end; server_run server @@ -178,14 +222,17 @@ let rec server_run server = (* Main function *) -let program = ref "" -let local_proc = ref 1 let parse_args () = let usage = "Usage: ./manager [options] program" in let options = [ "-dbg", Arg.Set dbg_out, "Show debug output"; "-local-proc", Arg.Set_int local_proc, "Set number of local processes. Default: 1"; + "-my-addr", Arg.Set_string my_addr, "Address (name) of the computer this program is running on."; + "-my-port", Arg.Set_int my_port, "Port for me to listen"; + "-pool-addr", Arg.Set_string pool_addr, "Pool server to use"; + "-pool-port", Arg.Set_int pool_port, "Port on which to connect to pool"; + "-pool-count", Arg.Set_int pool_count, "Number of processes to ask to pool"; ] in Arg.parse options (fun n -> program := n) usage @@ -235,6 +282,23 @@ let () = pids := pid :: (!pids) done; + if !pool_addr <> "" && !pool_count > 0 then begin + let sock = socket PF_INET SOCK_STREAM 0 in + connect sock (make_addr !pool_addr !pool_port); + let outc = out_channel_of_descr sock in + let send m = Marshal.to_channel outc m []; flush outc in + + send PoolHello; + if read_one_msg sock <> PoolHello then + raise (ProtocolError "Expected PoolHello reply."); + + send (PoolRequest(!program, (!my_addr, !my_port), !pool_count)); + + shutdown sock SHUTDOWN_ALL; + close sock + end; + server_run server; + shutdown server.sock SHUTDOWN_ALL; List.iter (fun pid -> ignore (waitpid [] pid)) !pids diff --git a/src/poolclient.ml b/src/poolclient.ml new file mode 100644 index 0000000..00b0d35 --- /dev/null +++ b/src/poolclient.ml @@ -0,0 +1,72 @@ +open Unix +open Util +open Proto + +let pool_port = ref 9082 +let pool_server = ref "" +let provide = ref 4 + +let fullfill_request task (addr, port) n = + for i = 0 to n-1 do + Format.eprintf "Spawn %s@." task; + if fork() = 0 then begin + let sock = socket PF_INET SOCK_STREAM 0 in + connect sock (make_addr addr port); + dup2 sock stdin; + dup2 sock stdout; + execv task [|task|] + end + done + +let run_client () = + let sock = socket PF_INET SOCK_STREAM 0 in + connect sock (make_addr !pool_server !pool_port); + Format.eprintf "Connected.@."; + + let outc = out_channel_of_descr sock in + let send m = Marshal.to_channel outc m []; flush outc in + + send PoolHello; + if read_one_msg sock <> PoolHello then + raise (ProtocolError "Expected PoolHello reply."); + + Format.eprintf "Provide %d@." !provide; + send (PoolProvide !provide); + + let cont = ref true in + while !cont do + let qi, _, qe = select [sock] [] [sock] 1.0 in + begin match qi, qe with + | a::_, _ -> + begin match read_one_msg sock with + | PoolRequest(task, addr, n) -> + fullfill_request task addr n + | _ -> raise (ProtocolError "Unexpected message.") + end + | _, b::_ -> + shutdown sock SHUTDOWN_ALL; + close sock; + cont := false + | _ -> () + end; + try match waitpid [WNOHANG] (-1) with + | x, _ when x > 0 -> + send (PoolProvide 1) + | _ -> () + with _ -> () + done + +let () = + let usage = "Usage: ./poolclient [options] server" in + let options = [ + "-port", Arg.Set_int pool_port, "Set port for pooling server."; + "-provide", Arg.Set_int provide, "Number of processes to provide."; + ] in + Arg.parse options (fun s -> pool_server := s) usage; + + if !pool_server = "" then begin + Format.eprintf "%s@." usage; + exit 0 + end; + + run_client () diff --git a/src/poolserver.ml b/src/poolserver.ml new file mode 100644 index 0000000..15a700c --- /dev/null +++ b/src/poolserver.ml @@ -0,0 +1,122 @@ +open Unix +open Util +open Proto + +let pool_port = ref 9082 + +type client = { + id: id; + addr: sockaddr; + input: file_descr; + send: pool_message -> unit; + disconnect: unit -> unit; + mutable slots: int; +} + +type server = { + clients: (id, client) Hashtbl.t; + sock: file_descr; +} + +let new_server () = + let server = + { clients = Hashtbl.create 12; + sock = socket PF_INET SOCK_STREAM 0; + } in + Format.eprintf "Listening on port %d...@." !pool_port; + bind server.sock (make_addr "0.0.0.0" !pool_port); + listen server.sock 2; + + let stop_srv _ = + Format.eprintf "Shutting down server...@."; + shutdown server.sock SHUTDOWN_ALL; + exit 0 + in + Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv); + Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv); + + server + +let server_add_client server cli = + (* Say hello *) + let msg = read_one_msg cli.input in + if msg <> PoolHello then raise (ProtocolError "Client must say PoolHello first thing."); + cli.send PoolHello; + (* Put client somewhere *) + Hashtbl.add server.clients cli.id cli + +let client_of_fd server fd = + let cli = ref None in + Hashtbl.iter (fun _ c -> if c.input = fd then cli := Some c) server.clients; + match !cli with + | None -> assert false + | Some c -> c + +let client_disconnect server cli = + cli.disconnect (); + Hashtbl.remove server.clients cli.id; + Format.eprintf "Disconnected: %s@." (id_str cli.id) + +let rec server_run server = + let fds = Hashtbl.fold + (fun _ c l -> c.input::l) + server.clients [server.sock] + in + let qi, _, qe = select fds [] fds (-1.0) in + begin match qi, qe with + | x::_, _ when x = server.sock -> + let cli, cli_addr = accept server.sock in + let oc = out_channel_of_descr cli in + let cli = + { id = new_id(); + addr = cli_addr; + input = cli; + send = (fun msg -> Marshal.to_channel oc msg []; flush oc); + disconnect = (fun () -> shutdown cli SHUTDOWN_ALL; close cli); + slots = 0; + } in + server_add_client server cli; + Format.eprintf "New client %s.@." (id_str cli.id) + | x::_, _ -> + let cli = client_of_fd server x in + begin try match read_one_msg cli.input with + | PoolProvide n -> + Format.eprintf "%s provide %d@." (id_str cli.id) n; + cli.slots <- cli.slots + n + | PoolRequest(task, addr, n) -> + Format.eprintf "%s request %d for %s@." (id_str cli.id) n task; + (* Distribute tasks in the pool *) + let rec aux n = + if n > 0 then begin + let cli = ref None in + Hashtbl.iter (fun _ c -> if c.slots > 0 then cli := Some c) server.clients; + match !cli with + | None -> () (* Sorry, pool is to small for your request. *) + | Some c -> + let k = min n c.slots in + c.slots <- c.slots - k; + c.send (PoolRequest(task, addr, k)); + aux (n-k) + end + in aux n + | PoolHello -> raise (ProtocolError "Misplaced PoolHello.") + with _ -> + client_disconnect server cli + end + | [], x::_ -> + let cli = client_of_fd server x in + client_disconnect server cli + | _ -> assert false + end; + server_run server (* Infinite Loop ! *) + + +let () = + let usage = "Usage: ./poolserver [options]" in + let options = [ + "-port", Arg.Set_int pool_port, "Set port for pooling server."; + ] in + Arg.parse options (fun _ -> ()) usage; + + let server = new_server() in + server_run server diff --git a/src/primes.ml b/src/primes.ml index 21d979a..a95fb64 100644 --- a/src/primes.ml +++ b/src/primes.ml @@ -26,7 +26,7 @@ module Primes (K : Kahn.S) = struct (get qi) >>= (fun v -> if v <> -1 then begin - Format.eprintf "%d@." v; + K.output @@ Format.sprintf "%d@." v; (delay new_channel ()) >>= (fun (qi2, qo2) -> doco [ filter v qi qo2 ; primes qi2 ]) end diff --git a/src/proto.ml b/src/proto.ml index f0517d4..1f6d8e6 100644 --- a/src/proto.ml +++ b/src/proto.ml @@ -11,6 +11,7 @@ exception ProtocolError of string type message = | Hello + | Output of string | Get of id * msg_task_descr | Put of id * string | RequestTask @@ -19,3 +20,10 @@ type message = | FinalResult of string +(* Protocol for pooling *) + +type pool_message = + | PoolHello + | PoolProvide of int + | PoolRequest of string * (string * int) * int + -- cgit v1.2.3