summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/kahn.ml364
-rw-r--r--src/kahnsock.ml142
-rw-r--r--src/primes.ml4
3 files changed, 296 insertions, 214 deletions
diff --git a/src/kahn.ml b/src/kahn.ml
index 5229f7e..a02ee24 100644
--- a/src/kahn.ml
+++ b/src/kahn.ml
@@ -1,199 +1,211 @@
module type S = sig
- type 'a process
- type 'a in_port
- type 'a out_port
+ type 'a process
+ type 'a in_port
+ type 'a out_port
- val new_channel: unit -> 'a in_port * 'a out_port
- val put: 'a -> 'a out_port -> unit process
- val get: 'a in_port -> 'a process
+ val io_read: unit -> string
+ val io_write: string -> unit
- val doco: unit process list -> unit process
+ val new_channel: unit -> 'a in_port * 'a out_port
+ val put: 'a -> 'a out_port -> unit process
+ val get: 'a in_port -> 'a process
- val return: 'a -> 'a process
- val bind: 'a process -> ('a -> 'b process) -> 'b process
+ val doco: unit process list -> unit process
- val run: 'a process -> 'a
+ val return: 'a -> 'a process
+ val bind: 'a process -> ('a -> 'b process) -> 'b process
+
+ val run: 'a process -> 'a
end
module Lib (K : S) = struct
- let ( >>= ) x f = K.bind x f
-
- let delay f x =
- K.bind (K.return ()) (fun () -> K.return (f x))
-
- let par_map f l =
- let rec build_workers l (ports, workers) =
- match l with
- | [] -> (ports, workers)
- | x :: l ->
- let qi, qo = K.new_channel () in
- build_workers
- l
- (qi :: ports,
- ((delay f x) >>= (fun v -> K.put v qo)) :: workers)
- in
- let ports, workers = build_workers l ([], []) in
- let rec collect l acc qo =
- match l with
- | [] -> K.put acc qo
- | qi :: l -> (K.get qi) >>= (fun v -> collect l (v :: acc) qo)
- in
- let qi, qo = K.new_channel () in
- K.run
- ((K.doco ((collect ports [] qo) :: workers)) >>= (fun _ -> K.get qi))
+ let ( >>= ) x f = K.bind x f
+
+ let delay f x =
+ K.bind (K.return ()) (fun () -> K.return (f x))
+
+ let par_map f l =
+ let rec build_workers l (ports, workers) =
+ match l with
+ | [] -> (ports, workers)
+ | x :: l ->
+ let qi, qo = K.new_channel () in
+ build_workers
+ l
+ (qi :: ports,
+ ((delay f x) >>= (fun v -> K.put v qo)) :: workers)
+ in
+ let ports, workers = build_workers l ([], []) in
+ let rec collect l acc qo =
+ match l with
+ | [] -> K.put acc qo
+ | qi :: l -> (K.get qi) >>= (fun v -> collect l (v :: acc) qo)
+ in
+ let qi, qo = K.new_channel () in
+ K.run
+ ((K.doco ((collect ports [] qo) :: workers)) >>= (fun _ -> K.get qi))
end
module Th: S = struct
- type 'a process = (unit -> 'a)
-
- type 'a channel = { q: 'a Queue.t ; m: Mutex.t; }
- type 'a in_port = 'a channel
- type 'a out_port = 'a channel
-
- let new_channel () =
- let q = { q = Queue.create (); m = Mutex.create (); } in
- q, q
-
- let put v c () =
- Mutex.lock c.m;
- Queue.push v c.q;
- Mutex.unlock c.m;
- Thread.yield ()
-
- let rec get c () =
- try
- Mutex.lock c.m;
- let v = Queue.pop c.q in
- Mutex.unlock c.m;
- v
- with Queue.Empty ->
- Mutex.unlock c.m;
- Thread.yield ();
- get c ()
-
- let doco l () =
- let ths = List.map (fun f -> Thread.create f ()) l in
- List.iter (fun th -> Thread.join th) ths
-
- let return v = (fun () -> v)
-
- let bind e e' () =
- let v = e () in
- Thread.yield ();
- e' v ()
-
- let run e = e ()
+ type 'a process = (unit -> 'a)
+
+ type 'a channel = { q: 'a Queue.t ; m: Mutex.t; }
+ type 'a in_port = 'a channel
+ type 'a out_port = 'a channel
+
+ let new_channel () =
+ let q = { q = Queue.create (); m = Mutex.create (); } in
+ q, q
+
+ let io_read () = ""
+ let io_write s = print_string s; flush stdout
+
+ let put v c () =
+ Mutex.lock c.m;
+ Queue.push v c.q;
+ Mutex.unlock c.m;
+ Thread.yield ()
+
+ let rec get c () =
+ try
+ Mutex.lock c.m;
+ let v = Queue.pop c.q in
+ Mutex.unlock c.m;
+ v
+ with Queue.Empty ->
+ Mutex.unlock c.m;
+ Thread.yield ();
+ get c ()
+
+ let doco l () =
+ let ths = List.map (fun f -> Thread.create f ()) l in
+ List.iter (fun th -> Thread.join th) ths
+
+ let return v = (fun () -> v)
+
+ let bind e e' () =
+ let v = e () in
+ Thread.yield ();
+ e' v ()
+
+ let run e = e ()
end
module Seq: S = struct
- type 'a process = (('a -> unit) option) -> unit
-
- type 'a channel = 'a Queue.t
- type 'a in_port = 'a channel
- type 'a out_port = 'a channel
-
- type task = unit -> unit
-
- let tasks = Queue.create ()
-
- let new_channel () =
- let q = Queue.create () in
- q, q
-
- let put x c =
- fun cont ->
- Queue.push x c;
- match cont with
- | None -> ()
- | Some cont -> Queue.push cont tasks
-
- let rec get c =
- fun cont ->
- try
- let v = Queue.pop c in
- match cont with
- | None -> ()
- | Some cont -> Queue.push (fun () -> cont v) tasks
- with Queue.Empty ->
- Queue.push (fun () -> get c cont) tasks
-
- let doco l =
- fun cont ->
- List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l;
- match cont with
- | None -> ()
- | Some cont -> Queue.push cont tasks
-
- let return v =
- fun cont ->
- match cont with
- | None -> ()
- | Some cont -> Queue.push (fun () -> cont v) tasks
-
- let bind e f =
- fun cont ->
- Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
-
- let run e =
- let ret = ref None in
- e (Some (fun v -> ret := Some v));
- while not (Queue.is_empty tasks) do
- let task = Queue.pop tasks in
- task ()
- done;
- match !ret with
- | Some k -> k
- | None -> assert false
+ type 'a process = (('a -> unit) option) -> unit
+
+ type 'a channel = 'a Queue.t
+ type 'a in_port = 'a channel
+ type 'a out_port = 'a channel
+
+ type task = unit -> unit
+
+ let tasks = Queue.create ()
+
+ let io_read () = ""
+ let io_write s = print_string s; flush stdout
+
+ let new_channel () =
+ let q = Queue.create () in
+ q, q
+
+ let put x c =
+ fun cont ->
+ Queue.push x c;
+ match cont with
+ | None -> ()
+ | Some cont -> Queue.push cont tasks
+
+ let rec get c =
+ fun cont ->
+ try
+ let v = Queue.pop c in
+ match cont with
+ | None -> ()
+ | Some cont -> Queue.push (fun () -> cont v) tasks
+ with Queue.Empty ->
+ Queue.push (fun () -> get c cont) tasks
+
+ let doco l =
+ fun cont ->
+ List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l;
+ match cont with
+ | None -> ()
+ | Some cont -> Queue.push cont tasks
+
+ let return v =
+ fun cont ->
+ match cont with
+ | None -> ()
+ | Some cont -> Queue.push (fun () -> cont v) tasks
+
+ let bind e f =
+ fun cont ->
+ Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
+
+ let run e =
+ let ret = ref None in
+ e (Some (fun v -> ret := Some v));
+ while not (Queue.is_empty tasks) do
+ let task = Queue.pop tasks in
+ task ()
+ done;
+ match !ret with
+ | Some k -> k
+ | None -> assert false
end
module Pipe: S = struct
- type 'a process = unit -> 'a
-
- type 'a in_port = in_channel
- type 'a out_port = out_channel
-
- let children = ref []
-
- let new_channel =
- fun () ->
- let i, o = Unix.pipe () in
- Unix.in_channel_of_descr i, Unix.out_channel_of_descr o
-
- let get c =
- fun () -> Marshal.from_channel c
-
- let put x c =
- fun () -> Marshal.to_channel c x []
-
-
- let return v =
- fun () -> v
-
- let bind e f =
- fun () -> f (e ()) ()
-
- let run p =
- let v = p() in
- List.iter
- (fun x -> try ignore(Unix.waitpid [] x) with _ -> ())
- !children;
- v
-
- let doco l =
- fun () ->
- List.iter (fun p ->
- let i = Unix.fork () in
- if i = 0 then begin
- children := [];
- run p;
- exit 0
- end else begin
- children := i::!children
- end)
- l
+ type 'a process = unit -> 'a
+
+ type 'a in_port = in_channel
+ type 'a out_port = out_channel
+
+ let children = ref []
+
+ let io_read () = ""
+ let io_write s = print_string s; flush stdout
+
+ let new_channel =
+ fun () ->
+ let i, o = Unix.pipe () in
+ Unix.in_channel_of_descr i, Unix.out_channel_of_descr o
+
+ let get c =
+ fun () -> Marshal.from_channel c
+
+ let put x c =
+ fun () -> Marshal.to_channel c x []
+
+
+ let return v =
+ fun () -> v
+
+ let bind e f =
+ fun () -> f (e ()) ()
+
+ let run p =
+ let v = p() in
+ List.iter
+ (fun x -> try ignore(Unix.waitpid [] x) with _ -> ())
+ !children;
+ v
+
+ let doco l =
+ fun () ->
+ List.iter (fun p ->
+ let i = Unix.fork () in
+ if i = 0 then begin
+ children := [];
+ run p;
+ exit 0
+ end else begin
+ children := i::!children
+ end)
+ l
end
diff --git a/src/kahnsock.ml b/src/kahnsock.ml
index 6cdfadd..89ee65c 100644
--- a/src/kahnsock.ml
+++ b/src/kahnsock.ml
@@ -2,44 +2,114 @@ Random.self_init ()
type ident = (int * int * int * int)
let gen_ident () =
- Random.int 1000000000, Random.int 1000000000,
- Random.int 1000000000, Random.int 1000000000
+ Random.int 1000000000, Random.int 1000000000,
+ Random.int 1000000000, Random.int 1000000000
module Sock : Kahn.S = struct
- (* L'idée :
- - L'ensemble des noeuds qui font du calcul est un arbre.
- Le premier noeud lancé est la racine de l'arbre ; tous les
- noeuds qui se connectent par la suite se connectent à un
- noeud déjà présent et sont donc son fils.
- - Les processus sont des fermetures de type unit -> unit,
- transmises par des canaux
- - Un noeud de calcul est un processus ocaml avec un seul
- thread. Le parallélisme est coopératif (penser à faire
- des binds assez souvent).
- - Les noeuds publient régulièrement leur load, ie le nombre
- de processus en attente et qui ne sont pas en train
- d'attendre des données depuis un canal. Si un noeud a un
- voisin dont le load est plus faible que le sien d'une
- quantité plus grande que 2, il délègue une tâche.
- - Le noeud racine délègue toutes ses tâches et sert uniquement
- pour les entrées-sorties
- - Comportement indéterminé lorsqu'un noeud se déconnecte
- (des processus peuvent disparaître, le réseau est cassé...)
- - Les canaux sont identifiés par le type ident décrit
- ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le
- monde le sait. Lorsque quelqu'un lit sur un canal, tout le
- monde le sait. (on n'est pas capable de déterminer
- quel est le noeud propriétaire du processus devant lire
- le message) Les communications sont donc coûteuses.
- *)
-
- type 'a process = (unit -> 'a)
-
- type 'a in_port = ident
- type 'a out_port = ident
-
- let cin = (0, 0, 0, 0)
- let cout = (0, 0, 0, 1)
+ (* L'idée :
+ - L'ensemble des noeuds qui font du calcul est un arbre.
+ Le premier noeud lancé est la racine de l'arbre ; tous les
+ noeuds qui se connectent par la suite se connectent à un
+ noeud déjà présent et sont donc son fils.
+ - Les processus sont des fermetures de type unit -> unit,
+ transmises par des canaux
+ - Un noeud de calcul est un processus ocaml avec un seul
+ thread. Le parallélisme est coopératif (penser à faire
+ des binds assez souvent).
+ - Les noeuds publient régulièrement leur load, ie le nombre
+ de processus en attente et qui ne sont pas en train
+ d'attendre des données depuis un canal. Si un noeud a un
+ voisin dont le load est plus faible que le sien d'une
+ quantité plus grande que 2, il délègue une tâche.
+ - Le noeud racine délègue toutes ses tâches et sert uniquement
+ pour les entrées-sorties
+ - Comportement indéterminé lorsqu'un noeud se déconnecte
+ (des processus peuvent disparaître, le réseau est cassé...)
+ - Les canaux sont identifiés par le type ident décrit
+ ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le
+ monde le sait. Lorsque quelqu'un lit sur un canal, tout le
+ monde le sait. (on n'est pas capable de déterminer
+ quel est le noeud propriétaire du processus devant lire
+ le message) Les communications sont donc coûteuses.
+ - On garantit que si chaque canal est lu par un processus
+ et écrit par un autre, alors l'ordre des messages est
+ conservé. On ne garantit pas l'ordre s'il y a plusieurs
+ écrivains, et on est à peu près sûrs que le réseau va
+ planter s'il y a plusieurs lecteurs.
+ *)
+
+ type 'a process = (('a -> unit) option) -> unit
+
+ type 'a in_port = ident
+ type 'a out_port = ident
+
+ type task = unit -> unit
+
+ let tasks = Queue.create ()
+ let read_wait_tasks = Hashtbl.create 42
+
+ let channels = Hashtbl.create 42
+
+ type host_id = string
+ type message = host_id * message_data
+ (* message contains sender ID *)
+ and message_data =
+ | Hello
+ | LoadAdvert of host_id * int
+ (* Host X has N tasks waiting *)
+ | Delegate of task
+ (* I need you to do this for me *)
+ | SendChan of ident * string
+ (* Put message in buffer *)
+ | RecvChan of ident
+ (* Read message from buffer (everybody clear it from
+ memory !) *)
+ | IOWrite of string
+ | Bye
+
+ let peers = Hashtbl.create 12 (* host_id -> in_chan * out_chan *)
+ let parent = ref "" (* id of parent *)
+ let myself = ref ""
+
+ let tell peer msg =
+ let _, o = Hashtbl.find peers peer in
+ Marshall.to_channel o msg
+
+ let tell_all msg =
+ Hashtbl.iter peers
+ (fun _ (_, o) -> Marshall.to_channel o msg)
+
+ let tell_all_except peer msg =
+ Hashtbl.iter peers
+ (fun k (_, o) -> if k <> peer then
+ Marshall.to_channel o msg)
+
+ let io_read () = ""
+ let io_write msg =
+ tell !parent (!myself, IOWrite msg)
+
+ let new_channel () =
+ let x = gen_ident () in x, x
+
+ let put port x =
+ fun cont ->
+ tell_all (!myself, SendChan(port, Marshal.to_string x));
+ match cont with
+ | Some cont -> Queue.push cont tasks
+ | None -> ()
+
+ let rec get port =
+ fun cont ->
+ try
+ let p = Hashtbl.find channels port in
+ let v = Queue.pop p in
+ tell_all (!myself, RecvChan port)
+ match cont with
+ | None -> ()
+ | Some -> Queue.push (fun () -> cont v) tasks
+ with _ -> (* no message in queue *)
+ Hashtbl.add read_wait_tasks
+ port (fun () -> get port cont)
end
diff --git a/src/primes.ml b/src/primes.ml
index e0eeed7..924f7d3 100644
--- a/src/primes.ml
+++ b/src/primes.ml
@@ -25,7 +25,7 @@ module Primes (K : Kahn.S) = struct
let rec primes (qi : int in_port) : unit process =
(get qi) >>= (fun v ->
if v <> -1 then begin
- Format.printf "%d@." v;
+ io_write ((string_of_int v)^"\n");
(delay new_channel ()) >>=
(fun (qi2, qo2) -> doco [ filter v qi qo2 ; primes qi2 ])
end else return ())
@@ -36,7 +36,7 @@ module Primes (K : Kahn.S) = struct
end
-module Eng = Kahn.Pipe
+module Eng = Kahn.Seq
module P = Primes(Eng)
let () = P.K.run P.main