From acfa0090d68a21be6c83815f484142b4eb814f4a Mon Sep 17 00:00:00 2001 From: Alex AUVOLAT Date: Tue, 13 May 2014 22:44:50 +0200 Subject: Change interface, update some stuff, new example... --- src/example.ml | 85 +++++++++++++++++++++++----- src/kahn.ml | 167 ++----------------------------------------------------- src/kahn_pipe.ml | 75 +++++++++++++++++++++++++ src/kahn_seq.ml | 83 +++++++++++++++++++++++++++ src/kahn_sock.ml | 115 ++++++++++++++++++++++++++++++++++++++ src/kahn_th.ml | 45 +++++++++++++++ src/kahnsock.ml | 115 -------------------------------------- src/primes.ml | 20 ++++--- 8 files changed, 406 insertions(+), 299 deletions(-) create mode 100644 src/kahn_pipe.ml create mode 100644 src/kahn_seq.ml create mode 100644 src/kahn_sock.ml create mode 100644 src/kahn_th.ml delete mode 100644 src/kahnsock.ml (limited to 'src') diff --git a/src/example.ml b/src/example.ml index a22f1b9..86c4f5b 100644 --- a/src/example.ml +++ b/src/example.ml @@ -3,30 +3,89 @@ module Example (K : Kahn.S) = struct module Lib = Kahn.Lib(K) open Lib - let integers nmax (qo : int K.out_port) : unit K.process = + (* First test : distribute calculation of 50 first Fibonacci numbers *) + + let integers first step nmax (qo : int K.out_port) : unit K.process = let rec loop n = if n > nmax then K.put (-1) qo - else - (K.put n qo) >>= (fun () -> loop (n + 1)) + else begin + (K.put n qo) >>= (fun () -> loop (n + step)) + end in - loop 2 + loop first + + let rec fib n = + if n < 2 then n + else fib (n-1) + fib (n-2) - let output (qi : int K.in_port) : unit K.process = + let rec slow_fib (qi : int K.in_port) (qo : (int * int) K.out_port) : unit K.process = + (K.get qi) >>= + (fun i -> + if i <> -1 then + (K.put (i, fib i) qo) >>= (fun () -> slow_fib qi qo) + else (K.put (i, i) qo) >>= (fun () -> K.return ())) + + let output (qi : (int * int) K.in_port) : unit K.process = let rec loop () = - (K.get qi) >>= (fun v -> - if v <> -1 then - begin Format.printf "%d@." v; loop () end - else K.return ()) + K.bind_io + (K.get qi) + (fun (v, s) -> + if v <> -1 then + begin Format.printf "f(%d) = %d@." v s; loop () end + else K.return ()) in loop () let main : unit K.process = - (delay K.new_channel ()) >>= - (fun (q_in, q_out) -> K.doco [ integers 10000 q_out ; output q_in ]) + let max = 4 in + let rec aux n = + (delay K.new_channel ()) >>= (fun (q_in, q_out) -> + (delay K.new_channel ()) >>= (fun (q_in2, q_out2) -> + K.doco [ integers n max 50 q_out ; slow_fib q_in q_out2 ; output q_in2 ])) + in + let rec aux2 n = + if n = max then [] + else aux n :: aux2 (n+1) + in + (K.return ()) >>= (fun () -> K.doco (aux2 0)) + + (* Second test : distribute the calculation of fib 53 *) + + let rec fib_rec n r (qo : int K.out_port) = + (K.return ()) >>= (fun () -> + if r = 0 then + K.put (fib n) qo + else + (delay K.new_channel ()) >>= (fun (q_in, q_out) -> + (delay K.new_channel ()) >>= (fun (q_in2, q_out2) -> + K.doco + [ + fib_rec (n-2) (r-1) q_out ; + fib_rec (n-1) (r-1) q_out2 ; + K.get q_in >>= (fun x -> + K.bind_io + (K.get q_in2) + (fun y -> + Format.printf "f(%d) = %d@." n (x+y); + K.put (x+y) qo)) + ] + ))) + + + let main2 : unit K.process = + (delay K.new_channel()) >>= + (fun (qi, qo) -> + K.doco + [ + fib_rec 53 7 qo; + K.bind_io + (K.get qi) + (fun v -> Format.printf "Got it! Result is %d@." v; K.return ()) + ]) end -module E = Example(Kahn.Seq) +module E = Example(Kahn_pipe.Pipe) -let () = E.K.run E.main +let () = E.K.run E.main2 diff --git a/src/kahn.ml b/src/kahn.ml index a02ee24..08eac19 100644 --- a/src/kahn.ml +++ b/src/kahn.ml @@ -3,17 +3,18 @@ module type S = sig type 'a in_port type 'a out_port - val io_read: unit -> string - val io_write: string -> unit - val new_channel: unit -> 'a in_port * 'a out_port val put: 'a -> 'a out_port -> unit process val get: 'a in_port -> 'a process + val select: ('a in_port * ('a -> 'b)) list -> 'b process + val select_default: ('a in_port * ('a -> 'b)) list -> (unit -> 'b) -> 'b process + val doco: unit process list -> unit process val return: 'a -> 'a process val bind: 'a process -> ('a -> 'b process) -> 'b process + val bind_io: 'a process -> ('a -> 'b process) -> 'b process val run: 'a process -> 'a end @@ -49,163 +50,3 @@ module Lib (K : S) = struct end -module Th: S = struct - type 'a process = (unit -> 'a) - - type 'a channel = { q: 'a Queue.t ; m: Mutex.t; } - type 'a in_port = 'a channel - type 'a out_port = 'a channel - - let new_channel () = - let q = { q = Queue.create (); m = Mutex.create (); } in - q, q - - let io_read () = "" - let io_write s = print_string s; flush stdout - - let put v c () = - Mutex.lock c.m; - Queue.push v c.q; - Mutex.unlock c.m; - Thread.yield () - - let rec get c () = - try - Mutex.lock c.m; - let v = Queue.pop c.q in - Mutex.unlock c.m; - v - with Queue.Empty -> - Mutex.unlock c.m; - Thread.yield (); - get c () - - let doco l () = - let ths = List.map (fun f -> Thread.create f ()) l in - List.iter (fun th -> Thread.join th) ths - - let return v = (fun () -> v) - - let bind e e' () = - let v = e () in - Thread.yield (); - e' v () - - let run e = e () -end - -module Seq: S = struct - type 'a process = (('a -> unit) option) -> unit - - type 'a channel = 'a Queue.t - type 'a in_port = 'a channel - type 'a out_port = 'a channel - - type task = unit -> unit - - let tasks = Queue.create () - - let io_read () = "" - let io_write s = print_string s; flush stdout - - let new_channel () = - let q = Queue.create () in - q, q - - let put x c = - fun cont -> - Queue.push x c; - match cont with - | None -> () - | Some cont -> Queue.push cont tasks - - let rec get c = - fun cont -> - try - let v = Queue.pop c in - match cont with - | None -> () - | Some cont -> Queue.push (fun () -> cont v) tasks - with Queue.Empty -> - Queue.push (fun () -> get c cont) tasks - - let doco l = - fun cont -> - List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l; - match cont with - | None -> () - | Some cont -> Queue.push cont tasks - - let return v = - fun cont -> - match cont with - | None -> () - | Some cont -> Queue.push (fun () -> cont v) tasks - - let bind e f = - fun cont -> - Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks - - let run e = - let ret = ref None in - e (Some (fun v -> ret := Some v)); - while not (Queue.is_empty tasks) do - let task = Queue.pop tasks in - task () - done; - match !ret with - | Some k -> k - | None -> assert false - -end - - -module Pipe: S = struct - type 'a process = unit -> 'a - - type 'a in_port = in_channel - type 'a out_port = out_channel - - let children = ref [] - - let io_read () = "" - let io_write s = print_string s; flush stdout - - let new_channel = - fun () -> - let i, o = Unix.pipe () in - Unix.in_channel_of_descr i, Unix.out_channel_of_descr o - - let get c = - fun () -> Marshal.from_channel c - - let put x c = - fun () -> Marshal.to_channel c x [] - - - let return v = - fun () -> v - - let bind e f = - fun () -> f (e ()) () - - let run p = - let v = p() in - List.iter - (fun x -> try ignore(Unix.waitpid [] x) with _ -> ()) - !children; - v - - let doco l = - fun () -> - List.iter (fun p -> - let i = Unix.fork () in - if i = 0 then begin - children := []; - run p; - exit 0 - end else begin - children := i::!children - end) - l -end diff --git a/src/kahn_pipe.ml b/src/kahn_pipe.ml new file mode 100644 index 0000000..f0bec97 --- /dev/null +++ b/src/kahn_pipe.ml @@ -0,0 +1,75 @@ +open Kahn + +module Pipe: S = struct + type 'a process = unit -> 'a + + type 'a in_port = in_channel + type 'a out_port = out_channel + + let new_channel = + fun () -> + let i, o = Unix.pipe () in + Unix.in_channel_of_descr i, Unix.out_channel_of_descr o + + let get c = + fun () -> Marshal.from_channel c + + let put x c = + fun () -> + Marshal.to_channel c x []; + flush c + + let try_get block prt_list = + let fds = List.map fst prt_list in + let fds = List.map Unix.descr_of_in_channel fds in + let ok_fds, _, _ = Unix.select fds [] [] + (if block then -1.0 else 0.0) + in + match ok_fds with + | [] -> None + | fd::x -> + let chan, f = + List.find + (fun (s, _) -> Unix.descr_of_in_channel s = fd) + prt_list + in + Some(f (Marshal.from_channel chan)) + + let select prt_list = + fun () -> + match try_get true prt_list with + | Some x -> x + | None -> assert false + + let select_default prt_list def = + fun () -> + match try_get false prt_list with + | Some x -> x + | None -> def () + + let return v = + fun () -> v + + let bind e f = + fun () -> f (e ()) () + let bind_io = bind + + let run p = + p() + + let doco l = + fun () -> + let children = + List.map + (fun p -> + match Unix.fork () with + | 0 -> + run p; + exit 0 + | i -> i) + l + in + List.iter + (fun x -> try ignore(Unix.waitpid [] x) with _ -> ()) + children +end diff --git a/src/kahn_seq.ml b/src/kahn_seq.ml new file mode 100644 index 0000000..7f0eec5 --- /dev/null +++ b/src/kahn_seq.ml @@ -0,0 +1,83 @@ +open Kahn + +module Seq: S = struct + type 'a process = (('a -> unit) option) -> unit + + type 'a channel = 'a Queue.t + type 'a in_port = 'a channel + type 'a out_port = 'a channel + + type task = unit -> unit + + let tasks = Queue.create () + + let push_cont (cont : ('a -> unit) option) (v : 'a) = + match cont with + | None -> () + | Some cont -> Queue.push (fun () -> cont v) tasks + + let new_channel () = + let q = Queue.create () in + q, q + + let put x c = + fun cont -> + Queue.push x c; + match cont with + | None -> () + | Some cont -> Queue.push cont tasks + + let rec get c = + fun cont -> + try + let v = Queue.pop c in push_cont cont v + with Queue.Empty -> + Queue.push (fun () -> get c cont) tasks + + let rec try_get = function + | [] -> None + | (prt, f)::q -> + try + let v = Queue.pop prt in Some (f v) + with Queue.Empty -> try_get q + + let rec select prt_list = + fun cont -> + match try_get prt_list with + | Some x -> push_cont cont x + | None -> Queue.push (fun () -> select prt_list cont) tasks + + let select_default prt_list def = + fun cont -> + match try_get prt_list with + | Some x -> push_cont cont x + | None -> push_cont cont (def()) + + let doco l = + fun cont -> + List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l; + push_cont cont () + + let return v = + fun cont -> + push_cont cont v + + let bind e f = + fun cont -> + Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks + let bind_io e f = + fun cont -> + Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks + + let run e = + let ret = ref None in + e (Some (fun v -> ret := Some v)); + while not (Queue.is_empty tasks) do + let task = Queue.pop tasks in + task () + done; + match !ret with + | Some k -> k + | None -> assert false + +end diff --git a/src/kahn_sock.ml b/src/kahn_sock.ml new file mode 100644 index 0000000..89ee65c --- /dev/null +++ b/src/kahn_sock.ml @@ -0,0 +1,115 @@ +Random.self_init () + +type ident = (int * int * int * int) +let gen_ident () = + Random.int 1000000000, Random.int 1000000000, + Random.int 1000000000, Random.int 1000000000 + +module Sock : Kahn.S = struct + + (* L'idée : + - L'ensemble des noeuds qui font du calcul est un arbre. + Le premier noeud lancé est la racine de l'arbre ; tous les + noeuds qui se connectent par la suite se connectent à un + noeud déjà présent et sont donc son fils. + - Les processus sont des fermetures de type unit -> unit, + transmises par des canaux + - Un noeud de calcul est un processus ocaml avec un seul + thread. Le parallélisme est coopératif (penser à faire + des binds assez souvent). + - Les noeuds publient régulièrement leur load, ie le nombre + de processus en attente et qui ne sont pas en train + d'attendre des données depuis un canal. Si un noeud a un + voisin dont le load est plus faible que le sien d'une + quantité plus grande que 2, il délègue une tâche. + - Le noeud racine délègue toutes ses tâches et sert uniquement + pour les entrées-sorties + - Comportement indéterminé lorsqu'un noeud se déconnecte + (des processus peuvent disparaître, le réseau est cassé...) + - Les canaux sont identifiés par le type ident décrit + ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le + monde le sait. Lorsque quelqu'un lit sur un canal, tout le + monde le sait. (on n'est pas capable de déterminer + quel est le noeud propriétaire du processus devant lire + le message) Les communications sont donc coûteuses. + - On garantit que si chaque canal est lu par un processus + et écrit par un autre, alors l'ordre des messages est + conservé. On ne garantit pas l'ordre s'il y a plusieurs + écrivains, et on est à peu près sûrs que le réseau va + planter s'il y a plusieurs lecteurs. + *) + + type 'a process = (('a -> unit) option) -> unit + + type 'a in_port = ident + type 'a out_port = ident + + type task = unit -> unit + + let tasks = Queue.create () + let read_wait_tasks = Hashtbl.create 42 + + let channels = Hashtbl.create 42 + + type host_id = string + type message = host_id * message_data + (* message contains sender ID *) + and message_data = + | Hello + | LoadAdvert of host_id * int + (* Host X has N tasks waiting *) + | Delegate of task + (* I need you to do this for me *) + | SendChan of ident * string + (* Put message in buffer *) + | RecvChan of ident + (* Read message from buffer (everybody clear it from + memory !) *) + | IOWrite of string + | Bye + + let peers = Hashtbl.create 12 (* host_id -> in_chan * out_chan *) + let parent = ref "" (* id of parent *) + let myself = ref "" + + let tell peer msg = + let _, o = Hashtbl.find peers peer in + Marshall.to_channel o msg + + let tell_all msg = + Hashtbl.iter peers + (fun _ (_, o) -> Marshall.to_channel o msg) + + let tell_all_except peer msg = + Hashtbl.iter peers + (fun k (_, o) -> if k <> peer then + Marshall.to_channel o msg) + + let io_read () = "" + let io_write msg = + tell !parent (!myself, IOWrite msg) + + let new_channel () = + let x = gen_ident () in x, x + + let put port x = + fun cont -> + tell_all (!myself, SendChan(port, Marshal.to_string x)); + match cont with + | Some cont -> Queue.push cont tasks + | None -> () + + let rec get port = + fun cont -> + try + let p = Hashtbl.find channels port in + let v = Queue.pop p in + tell_all (!myself, RecvChan port) + match cont with + | None -> () + | Some -> Queue.push (fun () -> cont v) tasks + with _ -> (* no message in queue *) + Hashtbl.add read_wait_tasks + port (fun () -> get port cont) + +end diff --git a/src/kahn_th.ml b/src/kahn_th.ml new file mode 100644 index 0000000..b6fccad --- /dev/null +++ b/src/kahn_th.ml @@ -0,0 +1,45 @@ + +module Th: S = struct + type 'a process = (unit -> 'a) + + type 'a channel = { q: 'a Queue.t ; m: Mutex.t; } + type 'a in_port = 'a channel + type 'a out_port = 'a channel + + let new_channel () = + let q = { q = Queue.create (); m = Mutex.create (); } in + q, q + + let io_read () = "" + let io_write s = print_string s; flush stdout + + let put v c () = + Mutex.lock c.m; + Queue.push v c.q; + Mutex.unlock c.m; + Thread.yield () + + let rec get c () = + try + Mutex.lock c.m; + let v = Queue.pop c.q in + Mutex.unlock c.m; + v + with Queue.Empty -> + Mutex.unlock c.m; + Thread.yield (); + get c () + + let doco l () = + let ths = List.map (fun f -> Thread.create f ()) l in + List.iter (fun th -> Thread.join th) ths + + let return v = (fun () -> v) + + let bind e e' () = + let v = e () in + Thread.yield (); + e' v () + + let run e = e () +end diff --git a/src/kahnsock.ml b/src/kahnsock.ml deleted file mode 100644 index 89ee65c..0000000 --- a/src/kahnsock.ml +++ /dev/null @@ -1,115 +0,0 @@ -Random.self_init () - -type ident = (int * int * int * int) -let gen_ident () = - Random.int 1000000000, Random.int 1000000000, - Random.int 1000000000, Random.int 1000000000 - -module Sock : Kahn.S = struct - - (* L'idée : - - L'ensemble des noeuds qui font du calcul est un arbre. - Le premier noeud lancé est la racine de l'arbre ; tous les - noeuds qui se connectent par la suite se connectent à un - noeud déjà présent et sont donc son fils. - - Les processus sont des fermetures de type unit -> unit, - transmises par des canaux - - Un noeud de calcul est un processus ocaml avec un seul - thread. Le parallélisme est coopératif (penser à faire - des binds assez souvent). - - Les noeuds publient régulièrement leur load, ie le nombre - de processus en attente et qui ne sont pas en train - d'attendre des données depuis un canal. Si un noeud a un - voisin dont le load est plus faible que le sien d'une - quantité plus grande que 2, il délègue une tâche. - - Le noeud racine délègue toutes ses tâches et sert uniquement - pour les entrées-sorties - - Comportement indéterminé lorsqu'un noeud se déconnecte - (des processus peuvent disparaître, le réseau est cassé...) - - Les canaux sont identifiés par le type ident décrit - ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le - monde le sait. Lorsque quelqu'un lit sur un canal, tout le - monde le sait. (on n'est pas capable de déterminer - quel est le noeud propriétaire du processus devant lire - le message) Les communications sont donc coûteuses. - - On garantit que si chaque canal est lu par un processus - et écrit par un autre, alors l'ordre des messages est - conservé. On ne garantit pas l'ordre s'il y a plusieurs - écrivains, et on est à peu près sûrs que le réseau va - planter s'il y a plusieurs lecteurs. - *) - - type 'a process = (('a -> unit) option) -> unit - - type 'a in_port = ident - type 'a out_port = ident - - type task = unit -> unit - - let tasks = Queue.create () - let read_wait_tasks = Hashtbl.create 42 - - let channels = Hashtbl.create 42 - - type host_id = string - type message = host_id * message_data - (* message contains sender ID *) - and message_data = - | Hello - | LoadAdvert of host_id * int - (* Host X has N tasks waiting *) - | Delegate of task - (* I need you to do this for me *) - | SendChan of ident * string - (* Put message in buffer *) - | RecvChan of ident - (* Read message from buffer (everybody clear it from - memory !) *) - | IOWrite of string - | Bye - - let peers = Hashtbl.create 12 (* host_id -> in_chan * out_chan *) - let parent = ref "" (* id of parent *) - let myself = ref "" - - let tell peer msg = - let _, o = Hashtbl.find peers peer in - Marshall.to_channel o msg - - let tell_all msg = - Hashtbl.iter peers - (fun _ (_, o) -> Marshall.to_channel o msg) - - let tell_all_except peer msg = - Hashtbl.iter peers - (fun k (_, o) -> if k <> peer then - Marshall.to_channel o msg) - - let io_read () = "" - let io_write msg = - tell !parent (!myself, IOWrite msg) - - let new_channel () = - let x = gen_ident () in x, x - - let put port x = - fun cont -> - tell_all (!myself, SendChan(port, Marshal.to_string x)); - match cont with - | Some cont -> Queue.push cont tasks - | None -> () - - let rec get port = - fun cont -> - try - let p = Hashtbl.find channels port in - let v = Queue.pop p in - tell_all (!myself, RecvChan port) - match cont with - | None -> () - | Some -> Queue.push (fun () -> cont v) tasks - with _ -> (* no message in queue *) - Hashtbl.add read_wait_tasks - port (fun () -> get port cont) - -end diff --git a/src/primes.ml b/src/primes.ml index 924f7d3..11f5387 100644 --- a/src/primes.ml +++ b/src/primes.ml @@ -23,20 +23,24 @@ module Primes (K : Kahn.S) = struct in loop() let rec primes (qi : int in_port) : unit process = - (get qi) >>= (fun v -> - if v <> -1 then begin - io_write ((string_of_int v)^"\n"); - (delay new_channel ()) >>= - (fun (qi2, qo2) -> doco [ filter v qi qo2 ; primes qi2 ]) - end else return ()) + bind_io + (get qi) + (fun v -> + if v <> -1 then + begin + Format.printf "%d@." v; + (delay new_channel ()) >>= + (fun (qi2, qo2) -> doco [ filter v qi qo2 ; primes qi2 ]) + end + else return ()) let main : unit process = (delay new_channel ()) >>= - (fun (q_in, q_out) -> doco [ integers 2000 q_out ; primes q_in ]) + (fun (q_in, q_out) -> doco [ integers 1000 q_out ; primes q_in ]) end -module Eng = Kahn.Seq +module Eng = Kahn_pipe.Pipe module P = Primes(Eng) let () = P.K.run P.main -- cgit v1.2.3