From 6e750a757ef6fb1f41cf4c2fe39edba834b76858 Mon Sep 17 00:00:00 2001 From: Alex AUVOLAT Date: Sat, 24 May 2014 23:25:07 +0200 Subject: ./manager -local-proc 4 ./example.native does what expected. --- src/Makefile | 12 +-- src/example.ml | 21 ++--- src/kahn_sock.ml | 120 --------------------------- src/kahn_stdio.ml | 131 +++++++++++++++++++++++++++++ src/manager.ml | 240 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/primes.ml | 15 ++-- src/proto.ml | 21 +++++ src/util.ml | 20 +++++ 8 files changed, 434 insertions(+), 146 deletions(-) delete mode 100644 src/kahn_sock.ml create mode 100644 src/kahn_stdio.ml create mode 100644 src/manager.ml create mode 100644 src/proto.ml create mode 100644 src/util.ml (limited to 'src') diff --git a/src/Makefile b/src/Makefile index ed9aa69..ebd2516 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,19 +1,13 @@ OCAMLBUILD=ocamlbuild -classic-display \ -tags annot,debug,thread \ -libs unix -TARGET=native -primes: - $(OCAMLBUILD) primes.$(TARGET) +all: primes.native example.native manager.native -example: - $(OCAMLBUILD) example.$(TARGET) +%.native: %.ml kahn_pipe.ml kahn_seq.ml kahn_stdio.ml proto.ml util.ml + $(OCAMLBUILD) $@ clean: $(OCAMLBUILD) -clean -realclean: clean - rm -f *~ - -cleanall: realclean diff --git a/src/example.ml b/src/example.ml index 4971448..43e5327 100644 --- a/src/example.ml +++ b/src/example.ml @@ -32,7 +32,7 @@ module Example (K : Kahn.S) = struct (K.get qi) (fun (v, s) -> if v <> -1 then - begin Format.printf "f(%d) = %d@." v s; loop () end + begin Format.eprintf "f(%d) = %d@." v s; loop () end else K.return ()) in loop () @@ -67,25 +67,22 @@ module Example (K : Kahn.S) = struct K.bind_io (K.get q_in2) (fun y -> - Format.printf "f(%d) = %d@." n (x+y); + Format.eprintf "f(%d) = %d@." n (x+y); K.put (x+y) qo)) ] ))) - let main2 : unit K.process = + let main2 : int K.process = (delay K.new_channel()) >>= (fun (qi, qo) -> - K.doco - [ - fib_rec 53 7 qo; - K.bind_io - (K.get qi) - (fun v -> Format.printf "Got it! Result is %d@." v; K.return ()) - ]) + (fib_rec 47 7 qo) >>= + (fun () -> K.get qi)) end -module E = Example(Kahn_seq.Seq) +module E = Example(Kahn_stdio.ProtoKahn) -let () = E.K.run E.main2 +let () = + let r = E.K.run E.main2 in + Format.eprintf "Final result: %d@." r diff --git a/src/kahn_sock.ml b/src/kahn_sock.ml deleted file mode 100644 index bce6cf2..0000000 --- a/src/kahn_sock.ml +++ /dev/null @@ -1,120 +0,0 @@ -open Kahn -open Unix - -(* make_addr : string -> int -> sockaddr *) -let make_addr host port = - let host = gethostbyname host in - ADDR_INET(host.h_addr_list.(Random.int (Array.length host.h_addr_list)), port) - -module Sock: S = struct - - let kahn_port = 8197 - - type 'a process = (('a -> unit) option) -> unit - - - type 'a channel = int - type 'a in_port = 'a channel - type 'a out_port = 'a channel - - type task = unit -> unit - let tasks = Queue.create () - - let socket_to_srv = ref None - - type cli_msg = - | Hello - | Put of int * string - | Get of int * (string -> task) - | AskTask - | GiveTask of task - | GiveIOTask of task - | FinalResult of string - type srv_msg = - | Hello - | GiveTask of task - | PleaseWait - | FinalResult of string - - - let rec tell_server (msg : cli_msg) = - match !socket_to_srv with - | Some s -> Marshal.to_channel s msg [Marshal.Closures]; flush s - | None -> handle_msg_server (fun _ -> assert false) msg - - and handle_msg_server (reply_fun : srv_msg -> unit) = function - | Hello -> reply_fun Hello - | _ -> () (* TODO *) - - and client host = - (* Initialize socket *) - let sock = socket PF_INET SOCK_STREAM 0 in - connect sock (make_addr host kahn_port); - let i, o = in_channel_of_descr sock, out_channel_of_descr sock in - socket_to_srv := Some o; - let get_msg () = Marshal.from_channel i in - (* Initialize protocol *) - tell_server Hello; - assert (get_msg() = Hello); - (* Loop *) - let rec loop () = - tell_server AskTask; - match get_msg () with - | Hello -> assert false - | GiveTask task -> task (); loop () - | PleaseWait -> sleep 2; loop () - | FinalResult s -> Marshal.from_string s - in - let result = loop() in - shutdown sock SHUTDOWN_ALL; - result - - and server e = - (* Initialize task list *) - push_task (fun () -> e None); - - (* Initialize socket *) - let sock = socket PF_INET SOCK_STREAM 0 in - bind sock (make_addr "0.0.0.0" kahn_port); - listen sock 10; - let stop_srv _ = - Format.eprintf "Shutdown server...@."; - shutdown sock SHUTDOWN_ALL; - exit 0 - in - Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv); - Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv); - - (* Loop *) - let clients = ref [] in - while true do - let fds = List.map (fun (i, o, a) -> descr_of_in_channel i) !clients in - match select (sock::fds) [] [] (-1.0) with - | s::_, _, _ when s = sock -> - (* New client ! *) - let fd, addr = accept sock in - clients := - (in_channel_of_descr fd, - out_channel_of_descr fd, - addr)::!clients - | s::_, _, _ -> - (* Client sent something *) - let i, o, a = List.find - (fun (i, _, _) -> descr_of_in_channel i = s) !clients in - let msg = Marshal.from_channel i in - handle_msg_server - (fun m -> Marshal.to_channel o m [Marshall.Closures]; flush o) - msg - | _ -> assert false - done - - let srv = ref "" - let set_var v s = v := s - let run e = - Arg.parse [] (set_var srv) "usage: kahn [server_addr]"; - if !srv = "" then - server e - else - client !sr - -end diff --git a/src/kahn_stdio.ml b/src/kahn_stdio.ml new file mode 100644 index 0000000..a149742 --- /dev/null +++ b/src/kahn_stdio.ml @@ -0,0 +1,131 @@ +open Unix + +open Util +open Kahn +open Proto + + +module ProtoKahn: S = struct + + type 'a process = (('a -> unit) option) -> unit + + type 'a channel = id + type 'a in_port = 'a channel + type 'a out_port = 'a channel + + let send m = Marshal.to_channel Pervasives.stdout m []; flush Pervasives.stdout + let read () : message = read_one_msg stdin + + let task_desc t = Marshal.to_string t [Marshal.Closures] + + let send_task t is_io = + send (GiveTask(task_desc t, is_io)) + + let new_channel () = + let x = new_id() in x, x + + let push_cont cont arg is_io = + match cont with + | None -> () + | Some cont -> + send_task (fun () -> cont arg) is_io + + let put v prt = + fun cont -> + send (Put(prt, Marshal.to_string v [])); + push_cont cont () false + + let get prt = + fun cont -> + send (Get(prt, + task_desc + (fun s -> match cont with + | None -> () + | Some cont -> cont (Marshal.from_string s 0)) + ) + ) + + let select pl = fun cont -> assert false + let select_default = fun cont -> assert false + + let doco plist = + fun cont -> + let f_ch_id = new_id () in + List.iter + (fun p -> + send_task + (fun () -> p + (Some (fun () -> send (Put(f_ch_id, "")))) + ) + false + ) plist; + let rec push_x = function + | 0 -> push_cont cont () false + | n -> send (Get(f_ch_id, task_desc (fun s -> push_x (n-1)))) + in push_x (List.length plist) + + let return v = + fun cont -> + match cont with + | None -> () + | Some cont -> cont v + let bind a f = + fun cont -> + a (Some (fun va -> + let b = (f va) in + b cont)) + let bind_io a f = + fun cont -> + a (Some (fun va -> + send_task + (fun () -> + let b = f va in + send_task (fun () -> b cont) false) + true)) + + let origin = ref false + let dbg_out = ref false + let dbg x = if !dbg_out then Format.eprintf "(cli) %s@." x + + let parse_args () = + let usage = "Usage: ./program [options]" in + let options = [ + "-org", Arg.Set origin, "Launch root process"; + "-dbg", Arg.Set dbg_out, "Show debug output"; + ] in + Arg.parse options (fun _ -> assert false) usage + + let run proc = + Random.self_init(); + parse_args(); + (* Initialize protocol *) + send Hello; + if read () <> Hello then raise (ProtocolError "Server did not say Hello correctly."); + (* Start task if necessary *) + if !origin then proc (Some (fun r -> send (FinalResult (Marshal.to_string r [])))); + (* While there are things to do... *) + let result = ref None in + while !result = None do + dbg "Requesting task..."; + send RequestTask; + dbg "Reading..."; + match read() with + | GiveTask(td, _) -> + dbg "Got task!"; + let t : task = Marshal.from_string td 0 in + t(); + | GiveMsgTask(msg, td) -> + dbg "Got msg task!"; + let t : msg_task = Marshal.from_string td 0 in + t msg; + | FinalResult(x) -> + dbg "Got result!"; + result := Some (Marshal.from_string x 0) + | _ -> raise (ProtocolError "Invalid message in main loop.") + done; + (* Return result *) + match !result with + | None -> assert false + | Some r -> r + +end diff --git a/src/manager.ml b/src/manager.ml new file mode 100644 index 0000000..39b1ee9 --- /dev/null +++ b/src/manager.ml @@ -0,0 +1,240 @@ +open Unix +open Proto +open Util + +let dbg_out = ref false +let dbg x = if !dbg_out then Format.eprintf "(srv) %s@." x +let dbg1 a x = if !dbg_out then Format.eprintf "(srv) %s %s@." (id_str a) x +let dbg2 a b x = if !dbg_out then Format.eprintf "(srv) %s %s %s@." (id_str a) (id_str b) x + + +(* Server data structures *) + +type task_el = + | Task of task_descr * bool + | MsgTask of string * msg_task_descr + +type client_status = + | Waiting + | Busy + +type client = { + id: id; + input: file_descr; + send: message -> unit; + disconnect: unit -> unit; + mutable status: client_status; +} + +type server = { + tasks: task_el Queue.t; + tsk_chan: (id, msg_task_descr) Hashtbl.t; + msg_chan: (id, string Queue.t) Hashtbl.t; + mutable final_result: string option; + clients: (id, client) Hashtbl.t; +} + +let new_server () = + { tasks = Queue.create (); + tsk_chan = Hashtbl.create 12; + msg_chan = Hashtbl.create 12; + final_result = None; + clients = Hashtbl.create 4; + } + +let push_task server task = + let cli = ref None in + Hashtbl.iter + (fun _ c -> if c.status = Waiting then cli := Some c) + server.clients; + match !cli with + | None -> Queue.push task server.tasks + | Some c -> + c.status <- Busy; + c.send + (match task with + | MsgTask(a, b) -> GiveMsgTask(a, b) + | Task(a, b) -> GiveTask(a, b)) + +let get_task server = + Queue.pop server.tasks + +let handle_put server chan msg = + if Hashtbl.mem server.tsk_chan chan then + let task = Hashtbl.find server.tsk_chan chan in + Hashtbl.remove server.tsk_chan chan; + push_task server (MsgTask(msg, task)) + else + let chq = + if Hashtbl.mem server.msg_chan chan then + Hashtbl.find server.msg_chan chan + else + let q = Queue.create () in + Hashtbl.add server.msg_chan chan q; + q + in + Queue.push msg chq + +let handle_get server chan task = + if Hashtbl.mem server.msg_chan chan && + (let q = Hashtbl.find server.msg_chan chan in not (Queue.is_empty q)) + then + let msg = Queue.pop (Hashtbl.find server.msg_chan chan) in + push_task server (MsgTask(msg, task)) + else + if Hashtbl.mem server.tsk_chan chan then + raise (ProtocolError "Several listeners on same channel.") + else + Hashtbl.add server.tsk_chan chan task + +let server_add_client server cli = + (* Say hello *) + let msg = read_one_msg cli.input in + if msg <> Hello then raise (ProtocolError "Client must say Hello first thing."); + cli.send Hello; + (* Put client on queue *) + Hashtbl.add server.clients cli.id cli + +let client_of_fd server fd = + let cli = ref None in + Hashtbl.iter (fun _ c -> if c.input = fd then cli := Some c) server.clients; + match !cli with + | None -> assert false + | Some c -> c + + +let rec server_run server = + let fds = Hashtbl.fold + (fun _ c l -> + if c.status = Busy + then c.input::l + else l) + server.clients [] in + if not (fds = []) then begin + dbg "selecting..."; + let qi, _, qe = select fds [] fds (-1.0) in + begin match qi, qe with + | x::_, _ -> + let cli = client_of_fd server x in + dbg1 cli.id "reading..."; + begin match read_one_msg cli.input with + | RequestTask -> + dbg "got task request"; + begin match server.final_result with + | None -> + if Queue.is_empty server.tasks then + cli.status <- Waiting + else cli.send (match Queue.pop server.tasks with + | MsgTask(a, b) -> GiveMsgTask(a, b) + | Task(a, b) -> GiveTask(a,b)) + | Some r -> + cli.send(FinalResult r); + cli.disconnect(); + Hashtbl.remove server.clients cli.id + end; + | Get(chan, td) -> + dbg2 cli.id chan "got GET"; + handle_get server chan td + | Put(chan, msg) -> + dbg2 cli.id chan "got PUT"; + handle_put server chan msg + | FinalResult x -> + dbg "got FinalResult"; + cli.status <- Waiting; + server.final_result <- Some x; + + let p = ref [] in + Hashtbl.iter + (fun _ c -> if c.status = Waiting then p := c::(!p)) + server.clients; + List.iter + (fun c -> + c.send(FinalResult x); + c.disconnect(); + Hashtbl.remove server.clients c.id) + !p + | GiveTask(a, b) -> + dbg "got Task"; + push_task server (Task(a, b)) + | GiveMsgTask(a, b) -> + dbg "got MsgTask"; + push_task server (MsgTask(a, b)) + | Hello -> raise (ProtocolError "Unexpected Hello.") + end + | [], x::_ -> + let cli = client_of_fd server x in + cli.disconnect(); + Hashtbl.remove server.clients cli.id + | _ -> assert false + end; + server_run server + end else begin + if server.final_result = None then begin + Format.eprintf "Queue empty: %s@." (if Queue.is_empty server.tasks then "yes" else "no"); + Format.eprintf "Client count: %d@." (Hashtbl.length server.clients); + raise (ProtocolError "Everybody waiting but nothing to do.") + end + end + +(* Main function *) + +let program = ref "" +let local_proc = ref 1 + +let parse_args () = + let usage = "Usage: ./manager [options] program" in + let options = [ + "-dbg", Arg.Set dbg_out, "Show debug output"; + "-local-proc", Arg.Set_int local_proc, "Set number of local processes. Default: 1"; + ] in + Arg.parse options (fun n -> program := n) usage + +let () = + Random.self_init(); + parse_args(); + if !local_proc < 1 then begin + Format.eprintf "Error: at least one local process must be launched !@."; + exit 0; + end; + if !program = "" then begin + Format.eprintf "Error: no program specified!@."; + exit 0 + end; + + let server = new_server () in + let pids = ref [] in + + for i = 0 to !local_proc - 1 do + (* Create file descriptors *) + let m2p_p, m2p_m = pipe () in + let p2m_m, p2m_p = pipe () in + match fork() with + | 0 -> + close m2p_m; + close p2m_m; + dup2 m2p_p stdin; + dup2 p2m_p stdout; + let args = Array.of_list + ([!program] @ + (if i = 0 then ["-org"] else []) @ + (if !dbg_out then ["-dbg"] else [])) in + execv !program args + | pid -> + close m2p_p; + close p2m_p; + let outc = Unix.out_channel_of_descr m2p_m in + + server_add_client server + { id = new_id(); + input = p2m_m; + send = (fun msg -> Marshal.to_channel outc msg []; flush outc); + disconnect = (fun () -> close p2m_m; close m2p_m); + status = Busy; + }; + + pids := pid :: (!pids) + done; + + server_run server; + List.iter (fun pid -> ignore (waitpid [] pid)) !pids + diff --git a/src/primes.ml b/src/primes.ml index b9a57ed..21d979a 100644 --- a/src/primes.ml +++ b/src/primes.ml @@ -26,20 +26,25 @@ module Primes (K : Kahn.S) = struct (get qi) >>= (fun v -> if v <> -1 then begin - Format.printf "%d@." v; + Format.eprintf "%d@." v; (delay new_channel ()) >>= (fun (qi2, qo2) -> doco [ filter v qi qo2 ; primes qi2 ]) end else return ()) - let main : unit process = + let main : int process = (delay new_channel ()) >>= - (fun (q_in, q_out) -> doco [ integers 2000 q_out ; primes q_in ]) + (fun (q_in, q_out) -> doco [ integers 500 q_out ; primes q_in ]) + >>= (fun () -> return 42) end -module Eng = Kahn_pipe.Pipe +module Eng = Kahn_stdio.ProtoKahn module P = Primes(Eng) -let () = P.K.run P.main +let () = + let r = P.K.run P.main in + assert (r = 42); + Format.eprintf "Primes finished (%d \\o/).@." r + diff --git a/src/proto.ml b/src/proto.ml new file mode 100644 index 0000000..f0517d4 --- /dev/null +++ b/src/proto.ml @@ -0,0 +1,21 @@ +open Util + +(* Protocol definitions *) + +type task = unit -> unit +type msg_task = string -> unit +type task_descr = string +type msg_task_descr = string + +exception ProtocolError of string + +type message = + | Hello + | Get of id * msg_task_descr + | Put of id * string + | RequestTask + | GiveTask of task_descr * bool + | GiveMsgTask of string * msg_task_descr + | FinalResult of string + + diff --git a/src/util.ml b/src/util.ml new file mode 100644 index 0000000..5502398 --- /dev/null +++ b/src/util.ml @@ -0,0 +1,20 @@ +open Unix + +type id = int * int +let new_id () : id = (Random.int 100000, Random.int 100000) +let id_str (a, b) = Format.sprintf "%d.%d" a b + +(* make_addr : string -> int -> sockaddr *) +let make_addr host port = + let host = gethostbyname host in + ADDR_INET(host.h_addr_list.(Random.int (Array.length host.h_addr_list)), port) + + +(* Unmarshal ONE message from a file descriptor, and DO NOT buffer more data *) +let read_one_msg fd = + let hdr = String.create Marshal.header_size in + assert (read fd hdr 0 Marshal.header_size = Marshal.header_size); + let dlen = Marshal.data_size hdr 0 in + let data = String.create dlen in + assert (read fd data 0 dlen = dlen); + Marshal.from_string (hdr ^ data) 0 -- cgit v1.2.3