summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile2
-rw-r--r--src/example.ml12
-rw-r--r--src/kahn.ml3
-rw-r--r--src/kahn_pipe.ml3
-rw-r--r--src/kahn_seq.ml4
-rw-r--r--src/kahn_stdio.ml10
-rw-r--r--src/manager.ml84
-rw-r--r--src/poolclient.ml72
-rw-r--r--src/poolserver.ml122
-rw-r--r--src/primes.ml2
-rw-r--r--src/proto.ml8
11 files changed, 291 insertions, 31 deletions
diff --git a/src/Makefile b/src/Makefile
index ebd2516..dca72f6 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -2,7 +2,7 @@ OCAMLBUILD=ocamlbuild -classic-display \
-tags annot,debug,thread \
-libs unix
-all: primes.native example.native manager.native
+all: primes.native example.native manager.native poolserver.native poolclient.native
%.native: %.ml kahn_pipe.ml kahn_seq.ml kahn_stdio.ml proto.ml util.ml
$(OCAMLBUILD) $@
diff --git a/src/example.ml b/src/example.ml
index 43e5327..1b0f274 100644
--- a/src/example.ml
+++ b/src/example.ml
@@ -28,11 +28,10 @@ module Example (K : Kahn.S) = struct
let output (qi : (int * int) K.in_port) : unit K.process =
let rec loop () =
- K.bind_io
- (K.get qi)
+ (K.get qi) >>=
(fun (v, s) ->
if v <> -1 then
- begin Format.eprintf "f(%d) = %d@." v s; loop () end
+ begin K.output @@ Format.sprintf "f(%d) = %d@." v s; loop () end
else K.return ())
in
loop ()
@@ -64,10 +63,9 @@ module Example (K : Kahn.S) = struct
fib_rec (n-2) (r-1) q_out ;
fib_rec (n-1) (r-1) q_out2 ;
K.get q_in >>= (fun x ->
- K.bind_io
- (K.get q_in2)
+ (K.get q_in2) >>=
(fun y ->
- Format.eprintf "f(%d) = %d@." n (x+y);
+ K.output @@ Format.sprintf "f(%d) = %d@." n (x+y);
K.put (x+y) qo))
]
)))
@@ -76,7 +74,7 @@ module Example (K : Kahn.S) = struct
let main2 : int K.process =
(delay K.new_channel()) >>=
(fun (qi, qo) ->
- (fib_rec 47 7 qo) >>=
+ (fib_rec 50 7 qo) >>=
(fun () -> K.get qi))
end
diff --git a/src/kahn.ml b/src/kahn.ml
index 08eac19..a458724 100644
--- a/src/kahn.ml
+++ b/src/kahn.ml
@@ -7,6 +7,8 @@ module type S = sig
val put: 'a -> 'a out_port -> unit process
val get: 'a in_port -> 'a process
+ val output: string -> unit
+
val select: ('a in_port * ('a -> 'b)) list -> 'b process
val select_default: ('a in_port * ('a -> 'b)) list -> (unit -> 'b) -> 'b process
@@ -14,7 +16,6 @@ module type S = sig
val return: 'a -> 'a process
val bind: 'a process -> ('a -> 'b process) -> 'b process
- val bind_io: 'a process -> ('a -> 'b process) -> 'b process
val run: 'a process -> 'a
end
diff --git a/src/kahn_pipe.ml b/src/kahn_pipe.ml
index 9f3da0a..2df8bc5 100644
--- a/src/kahn_pipe.ml
+++ b/src/kahn_pipe.ml
@@ -18,6 +18,8 @@ module Pipe: S = struct
fun () ->
Marshal.to_channel c x [];
flush c
+
+ let output s = Format.printf "%s@?" s
let try_get block prt_list =
let fds = List.map fst prt_list in
@@ -52,7 +54,6 @@ module Pipe: S = struct
let bind e f =
fun () -> f (e ()) ()
- let bind_io = bind
let run p =
p()
diff --git a/src/kahn_seq.ml b/src/kahn_seq.ml
index ce82117..177d6dd 100644
--- a/src/kahn_seq.ml
+++ b/src/kahn_seq.ml
@@ -19,6 +19,8 @@ module Seq: S = struct
let new_channel () =
let q = Queue.create () in
q, q
+
+ let output s = Format.printf "%s@?" s
let put x c =
fun cont ->
@@ -64,8 +66,6 @@ module Seq: S = struct
fun cont ->
e (Some (fun (r : 'a) -> f r cont))
- let bind_io = bind
-
let run e =
let ret = ref None in
e (Some (fun v -> ret := Some v));
diff --git a/src/kahn_stdio.ml b/src/kahn_stdio.ml
index a149742..badf338 100644
--- a/src/kahn_stdio.ml
+++ b/src/kahn_stdio.ml
@@ -45,6 +45,8 @@ module ProtoKahn: S = struct
)
)
+ let output s = send (Output s)
+
let select pl = fun cont -> assert false
let select_default = fun cont -> assert false
@@ -74,14 +76,6 @@ module ProtoKahn: S = struct
a (Some (fun va ->
let b = (f va) in
b cont))
- let bind_io a f =
- fun cont ->
- a (Some (fun va ->
- send_task
- (fun () ->
- let b = f va in
- send_task (fun () -> b cont) false)
- true))
let origin = ref false
let dbg_out = ref false
diff --git a/src/manager.ml b/src/manager.ml
index 39b1ee9..0da7df2 100644
--- a/src/manager.ml
+++ b/src/manager.ml
@@ -7,6 +7,16 @@ let dbg x = if !dbg_out then Format.eprintf "(srv) %s@." x
let dbg1 a x = if !dbg_out then Format.eprintf "(srv) %s %s@." (id_str a) x
let dbg2 a b x = if !dbg_out then Format.eprintf "(srv) %s %s %s@." (id_str a) (id_str b) x
+(* Program options *)
+let program = ref ""
+let local_proc = ref 1
+
+let my_addr = ref ""
+let my_port = ref 9011
+let pool_addr = ref ""
+let pool_port = ref 9082
+let pool_count = ref 0
+
(* Server data structures *)
@@ -32,15 +42,33 @@ type server = {
msg_chan: (id, string Queue.t) Hashtbl.t;
mutable final_result: string option;
clients: (id, client) Hashtbl.t;
+ sock: file_descr;
}
let new_server () =
+ let server =
{ tasks = Queue.create ();
tsk_chan = Hashtbl.create 12;
msg_chan = Hashtbl.create 12;
final_result = None;
clients = Hashtbl.create 4;
- }
+ sock = socket PF_INET SOCK_STREAM 0;
+ } in
+ (* Setup networking *)
+ if !my_addr <> "" then begin
+ dbg @@ Format.sprintf "Listening on port %d" !my_port;
+ bind server.sock (make_addr "0.0.0.0" !my_port);
+ listen server.sock (min 1 !pool_count);
+
+ let stop_srv _ =
+ dbg "Shutting down server...";
+ shutdown server.sock SHUTDOWN_ALL;
+ exit 0
+ in
+ Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
+ Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv)
+ end;
+ server
let push_task server task =
let cli = ref None in
@@ -102,6 +130,21 @@ let client_of_fd server fd =
| None -> assert false
| Some c -> c
+let server_accept_client server =
+ let cli, cli_addr = accept server.sock in
+ let oc = out_channel_of_descr cli in
+ let cli =
+ { id = new_id();
+ input = cli;
+ send = (fun msg -> Marshal.to_channel oc msg []; flush oc);
+ disconnect = (fun () -> shutdown cli SHUTDOWN_ALL; close cli);
+ status = Busy;
+ } in
+ server_add_client server cli
+
+let client_disconnect server cli =
+ cli.disconnect ();
+ Hashtbl.remove server.clients cli.id
let rec server_run server =
let fds = Hashtbl.fold
@@ -110,10 +153,13 @@ let rec server_run server =
then c.input::l
else l)
server.clients [] in
- if not (fds = []) then begin
+ if List.length fds > 0 then begin
+ let fds = if !my_addr = "" then fds else server.sock :: fds in
dbg "selecting...";
let qi, _, qe = select fds [] fds (-1.0) in
begin match qi, qe with
+ | x::_, _ when x = server.sock ->
+ server_accept_client server
| x::_, _ ->
let cli = client_of_fd server x in
dbg1 cli.id "reading...";
@@ -129,8 +175,7 @@ let rec server_run server =
| Task(a, b) -> GiveTask(a,b))
| Some r ->
cli.send(FinalResult r);
- cli.disconnect();
- Hashtbl.remove server.clients cli.id
+ client_disconnect server cli
end;
| Get(chan, td) ->
dbg2 cli.id chan "got GET";
@@ -150,8 +195,7 @@ let rec server_run server =
List.iter
(fun c ->
c.send(FinalResult x);
- c.disconnect();
- Hashtbl.remove server.clients c.id)
+ client_disconnect server c)
!p
| GiveTask(a, b) ->
dbg "got Task";
@@ -159,12 +203,12 @@ let rec server_run server =
| GiveMsgTask(a, b) ->
dbg "got MsgTask";
push_task server (MsgTask(a, b))
+ | Output s -> Format.printf "%s@?" s
| Hello -> raise (ProtocolError "Unexpected Hello.")
end
| [], x::_ ->
let cli = client_of_fd server x in
- cli.disconnect();
- Hashtbl.remove server.clients cli.id
+ client_disconnect server cli
| _ -> assert false
end;
server_run server
@@ -178,14 +222,17 @@ let rec server_run server =
(* Main function *)
-let program = ref ""
-let local_proc = ref 1
let parse_args () =
let usage = "Usage: ./manager [options] program" in
let options = [
"-dbg", Arg.Set dbg_out, "Show debug output";
"-local-proc", Arg.Set_int local_proc, "Set number of local processes. Default: 1";
+ "-my-addr", Arg.Set_string my_addr, "Address (name) of the computer this program is running on.";
+ "-my-port", Arg.Set_int my_port, "Port for me to listen";
+ "-pool-addr", Arg.Set_string pool_addr, "Pool server to use";
+ "-pool-port", Arg.Set_int pool_port, "Port on which to connect to pool";
+ "-pool-count", Arg.Set_int pool_count, "Number of processes to ask to pool";
] in
Arg.parse options (fun n -> program := n) usage
@@ -235,6 +282,23 @@ let () =
pids := pid :: (!pids)
done;
+ if !pool_addr <> "" && !pool_count > 0 then begin
+ let sock = socket PF_INET SOCK_STREAM 0 in
+ connect sock (make_addr !pool_addr !pool_port);
+ let outc = out_channel_of_descr sock in
+ let send m = Marshal.to_channel outc m []; flush outc in
+
+ send PoolHello;
+ if read_one_msg sock <> PoolHello then
+ raise (ProtocolError "Expected PoolHello reply.");
+
+ send (PoolRequest(!program, (!my_addr, !my_port), !pool_count));
+
+ shutdown sock SHUTDOWN_ALL;
+ close sock
+ end;
+
server_run server;
+ shutdown server.sock SHUTDOWN_ALL;
List.iter (fun pid -> ignore (waitpid [] pid)) !pids
diff --git a/src/poolclient.ml b/src/poolclient.ml
new file mode 100644
index 0000000..00b0d35
--- /dev/null
+++ b/src/poolclient.ml
@@ -0,0 +1,72 @@
+open Unix
+open Util
+open Proto
+
+let pool_port = ref 9082
+let pool_server = ref ""
+let provide = ref 4
+
+let fullfill_request task (addr, port) n =
+ for i = 0 to n-1 do
+ Format.eprintf "Spawn %s@." task;
+ if fork() = 0 then begin
+ let sock = socket PF_INET SOCK_STREAM 0 in
+ connect sock (make_addr addr port);
+ dup2 sock stdin;
+ dup2 sock stdout;
+ execv task [|task|]
+ end
+ done
+
+let run_client () =
+ let sock = socket PF_INET SOCK_STREAM 0 in
+ connect sock (make_addr !pool_server !pool_port);
+ Format.eprintf "Connected.@.";
+
+ let outc = out_channel_of_descr sock in
+ let send m = Marshal.to_channel outc m []; flush outc in
+
+ send PoolHello;
+ if read_one_msg sock <> PoolHello then
+ raise (ProtocolError "Expected PoolHello reply.");
+
+ Format.eprintf "Provide %d@." !provide;
+ send (PoolProvide !provide);
+
+ let cont = ref true in
+ while !cont do
+ let qi, _, qe = select [sock] [] [sock] 1.0 in
+ begin match qi, qe with
+ | a::_, _ ->
+ begin match read_one_msg sock with
+ | PoolRequest(task, addr, n) ->
+ fullfill_request task addr n
+ | _ -> raise (ProtocolError "Unexpected message.")
+ end
+ | _, b::_ ->
+ shutdown sock SHUTDOWN_ALL;
+ close sock;
+ cont := false
+ | _ -> ()
+ end;
+ try match waitpid [WNOHANG] (-1) with
+ | x, _ when x > 0 ->
+ send (PoolProvide 1)
+ | _ -> ()
+ with _ -> ()
+ done
+
+let () =
+ let usage = "Usage: ./poolclient [options] server" in
+ let options = [
+ "-port", Arg.Set_int pool_port, "Set port for pooling server.";
+ "-provide", Arg.Set_int provide, "Number of processes to provide.";
+ ] in
+ Arg.parse options (fun s -> pool_server := s) usage;
+
+ if !pool_server = "" then begin
+ Format.eprintf "%s@." usage;
+ exit 0
+ end;
+
+ run_client ()
diff --git a/src/poolserver.ml b/src/poolserver.ml
new file mode 100644
index 0000000..15a700c
--- /dev/null
+++ b/src/poolserver.ml
@@ -0,0 +1,122 @@
+open Unix
+open Util
+open Proto
+
+let pool_port = ref 9082
+
+type client = {
+ id: id;
+ addr: sockaddr;
+ input: file_descr;
+ send: pool_message -> unit;
+ disconnect: unit -> unit;
+ mutable slots: int;
+}
+
+type server = {
+ clients: (id, client) Hashtbl.t;
+ sock: file_descr;
+}
+
+let new_server () =
+ let server =
+ { clients = Hashtbl.create 12;
+ sock = socket PF_INET SOCK_STREAM 0;
+ } in
+ Format.eprintf "Listening on port %d...@." !pool_port;
+ bind server.sock (make_addr "0.0.0.0" !pool_port);
+ listen server.sock 2;
+
+ let stop_srv _ =
+ Format.eprintf "Shutting down server...@.";
+ shutdown server.sock SHUTDOWN_ALL;
+ exit 0
+ in
+ Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
+ Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv);
+
+ server
+
+let server_add_client server cli =
+ (* Say hello *)
+ let msg = read_one_msg cli.input in
+ if msg <> PoolHello then raise (ProtocolError "Client must say PoolHello first thing.");
+ cli.send PoolHello;
+ (* Put client somewhere *)
+ Hashtbl.add server.clients cli.id cli
+
+let client_of_fd server fd =
+ let cli = ref None in
+ Hashtbl.iter (fun _ c -> if c.input = fd then cli := Some c) server.clients;
+ match !cli with
+ | None -> assert false
+ | Some c -> c
+
+let client_disconnect server cli =
+ cli.disconnect ();
+ Hashtbl.remove server.clients cli.id;
+ Format.eprintf "Disconnected: %s@." (id_str cli.id)
+
+let rec server_run server =
+ let fds = Hashtbl.fold
+ (fun _ c l -> c.input::l)
+ server.clients [server.sock]
+ in
+ let qi, _, qe = select fds [] fds (-1.0) in
+ begin match qi, qe with
+ | x::_, _ when x = server.sock ->
+ let cli, cli_addr = accept server.sock in
+ let oc = out_channel_of_descr cli in
+ let cli =
+ { id = new_id();
+ addr = cli_addr;
+ input = cli;
+ send = (fun msg -> Marshal.to_channel oc msg []; flush oc);
+ disconnect = (fun () -> shutdown cli SHUTDOWN_ALL; close cli);
+ slots = 0;
+ } in
+ server_add_client server cli;
+ Format.eprintf "New client %s.@." (id_str cli.id)
+ | x::_, _ ->
+ let cli = client_of_fd server x in
+ begin try match read_one_msg cli.input with
+ | PoolProvide n ->
+ Format.eprintf "%s provide %d@." (id_str cli.id) n;
+ cli.slots <- cli.slots + n
+ | PoolRequest(task, addr, n) ->
+ Format.eprintf "%s request %d for %s@." (id_str cli.id) n task;
+ (* Distribute tasks in the pool *)
+ let rec aux n =
+ if n > 0 then begin
+ let cli = ref None in
+ Hashtbl.iter (fun _ c -> if c.slots > 0 then cli := Some c) server.clients;
+ match !cli with
+ | None -> () (* Sorry, pool is to small for your request. *)
+ | Some c ->
+ let k = min n c.slots in
+ c.slots <- c.slots - k;
+ c.send (PoolRequest(task, addr, k));
+ aux (n-k)
+ end
+ in aux n
+ | PoolHello -> raise (ProtocolError "Misplaced PoolHello.")
+ with _ ->
+ client_disconnect server cli
+ end
+ | [], x::_ ->
+ let cli = client_of_fd server x in
+ client_disconnect server cli
+ | _ -> assert false
+ end;
+ server_run server (* Infinite Loop ! *)
+
+
+let () =
+ let usage = "Usage: ./poolserver [options]" in
+ let options = [
+ "-port", Arg.Set_int pool_port, "Set port for pooling server.";
+ ] in
+ Arg.parse options (fun _ -> ()) usage;
+
+ let server = new_server() in
+ server_run server
diff --git a/src/primes.ml b/src/primes.ml
index 21d979a..a95fb64 100644
--- a/src/primes.ml
+++ b/src/primes.ml
@@ -26,7 +26,7 @@ module Primes (K : Kahn.S) = struct
(get qi) >>= (fun v ->
if v <> -1 then
begin
- Format.eprintf "%d@." v;
+ K.output @@ Format.sprintf "%d@." v;
(delay new_channel ()) >>=
(fun (qi2, qo2) -> doco [ filter v qi qo2 ; primes qi2 ])
end
diff --git a/src/proto.ml b/src/proto.ml
index f0517d4..1f6d8e6 100644
--- a/src/proto.ml
+++ b/src/proto.ml
@@ -11,6 +11,7 @@ exception ProtocolError of string
type message =
| Hello
+ | Output of string
| Get of id * msg_task_descr
| Put of id * string
| RequestTask
@@ -19,3 +20,10 @@ type message =
| FinalResult of string
+(* Protocol for pooling *)
+
+type pool_message =
+ | PoolHello
+ | PoolProvide of int
+ | PoolRequest of string * (string * int) * int
+