summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex AUVOLAT <alex.auvolat@ens.fr>2014-05-24 23:25:07 +0200
committerAlex AUVOLAT <alex.auvolat@ens.fr>2014-05-24 23:25:07 +0200
commit6e750a757ef6fb1f41cf4c2fe39edba834b76858 (patch)
treef339630beb8a9a1a6f3544b40547ce9c83f23a91 /src
parentc5e69a904e79e807c5b075c08ce82183133e7b4c (diff)
downloadSystemeReseaux-Projet-6e750a757ef6fb1f41cf4c2fe39edba834b76858.tar.gz
SystemeReseaux-Projet-6e750a757ef6fb1f41cf4c2fe39edba834b76858.zip
./manager -local-proc 4 ./example.native does what expected.
Diffstat (limited to 'src')
-rw-r--r--src/Makefile12
-rw-r--r--src/example.ml21
-rw-r--r--src/kahn_sock.ml120
-rw-r--r--src/kahn_stdio.ml131
-rw-r--r--src/manager.ml240
-rw-r--r--src/primes.ml15
-rw-r--r--src/proto.ml21
-rw-r--r--src/util.ml20
8 files changed, 434 insertions, 146 deletions
diff --git a/src/Makefile b/src/Makefile
index ed9aa69..ebd2516 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -1,19 +1,13 @@
OCAMLBUILD=ocamlbuild -classic-display \
-tags annot,debug,thread \
-libs unix
-TARGET=native
-primes:
- $(OCAMLBUILD) primes.$(TARGET)
+all: primes.native example.native manager.native
-example:
- $(OCAMLBUILD) example.$(TARGET)
+%.native: %.ml kahn_pipe.ml kahn_seq.ml kahn_stdio.ml proto.ml util.ml
+ $(OCAMLBUILD) $@
clean:
$(OCAMLBUILD) -clean
-realclean: clean
- rm -f *~
-
-cleanall: realclean
diff --git a/src/example.ml b/src/example.ml
index 4971448..43e5327 100644
--- a/src/example.ml
+++ b/src/example.ml
@@ -32,7 +32,7 @@ module Example (K : Kahn.S) = struct
(K.get qi)
(fun (v, s) ->
if v <> -1 then
- begin Format.printf "f(%d) = %d@." v s; loop () end
+ begin Format.eprintf "f(%d) = %d@." v s; loop () end
else K.return ())
in
loop ()
@@ -67,25 +67,22 @@ module Example (K : Kahn.S) = struct
K.bind_io
(K.get q_in2)
(fun y ->
- Format.printf "f(%d) = %d@." n (x+y);
+ Format.eprintf "f(%d) = %d@." n (x+y);
K.put (x+y) qo))
]
)))
- let main2 : unit K.process =
+ let main2 : int K.process =
(delay K.new_channel()) >>=
(fun (qi, qo) ->
- K.doco
- [
- fib_rec 53 7 qo;
- K.bind_io
- (K.get qi)
- (fun v -> Format.printf "Got it! Result is %d@." v; K.return ())
- ])
+ (fib_rec 47 7 qo) >>=
+ (fun () -> K.get qi))
end
-module E = Example(Kahn_seq.Seq)
+module E = Example(Kahn_stdio.ProtoKahn)
-let () = E.K.run E.main2
+let () =
+ let r = E.K.run E.main2 in
+ Format.eprintf "Final result: %d@." r
diff --git a/src/kahn_sock.ml b/src/kahn_sock.ml
deleted file mode 100644
index bce6cf2..0000000
--- a/src/kahn_sock.ml
+++ /dev/null
@@ -1,120 +0,0 @@
-open Kahn
-open Unix
-
-(* make_addr : string -> int -> sockaddr *)
-let make_addr host port =
- let host = gethostbyname host in
- ADDR_INET(host.h_addr_list.(Random.int (Array.length host.h_addr_list)), port)
-
-module Sock: S = struct
-
- let kahn_port = 8197
-
- type 'a process = (('a -> unit) option) -> unit
-
-
- type 'a channel = int
- type 'a in_port = 'a channel
- type 'a out_port = 'a channel
-
- type task = unit -> unit
- let tasks = Queue.create ()
-
- let socket_to_srv = ref None
-
- type cli_msg =
- | Hello
- | Put of int * string
- | Get of int * (string -> task)
- | AskTask
- | GiveTask of task
- | GiveIOTask of task
- | FinalResult of string
- type srv_msg =
- | Hello
- | GiveTask of task
- | PleaseWait
- | FinalResult of string
-
-
- let rec tell_server (msg : cli_msg) =
- match !socket_to_srv with
- | Some s -> Marshal.to_channel s msg [Marshal.Closures]; flush s
- | None -> handle_msg_server (fun _ -> assert false) msg
-
- and handle_msg_server (reply_fun : srv_msg -> unit) = function
- | Hello -> reply_fun Hello
- | _ -> () (* TODO *)
-
- and client host =
- (* Initialize socket *)
- let sock = socket PF_INET SOCK_STREAM 0 in
- connect sock (make_addr host kahn_port);
- let i, o = in_channel_of_descr sock, out_channel_of_descr sock in
- socket_to_srv := Some o;
- let get_msg () = Marshal.from_channel i in
- (* Initialize protocol *)
- tell_server Hello;
- assert (get_msg() = Hello);
- (* Loop *)
- let rec loop () =
- tell_server AskTask;
- match get_msg () with
- | Hello -> assert false
- | GiveTask task -> task (); loop ()
- | PleaseWait -> sleep 2; loop ()
- | FinalResult s -> Marshal.from_string s
- in
- let result = loop() in
- shutdown sock SHUTDOWN_ALL;
- result
-
- and server e =
- (* Initialize task list *)
- push_task (fun () -> e None);
-
- (* Initialize socket *)
- let sock = socket PF_INET SOCK_STREAM 0 in
- bind sock (make_addr "0.0.0.0" kahn_port);
- listen sock 10;
- let stop_srv _ =
- Format.eprintf "Shutdown server...@.";
- shutdown sock SHUTDOWN_ALL;
- exit 0
- in
- Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
- Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv);
-
- (* Loop *)
- let clients = ref [] in
- while true do
- let fds = List.map (fun (i, o, a) -> descr_of_in_channel i) !clients in
- match select (sock::fds) [] [] (-1.0) with
- | s::_, _, _ when s = sock ->
- (* New client ! *)
- let fd, addr = accept sock in
- clients :=
- (in_channel_of_descr fd,
- out_channel_of_descr fd,
- addr)::!clients
- | s::_, _, _ ->
- (* Client sent something *)
- let i, o, a = List.find
- (fun (i, _, _) -> descr_of_in_channel i = s) !clients in
- let msg = Marshal.from_channel i in
- handle_msg_server
- (fun m -> Marshal.to_channel o m [Marshall.Closures]; flush o)
- msg
- | _ -> assert false
- done
-
- let srv = ref ""
- let set_var v s = v := s
- let run e =
- Arg.parse [] (set_var srv) "usage: kahn [server_addr]";
- if !srv = "" then
- server e
- else
- client !sr
-
-end
diff --git a/src/kahn_stdio.ml b/src/kahn_stdio.ml
new file mode 100644
index 0000000..a149742
--- /dev/null
+++ b/src/kahn_stdio.ml
@@ -0,0 +1,131 @@
+open Unix
+
+open Util
+open Kahn
+open Proto
+
+
+module ProtoKahn: S = struct
+
+ type 'a process = (('a -> unit) option) -> unit
+
+ type 'a channel = id
+ type 'a in_port = 'a channel
+ type 'a out_port = 'a channel
+
+ let send m = Marshal.to_channel Pervasives.stdout m []; flush Pervasives.stdout
+ let read () : message = read_one_msg stdin
+
+ let task_desc t = Marshal.to_string t [Marshal.Closures]
+
+ let send_task t is_io =
+ send (GiveTask(task_desc t, is_io))
+
+ let new_channel () =
+ let x = new_id() in x, x
+
+ let push_cont cont arg is_io =
+ match cont with
+ | None -> ()
+ | Some cont ->
+ send_task (fun () -> cont arg) is_io
+
+ let put v prt =
+ fun cont ->
+ send (Put(prt, Marshal.to_string v []));
+ push_cont cont () false
+
+ let get prt =
+ fun cont ->
+ send (Get(prt,
+ task_desc
+ (fun s -> match cont with
+ | None -> ()
+ | Some cont -> cont (Marshal.from_string s 0))
+ )
+ )
+
+ let select pl = fun cont -> assert false
+ let select_default = fun cont -> assert false
+
+ let doco plist =
+ fun cont ->
+ let f_ch_id = new_id () in
+ List.iter
+ (fun p ->
+ send_task
+ (fun () -> p
+ (Some (fun () -> send (Put(f_ch_id, ""))))
+ )
+ false
+ ) plist;
+ let rec push_x = function
+ | 0 -> push_cont cont () false
+ | n -> send (Get(f_ch_id, task_desc (fun s -> push_x (n-1))))
+ in push_x (List.length plist)
+
+ let return v =
+ fun cont ->
+ match cont with
+ | None -> ()
+ | Some cont -> cont v
+ let bind a f =
+ fun cont ->
+ a (Some (fun va ->
+ let b = (f va) in
+ b cont))
+ let bind_io a f =
+ fun cont ->
+ a (Some (fun va ->
+ send_task
+ (fun () ->
+ let b = f va in
+ send_task (fun () -> b cont) false)
+ true))
+
+ let origin = ref false
+ let dbg_out = ref false
+ let dbg x = if !dbg_out then Format.eprintf "(cli) %s@." x
+
+ let parse_args () =
+ let usage = "Usage: ./program [options]" in
+ let options = [
+ "-org", Arg.Set origin, "Launch root process";
+ "-dbg", Arg.Set dbg_out, "Show debug output";
+ ] in
+ Arg.parse options (fun _ -> assert false) usage
+
+ let run proc =
+ Random.self_init();
+ parse_args();
+ (* Initialize protocol *)
+ send Hello;
+ if read () <> Hello then raise (ProtocolError "Server did not say Hello correctly.");
+ (* Start task if necessary *)
+ if !origin then proc (Some (fun r -> send (FinalResult (Marshal.to_string r []))));
+ (* While there are things to do... *)
+ let result = ref None in
+ while !result = None do
+ dbg "Requesting task...";
+ send RequestTask;
+ dbg "Reading...";
+ match read() with
+ | GiveTask(td, _) ->
+ dbg "Got task!";
+ let t : task = Marshal.from_string td 0 in
+ t();
+ | GiveMsgTask(msg, td) ->
+ dbg "Got msg task!";
+ let t : msg_task = Marshal.from_string td 0 in
+ t msg;
+ | FinalResult(x) ->
+ dbg "Got result!";
+ result := Some (Marshal.from_string x 0)
+ | _ -> raise (ProtocolError "Invalid message in main loop.")
+ done;
+ (* Return result *)
+ match !result with
+ | None -> assert false
+ | Some r -> r
+
+end
diff --git a/src/manager.ml b/src/manager.ml
new file mode 100644
index 0000000..39b1ee9
--- /dev/null
+++ b/src/manager.ml
@@ -0,0 +1,240 @@
+open Unix
+open Proto
+open Util
+
+let dbg_out = ref false
+let dbg x = if !dbg_out then Format.eprintf "(srv) %s@." x
+let dbg1 a x = if !dbg_out then Format.eprintf "(srv) %s %s@." (id_str a) x
+let dbg2 a b x = if !dbg_out then Format.eprintf "(srv) %s %s %s@." (id_str a) (id_str b) x
+
+
+(* Server data structures *)
+
+type task_el =
+ | Task of task_descr * bool
+ | MsgTask of string * msg_task_descr
+
+type client_status =
+ | Waiting
+ | Busy
+
+type client = {
+ id: id;
+ input: file_descr;
+ send: message -> unit;
+ disconnect: unit -> unit;
+ mutable status: client_status;
+}
+
+type server = {
+ tasks: task_el Queue.t;
+ tsk_chan: (id, msg_task_descr) Hashtbl.t;
+ msg_chan: (id, string Queue.t) Hashtbl.t;
+ mutable final_result: string option;
+ clients: (id, client) Hashtbl.t;
+}
+
+let new_server () =
+ { tasks = Queue.create ();
+ tsk_chan = Hashtbl.create 12;
+ msg_chan = Hashtbl.create 12;
+ final_result = None;
+ clients = Hashtbl.create 4;
+ }
+
+let push_task server task =
+ let cli = ref None in
+ Hashtbl.iter
+ (fun _ c -> if c.status = Waiting then cli := Some c)
+ server.clients;
+ match !cli with
+ | None -> Queue.push task server.tasks
+ | Some c ->
+ c.status <- Busy;
+ c.send
+ (match task with
+ | MsgTask(a, b) -> GiveMsgTask(a, b)
+ | Task(a, b) -> GiveTask(a, b))
+
+let get_task server =
+ Queue.pop server.tasks
+
+let handle_put server chan msg =
+ if Hashtbl.mem server.tsk_chan chan then
+ let task = Hashtbl.find server.tsk_chan chan in
+ Hashtbl.remove server.tsk_chan chan;
+ push_task server (MsgTask(msg, task))
+ else
+ let chq =
+ if Hashtbl.mem server.msg_chan chan then
+ Hashtbl.find server.msg_chan chan
+ else
+ let q = Queue.create () in
+ Hashtbl.add server.msg_chan chan q;
+ q
+ in
+ Queue.push msg chq
+
+let handle_get server chan task =
+ if Hashtbl.mem server.msg_chan chan &&
+ (let q = Hashtbl.find server.msg_chan chan in not (Queue.is_empty q))
+ then
+ let msg = Queue.pop (Hashtbl.find server.msg_chan chan) in
+ push_task server (MsgTask(msg, task))
+ else
+ if Hashtbl.mem server.tsk_chan chan then
+ raise (ProtocolError "Several listeners on same channel.")
+ else
+ Hashtbl.add server.tsk_chan chan task
+
+let server_add_client server cli =
+ (* Say hello *)
+ let msg = read_one_msg cli.input in
+ if msg <> Hello then raise (ProtocolError "Client must say Hello first thing.");
+ cli.send Hello;
+ (* Put client on queue *)
+ Hashtbl.add server.clients cli.id cli
+
+let client_of_fd server fd =
+ let cli = ref None in
+ Hashtbl.iter (fun _ c -> if c.input = fd then cli := Some c) server.clients;
+ match !cli with
+ | None -> assert false
+ | Some c -> c
+
+
+let rec server_run server =
+ let fds = Hashtbl.fold
+ (fun _ c l ->
+ if c.status = Busy
+ then c.input::l
+ else l)
+ server.clients [] in
+ if not (fds = []) then begin
+ dbg "selecting...";
+ let qi, _, qe = select fds [] fds (-1.0) in
+ begin match qi, qe with
+ | x::_, _ ->
+ let cli = client_of_fd server x in
+ dbg1 cli.id "reading...";
+ begin match read_one_msg cli.input with
+ | RequestTask ->
+ dbg "got task request";
+ begin match server.final_result with
+ | None ->
+ if Queue.is_empty server.tasks then
+ cli.status <- Waiting
+ else cli.send (match Queue.pop server.tasks with
+ | MsgTask(a, b) -> GiveMsgTask(a, b)
+ | Task(a, b) -> GiveTask(a,b))
+ | Some r ->
+ cli.send(FinalResult r);
+ cli.disconnect();
+ Hashtbl.remove server.clients cli.id
+ end;
+ | Get(chan, td) ->
+ dbg2 cli.id chan "got GET";
+ handle_get server chan td
+ | Put(chan, msg) ->
+ dbg2 cli.id chan "got PUT";
+ handle_put server chan msg
+ | FinalResult x ->
+ dbg "got FinalResult";
+ cli.status <- Waiting;
+ server.final_result <- Some x;
+
+ let p = ref [] in
+ Hashtbl.iter
+ (fun _ c -> if c.status = Waiting then p := c::(!p))
+ server.clients;
+ List.iter
+ (fun c ->
+ c.send(FinalResult x);
+ c.disconnect();
+ Hashtbl.remove server.clients c.id)
+ !p
+ | GiveTask(a, b) ->
+ dbg "got Task";
+ push_task server (Task(a, b))
+ | GiveMsgTask(a, b) ->
+ dbg "got MsgTask";
+ push_task server (MsgTask(a, b))
+ | Hello -> raise (ProtocolError "Unexpected Hello.")
+ end
+ | [], x::_ ->
+ let cli = client_of_fd server x in
+ cli.disconnect();
+ Hashtbl.remove server.clients cli.id
+ | _ -> assert false
+ end;
+ server_run server
+ end else begin
+ if server.final_result = None then begin
+ Format.eprintf "Queue empty: %s@." (if Queue.is_empty server.tasks then "yes" else "no");
+ Format.eprintf "Client count: %d@." (Hashtbl.length server.clients);
+ raise (ProtocolError "Everybody waiting but nothing to do.")
+ end
+ end
+
+(* Main function *)
+
+let program = ref ""
+let local_proc = ref 1
+
+let parse_args () =
+ let usage = "Usage: ./manager [options] program" in
+ let options = [
+ "-dbg", Arg.Set dbg_out, "Show debug output";
+ "-local-proc", Arg.Set_int local_proc, "Set number of local processes. Default: 1";
+ ] in
+ Arg.parse options (fun n -> program := n) usage
+
+let () =
+ Random.self_init();
+ parse_args();
+ if !local_proc < 1 then begin
+ Format.eprintf "Error: at least one local process must be launched !@.";
+ exit 0;
+ end;
+ if !program = "" then begin
+ Format.eprintf "Error: no program specified!@.";
+ exit 0
+ end;
+
+ let server = new_server () in
+ let pids = ref [] in
+
+ for i = 0 to !local_proc - 1 do
+ (* Create file descriptors *)
+ let m2p_p, m2p_m = pipe () in
+ let p2m_m, p2m_p = pipe () in
+ match fork() with
+ | 0 ->
+ close m2p_m;
+ close p2m_m;
+ dup2 m2p_p stdin;
+ dup2 p2m_p stdout;
+ let args = Array.of_list
+ ([!program] @
+ (if i = 0 then ["-org"] else []) @
+ (if !dbg_out then ["-dbg"] else [])) in
+ execv !program args
+ | pid ->
+ close m2p_p;
+ close p2m_p;
+ let outc = Unix.out_channel_of_descr m2p_m in
+
+ server_add_client server
+ { id = new_id();
+ input = p2m_m;
+ send = (fun msg -> Marshal.to_channel outc msg []; flush outc);
+ disconnect = (fun () -> close p2m_m; close m2p_m);
+ status = Busy;
+ };
+
+ pids := pid :: (!pids)
+ done;
+
+ server_run server;
+ List.iter (fun pid -> ignore (waitpid [] pid)) !pids
+
diff --git a/src/primes.ml b/src/primes.ml
index b9a57ed..21d979a 100644
--- a/src/primes.ml
+++ b/src/primes.ml
@@ -26,20 +26,25 @@ module Primes (K : Kahn.S) = struct
(get qi) >>= (fun v ->
if v <> -1 then
begin
- Format.printf "%d@." v;
+ Format.eprintf "%d@." v;
(delay new_channel ()) >>=
(fun (qi2, qo2) -> doco [ filter v qi qo2 ; primes qi2 ])
end
else return ())
- let main : unit process =
+ let main : int process =
(delay new_channel ()) >>=
- (fun (q_in, q_out) -> doco [ integers 2000 q_out ; primes q_in ])
+ (fun (q_in, q_out) -> doco [ integers 500 q_out ; primes q_in ])
+ >>= (fun () -> return 42)
end
-module Eng = Kahn_pipe.Pipe
+module Eng = Kahn_stdio.ProtoKahn
module P = Primes(Eng)
-let () = P.K.run P.main
+let () =
+ let r = P.K.run P.main in
+ assert (r = 42);
+ Format.eprintf "Primes finished (%d \\o/).@." r
+
diff --git a/src/proto.ml b/src/proto.ml
new file mode 100644
index 0000000..f0517d4
--- /dev/null
+++ b/src/proto.ml
@@ -0,0 +1,21 @@
+open Util
+
+(* Protocol definitions *)
+
+type task = unit -> unit
+type msg_task = string -> unit
+type task_descr = string
+type msg_task_descr = string
+
+exception ProtocolError of string
+
+type message =
+ | Hello
+ | Get of id * msg_task_descr
+ | Put of id * string
+ | RequestTask
+ | GiveTask of task_descr * bool
+ | GiveMsgTask of string * msg_task_descr
+ | FinalResult of string
+
+
diff --git a/src/util.ml b/src/util.ml
new file mode 100644
index 0000000..5502398
--- /dev/null
+++ b/src/util.ml
@@ -0,0 +1,20 @@
+open Unix
+
+type id = int * int
+let new_id () : id = (Random.int 100000, Random.int 100000)
+let id_str (a, b) = Format.sprintf "%d.%d" a b
+
+(* make_addr : string -> int -> sockaddr *)
+let make_addr host port =
+ let host = gethostbyname host in
+ ADDR_INET(host.h_addr_list.(Random.int (Array.length host.h_addr_list)), port)
+
+
+(* Unmarshal ONE message from a file descriptor, and DO NOT buffer more data *)
+let read_one_msg fd =
+ let hdr = String.create Marshal.header_size in
+ assert (read fd hdr 0 Marshal.header_size = Marshal.header_size);
+ let dlen = Marshal.data_size hdr 0 in
+ let data = String.create dlen in
+ assert (read fd data 0 dlen = dlen);
+ Marshal.from_string (hdr ^ data) 0