diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/example.ml | 2 | ||||
-rw-r--r-- | src/kahn_pipe.ml | 124 | ||||
-rw-r--r-- | src/kahn_seq.ml | 15 | ||||
-rw-r--r-- | src/kahn_sock.ml | 215 | ||||
-rw-r--r-- | src/kahn_sock_0.ml | 115 | ||||
-rw-r--r-- | src/primes.ml | 6 |
6 files changed, 296 insertions, 181 deletions
diff --git a/src/example.ml b/src/example.ml index 86c4f5b..4971448 100644 --- a/src/example.ml +++ b/src/example.ml @@ -86,6 +86,6 @@ module Example (K : Kahn.S) = struct end -module E = Example(Kahn_pipe.Pipe) +module E = Example(Kahn_seq.Seq) let () = E.K.run E.main2 diff --git a/src/kahn_pipe.ml b/src/kahn_pipe.ml index f0bec97..9f3da0a 100644 --- a/src/kahn_pipe.ml +++ b/src/kahn_pipe.ml @@ -1,75 +1,75 @@ open Kahn module Pipe: S = struct - type 'a process = unit -> 'a + type 'a process = unit -> 'a - type 'a in_port = in_channel - type 'a out_port = out_channel + type 'a in_port = in_channel + type 'a out_port = out_channel - let new_channel = - fun () -> - let i, o = Unix.pipe () in - Unix.in_channel_of_descr i, Unix.out_channel_of_descr o + let new_channel = + fun () -> + let i, o = Unix.pipe () in + Unix.in_channel_of_descr i, Unix.out_channel_of_descr o - let get c = - fun () -> Marshal.from_channel c + let get (c : 'a in_port) : 'a = + fun () -> Marshal.from_channel c - let put x c = - fun () -> - Marshal.to_channel c x []; - flush c - - let try_get block prt_list = - let fds = List.map fst prt_list in - let fds = List.map Unix.descr_of_in_channel fds in - let ok_fds, _, _ = Unix.select fds [] [] - (if block then -1.0 else 0.0) - in - match ok_fds with - | [] -> None - | fd::x -> - let chan, f = - List.find - (fun (s, _) -> Unix.descr_of_in_channel s = fd) - prt_list - in - Some(f (Marshal.from_channel chan)) - - let select prt_list = - fun () -> - match try_get true prt_list with - | Some x -> x - | None -> assert false + let put x c = + fun () -> + Marshal.to_channel c x []; + flush c + + let try_get block prt_list = + let fds = List.map fst prt_list in + let fds = List.map Unix.descr_of_in_channel fds in + let ok_fds, _, _ = Unix.select fds [] [] + (if block then -1.0 else 0.0) + in + match ok_fds with + | [] -> None + | fd::x -> + let chan, f = + List.find + (fun (s, _) -> Unix.descr_of_in_channel s = fd) + prt_list + in + Some(f (Marshal.from_channel chan)) + + let select prt_list = + fun () -> + match try_get true prt_list with + | Some x -> x + | None -> assert false - let select_default prt_list def = - fun () -> - match try_get false prt_list with - | Some x -> x - | None -> def () + let select_default prt_list def = + fun () -> + match try_get false prt_list with + | Some x -> x + | None -> def () - let return v = - fun () -> v + let return v = + fun () -> v - let bind e f = - fun () -> f (e ()) () - let bind_io = bind + let bind e f = + fun () -> f (e ()) () + let bind_io = bind - let run p = - p() + let run p = + p() - let doco l = - fun () -> - let children = - List.map - (fun p -> - match Unix.fork () with - | 0 -> - run p; - exit 0 - | i -> i) - l - in - List.iter - (fun x -> try ignore(Unix.waitpid [] x) with _ -> ()) - children + let doco l = + fun () -> + let children = + List.map + (fun p -> + match Unix.fork () with + | 0 -> + run p; + exit 0 + | i -> i) + l + in + List.iter + (fun i -> try ignore(Unix.waitpid [] i) with _ -> ()) + children end diff --git a/src/kahn_seq.ml b/src/kahn_seq.ml index 7f0eec5..ce82117 100644 --- a/src/kahn_seq.ml +++ b/src/kahn_seq.ml @@ -14,7 +14,7 @@ module Seq: S = struct let push_cont (cont : ('a -> unit) option) (v : 'a) = match cont with | None -> () - | Some cont -> Queue.push (fun () -> cont v) tasks + | Some cont_f -> Queue.push (fun () -> cont_f v) tasks let new_channel () = let q = Queue.create () in @@ -23,9 +23,7 @@ module Seq: S = struct let put x c = fun cont -> Queue.push x c; - match cont with - | None -> () - | Some cont -> Queue.push cont tasks + push_cont cont () let rec get c = fun cont -> @@ -62,12 +60,11 @@ module Seq: S = struct fun cont -> push_cont cont v - let bind e f = - fun cont -> - Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks - let bind_io e f = + let bind (e : 'a process) (f : 'a -> 'b process) : 'b process = fun cont -> - Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks + e (Some (fun (r : 'a) -> f r cont)) + + let bind_io = bind let run e = let ret = ref None in diff --git a/src/kahn_sock.ml b/src/kahn_sock.ml index 89ee65c..bce6cf2 100644 --- a/src/kahn_sock.ml +++ b/src/kahn_sock.ml @@ -1,115 +1,120 @@ -Random.self_init () - -type ident = (int * int * int * int) -let gen_ident () = - Random.int 1000000000, Random.int 1000000000, - Random.int 1000000000, Random.int 1000000000 - -module Sock : Kahn.S = struct - - (* L'idée : - - L'ensemble des noeuds qui font du calcul est un arbre. - Le premier noeud lancé est la racine de l'arbre ; tous les - noeuds qui se connectent par la suite se connectent à un - noeud déjà présent et sont donc son fils. - - Les processus sont des fermetures de type unit -> unit, - transmises par des canaux - - Un noeud de calcul est un processus ocaml avec un seul - thread. Le parallélisme est coopératif (penser à faire - des binds assez souvent). - - Les noeuds publient régulièrement leur load, ie le nombre - de processus en attente et qui ne sont pas en train - d'attendre des données depuis un canal. Si un noeud a un - voisin dont le load est plus faible que le sien d'une - quantité plus grande que 2, il délègue une tâche. - - Le noeud racine délègue toutes ses tâches et sert uniquement - pour les entrées-sorties - - Comportement indéterminé lorsqu'un noeud se déconnecte - (des processus peuvent disparaître, le réseau est cassé...) - - Les canaux sont identifiés par le type ident décrit - ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le - monde le sait. Lorsque quelqu'un lit sur un canal, tout le - monde le sait. (on n'est pas capable de déterminer - quel est le noeud propriétaire du processus devant lire - le message) Les communications sont donc coûteuses. - - On garantit que si chaque canal est lu par un processus - et écrit par un autre, alors l'ordre des messages est - conservé. On ne garantit pas l'ordre s'il y a plusieurs - écrivains, et on est à peu près sûrs que le réseau va - planter s'il y a plusieurs lecteurs. - *) +open Kahn +open Unix + +(* make_addr : string -> int -> sockaddr *) +let make_addr host port = + let host = gethostbyname host in + ADDR_INET(host.h_addr_list.(Random.int (Array.length host.h_addr_list)), port) + +module Sock: S = struct + + let kahn_port = 8197 type 'a process = (('a -> unit) option) -> unit - type 'a in_port = ident - type 'a out_port = ident - type task = unit -> unit + type 'a channel = int + type 'a in_port = 'a channel + type 'a out_port = 'a channel + type task = unit -> unit let tasks = Queue.create () - let read_wait_tasks = Hashtbl.create 42 - let channels = Hashtbl.create 42 - - type host_id = string - type message = host_id * message_data - (* message contains sender ID *) - and message_data = + let socket_to_srv = ref None + + type cli_msg = + | Hello + | Put of int * string + | Get of int * (string -> task) + | AskTask + | GiveTask of task + | GiveIOTask of task + | FinalResult of string + type srv_msg = | Hello - | LoadAdvert of host_id * int - (* Host X has N tasks waiting *) - | Delegate of task - (* I need you to do this for me *) - | SendChan of ident * string - (* Put message in buffer *) - | RecvChan of ident - (* Read message from buffer (everybody clear it from - memory !) *) - | IOWrite of string - | Bye - - let peers = Hashtbl.create 12 (* host_id -> in_chan * out_chan *) - let parent = ref "" (* id of parent *) - let myself = ref "" - - let tell peer msg = - let _, o = Hashtbl.find peers peer in - Marshall.to_channel o msg - - let tell_all msg = - Hashtbl.iter peers - (fun _ (_, o) -> Marshall.to_channel o msg) - - let tell_all_except peer msg = - Hashtbl.iter peers - (fun k (_, o) -> if k <> peer then - Marshall.to_channel o msg) - - let io_read () = "" - let io_write msg = - tell !parent (!myself, IOWrite msg) - - let new_channel () = - let x = gen_ident () in x, x - - let put port x = - fun cont -> - tell_all (!myself, SendChan(port, Marshal.to_string x)); - match cont with - | Some cont -> Queue.push cont tasks - | None -> () - - let rec get port = - fun cont -> - try - let p = Hashtbl.find channels port in - let v = Queue.pop p in - tell_all (!myself, RecvChan port) - match cont with - | None -> () - | Some -> Queue.push (fun () -> cont v) tasks - with _ -> (* no message in queue *) - Hashtbl.add read_wait_tasks - port (fun () -> get port cont) + | GiveTask of task + | PleaseWait + | FinalResult of string + + + let rec tell_server (msg : cli_msg) = + match !socket_to_srv with + | Some s -> Marshal.to_channel s msg [Marshal.Closures]; flush s + | None -> handle_msg_server (fun _ -> assert false) msg + + and handle_msg_server (reply_fun : srv_msg -> unit) = function + | Hello -> reply_fun Hello + | _ -> () (* TODO *) + + and client host = + (* Initialize socket *) + let sock = socket PF_INET SOCK_STREAM 0 in + connect sock (make_addr host kahn_port); + let i, o = in_channel_of_descr sock, out_channel_of_descr sock in + socket_to_srv := Some o; + let get_msg () = Marshal.from_channel i in + (* Initialize protocol *) + tell_server Hello; + assert (get_msg() = Hello); + (* Loop *) + let rec loop () = + tell_server AskTask; + match get_msg () with + | Hello -> assert false + | GiveTask task -> task (); loop () + | PleaseWait -> sleep 2; loop () + | FinalResult s -> Marshal.from_string s + in + let result = loop() in + shutdown sock SHUTDOWN_ALL; + result + + and server e = + (* Initialize task list *) + push_task (fun () -> e None); + + (* Initialize socket *) + let sock = socket PF_INET SOCK_STREAM 0 in + bind sock (make_addr "0.0.0.0" kahn_port); + listen sock 10; + let stop_srv _ = + Format.eprintf "Shutdown server...@."; + shutdown sock SHUTDOWN_ALL; + exit 0 + in + Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv); + Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv); + + (* Loop *) + let clients = ref [] in + while true do + let fds = List.map (fun (i, o, a) -> descr_of_in_channel i) !clients in + match select (sock::fds) [] [] (-1.0) with + | s::_, _, _ when s = sock -> + (* New client ! *) + let fd, addr = accept sock in + clients := + (in_channel_of_descr fd, + out_channel_of_descr fd, + addr)::!clients + | s::_, _, _ -> + (* Client sent something *) + let i, o, a = List.find + (fun (i, _, _) -> descr_of_in_channel i = s) !clients in + let msg = Marshal.from_channel i in + handle_msg_server + (fun m -> Marshal.to_channel o m [Marshall.Closures]; flush o) + msg + | _ -> assert false + done + + let srv = ref "" + let set_var v s = v := s + let run e = + Arg.parse [] (set_var srv) "usage: kahn [server_addr]"; + if !srv = "" then + server e + else + client !sr end diff --git a/src/kahn_sock_0.ml b/src/kahn_sock_0.ml new file mode 100644 index 0000000..89ee65c --- /dev/null +++ b/src/kahn_sock_0.ml @@ -0,0 +1,115 @@ +Random.self_init () + +type ident = (int * int * int * int) +let gen_ident () = + Random.int 1000000000, Random.int 1000000000, + Random.int 1000000000, Random.int 1000000000 + +module Sock : Kahn.S = struct + + (* L'idée : + - L'ensemble des noeuds qui font du calcul est un arbre. + Le premier noeud lancé est la racine de l'arbre ; tous les + noeuds qui se connectent par la suite se connectent à un + noeud déjà présent et sont donc son fils. + - Les processus sont des fermetures de type unit -> unit, + transmises par des canaux + - Un noeud de calcul est un processus ocaml avec un seul + thread. Le parallélisme est coopératif (penser à faire + des binds assez souvent). + - Les noeuds publient régulièrement leur load, ie le nombre + de processus en attente et qui ne sont pas en train + d'attendre des données depuis un canal. Si un noeud a un + voisin dont le load est plus faible que le sien d'une + quantité plus grande que 2, il délègue une tâche. + - Le noeud racine délègue toutes ses tâches et sert uniquement + pour les entrées-sorties + - Comportement indéterminé lorsqu'un noeud se déconnecte + (des processus peuvent disparaître, le réseau est cassé...) + - Les canaux sont identifiés par le type ident décrit + ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le + monde le sait. Lorsque quelqu'un lit sur un canal, tout le + monde le sait. (on n'est pas capable de déterminer + quel est le noeud propriétaire du processus devant lire + le message) Les communications sont donc coûteuses. + - On garantit que si chaque canal est lu par un processus + et écrit par un autre, alors l'ordre des messages est + conservé. On ne garantit pas l'ordre s'il y a plusieurs + écrivains, et on est à peu près sûrs que le réseau va + planter s'il y a plusieurs lecteurs. + *) + + type 'a process = (('a -> unit) option) -> unit + + type 'a in_port = ident + type 'a out_port = ident + + type task = unit -> unit + + let tasks = Queue.create () + let read_wait_tasks = Hashtbl.create 42 + + let channels = Hashtbl.create 42 + + type host_id = string + type message = host_id * message_data + (* message contains sender ID *) + and message_data = + | Hello + | LoadAdvert of host_id * int + (* Host X has N tasks waiting *) + | Delegate of task + (* I need you to do this for me *) + | SendChan of ident * string + (* Put message in buffer *) + | RecvChan of ident + (* Read message from buffer (everybody clear it from + memory !) *) + | IOWrite of string + | Bye + + let peers = Hashtbl.create 12 (* host_id -> in_chan * out_chan *) + let parent = ref "" (* id of parent *) + let myself = ref "" + + let tell peer msg = + let _, o = Hashtbl.find peers peer in + Marshall.to_channel o msg + + let tell_all msg = + Hashtbl.iter peers + (fun _ (_, o) -> Marshall.to_channel o msg) + + let tell_all_except peer msg = + Hashtbl.iter peers + (fun k (_, o) -> if k <> peer then + Marshall.to_channel o msg) + + let io_read () = "" + let io_write msg = + tell !parent (!myself, IOWrite msg) + + let new_channel () = + let x = gen_ident () in x, x + + let put port x = + fun cont -> + tell_all (!myself, SendChan(port, Marshal.to_string x)); + match cont with + | Some cont -> Queue.push cont tasks + | None -> () + + let rec get port = + fun cont -> + try + let p = Hashtbl.find channels port in + let v = Queue.pop p in + tell_all (!myself, RecvChan port) + match cont with + | None -> () + | Some -> Queue.push (fun () -> cont v) tasks + with _ -> (* no message in queue *) + Hashtbl.add read_wait_tasks + port (fun () -> get port cont) + +end diff --git a/src/primes.ml b/src/primes.ml index 11f5387..b9a57ed 100644 --- a/src/primes.ml +++ b/src/primes.ml @@ -23,9 +23,7 @@ module Primes (K : Kahn.S) = struct in loop() let rec primes (qi : int in_port) : unit process = - bind_io - (get qi) - (fun v -> + (get qi) >>= (fun v -> if v <> -1 then begin Format.printf "%d@." v; @@ -36,7 +34,7 @@ module Primes (K : Kahn.S) = struct let main : unit process = (delay new_channel ()) >>= - (fun (q_in, q_out) -> doco [ integers 1000 q_out ; primes q_in ]) + (fun (q_in, q_out) -> doco [ integers 2000 q_out ; primes q_in ]) end |