From 9b9bc4e787d6ecebcf15182a562fc47d27d9880d Mon Sep 17 00:00:00 2001 From: Alex AUVOLAT Date: Thu, 20 Mar 2014 16:39:16 +0100 Subject: Added implementation with pipes. --- src/kahn.ml | 47 +++++++++++++++++++++++++++++++++++++++++++++++ src/kahnsock.ml | 45 +++++++++++++++++++++++++++++++++++++++++++++ src/primes.ml | 5 +++-- 3 files changed, 95 insertions(+), 2 deletions(-) create mode 100644 src/kahnsock.ml (limited to 'src') diff --git a/src/kahn.ml b/src/kahn.ml index 91b251f..5229f7e 100644 --- a/src/kahn.ml +++ b/src/kahn.ml @@ -150,3 +150,50 @@ module Seq: S = struct end + +module Pipe: S = struct + type 'a process = unit -> 'a + + type 'a in_port = in_channel + type 'a out_port = out_channel + + let children = ref [] + + let new_channel = + fun () -> + let i, o = Unix.pipe () in + Unix.in_channel_of_descr i, Unix.out_channel_of_descr o + + let get c = + fun () -> Marshal.from_channel c + + let put x c = + fun () -> Marshal.to_channel c x [] + + + let return v = + fun () -> v + + let bind e f = + fun () -> f (e ()) () + + let run p = + let v = p() in + List.iter + (fun x -> try ignore(Unix.waitpid [] x) with _ -> ()) + !children; + v + + let doco l = + fun () -> + List.iter (fun p -> + let i = Unix.fork () in + if i = 0 then begin + children := []; + run p; + exit 0 + end else begin + children := i::!children + end) + l +end diff --git a/src/kahnsock.ml b/src/kahnsock.ml new file mode 100644 index 0000000..6cdfadd --- /dev/null +++ b/src/kahnsock.ml @@ -0,0 +1,45 @@ +Random.self_init () + +type ident = (int * int * int * int) +let gen_ident () = + Random.int 1000000000, Random.int 1000000000, + Random.int 1000000000, Random.int 1000000000 + +module Sock : Kahn.S = struct + + (* L'idée : + - L'ensemble des noeuds qui font du calcul est un arbre. + Le premier noeud lancé est la racine de l'arbre ; tous les + noeuds qui se connectent par la suite se connectent à un + noeud déjà présent et sont donc son fils. + - Les processus sont des fermetures de type unit -> unit, + transmises par des canaux + - Un noeud de calcul est un processus ocaml avec un seul + thread. Le parallélisme est coopératif (penser à faire + des binds assez souvent). + - Les noeuds publient régulièrement leur load, ie le nombre + de processus en attente et qui ne sont pas en train + d'attendre des données depuis un canal. Si un noeud a un + voisin dont le load est plus faible que le sien d'une + quantité plus grande que 2, il délègue une tâche. + - Le noeud racine délègue toutes ses tâches et sert uniquement + pour les entrées-sorties + - Comportement indéterminé lorsqu'un noeud se déconnecte + (des processus peuvent disparaître, le réseau est cassé...) + - Les canaux sont identifiés par le type ident décrit + ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le + monde le sait. Lorsque quelqu'un lit sur un canal, tout le + monde le sait. (on n'est pas capable de déterminer + quel est le noeud propriétaire du processus devant lire + le message) Les communications sont donc coûteuses. + *) + + type 'a process = (unit -> 'a) + + type 'a in_port = ident + type 'a out_port = ident + + let cin = (0, 0, 0, 0) + let cout = (0, 0, 0, 1) + +end diff --git a/src/primes.ml b/src/primes.ml index 0911f31..e0eeed7 100644 --- a/src/primes.ml +++ b/src/primes.ml @@ -32,11 +32,12 @@ module Primes (K : Kahn.S) = struct let main : unit process = (delay new_channel ()) >>= - (fun (q_in, q_out) -> doco [ integers 5000 q_out ; primes q_in ]) + (fun (q_in, q_out) -> doco [ integers 2000 q_out ; primes q_in ]) end -module P = Primes(Kahn.Seq) +module Eng = Kahn.Pipe +module P = Primes(Eng) let () = P.K.run P.main -- cgit v1.2.3