summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/example.ml2
-rw-r--r--src/kahn_pipe.ml124
-rw-r--r--src/kahn_seq.ml15
-rw-r--r--src/kahn_sock.ml215
-rw-r--r--src/kahn_sock_0.ml115
-rw-r--r--src/primes.ml6
6 files changed, 296 insertions, 181 deletions
diff --git a/src/example.ml b/src/example.ml
index 86c4f5b..4971448 100644
--- a/src/example.ml
+++ b/src/example.ml
@@ -86,6 +86,6 @@ module Example (K : Kahn.S) = struct
end
-module E = Example(Kahn_pipe.Pipe)
+module E = Example(Kahn_seq.Seq)
let () = E.K.run E.main2
diff --git a/src/kahn_pipe.ml b/src/kahn_pipe.ml
index f0bec97..9f3da0a 100644
--- a/src/kahn_pipe.ml
+++ b/src/kahn_pipe.ml
@@ -1,75 +1,75 @@
open Kahn
module Pipe: S = struct
- type 'a process = unit -> 'a
+ type 'a process = unit -> 'a
- type 'a in_port = in_channel
- type 'a out_port = out_channel
+ type 'a in_port = in_channel
+ type 'a out_port = out_channel
- let new_channel =
- fun () ->
- let i, o = Unix.pipe () in
- Unix.in_channel_of_descr i, Unix.out_channel_of_descr o
+ let new_channel =
+ fun () ->
+ let i, o = Unix.pipe () in
+ Unix.in_channel_of_descr i, Unix.out_channel_of_descr o
- let get c =
- fun () -> Marshal.from_channel c
+ let get (c : 'a in_port) : 'a =
+ fun () -> Marshal.from_channel c
- let put x c =
- fun () ->
- Marshal.to_channel c x [];
- flush c
-
- let try_get block prt_list =
- let fds = List.map fst prt_list in
- let fds = List.map Unix.descr_of_in_channel fds in
- let ok_fds, _, _ = Unix.select fds [] []
- (if block then -1.0 else 0.0)
- in
- match ok_fds with
- | [] -> None
- | fd::x ->
- let chan, f =
- List.find
- (fun (s, _) -> Unix.descr_of_in_channel s = fd)
- prt_list
- in
- Some(f (Marshal.from_channel chan))
-
- let select prt_list =
- fun () ->
- match try_get true prt_list with
- | Some x -> x
- | None -> assert false
+ let put x c =
+ fun () ->
+ Marshal.to_channel c x [];
+ flush c
+
+ let try_get block prt_list =
+ let fds = List.map fst prt_list in
+ let fds = List.map Unix.descr_of_in_channel fds in
+ let ok_fds, _, _ = Unix.select fds [] []
+ (if block then -1.0 else 0.0)
+ in
+ match ok_fds with
+ | [] -> None
+ | fd::x ->
+ let chan, f =
+ List.find
+ (fun (s, _) -> Unix.descr_of_in_channel s = fd)
+ prt_list
+ in
+ Some(f (Marshal.from_channel chan))
+
+ let select prt_list =
+ fun () ->
+ match try_get true prt_list with
+ | Some x -> x
+ | None -> assert false
- let select_default prt_list def =
- fun () ->
- match try_get false prt_list with
- | Some x -> x
- | None -> def ()
+ let select_default prt_list def =
+ fun () ->
+ match try_get false prt_list with
+ | Some x -> x
+ | None -> def ()
- let return v =
- fun () -> v
+ let return v =
+ fun () -> v
- let bind e f =
- fun () -> f (e ()) ()
- let bind_io = bind
+ let bind e f =
+ fun () -> f (e ()) ()
+ let bind_io = bind
- let run p =
- p()
+ let run p =
+ p()
- let doco l =
- fun () ->
- let children =
- List.map
- (fun p ->
- match Unix.fork () with
- | 0 ->
- run p;
- exit 0
- | i -> i)
- l
- in
- List.iter
- (fun x -> try ignore(Unix.waitpid [] x) with _ -> ())
- children
+ let doco l =
+ fun () ->
+ let children =
+ List.map
+ (fun p ->
+ match Unix.fork () with
+ | 0 ->
+ run p;
+ exit 0
+ | i -> i)
+ l
+ in
+ List.iter
+ (fun i -> try ignore(Unix.waitpid [] i) with _ -> ())
+ children
end
diff --git a/src/kahn_seq.ml b/src/kahn_seq.ml
index 7f0eec5..ce82117 100644
--- a/src/kahn_seq.ml
+++ b/src/kahn_seq.ml
@@ -14,7 +14,7 @@ module Seq: S = struct
let push_cont (cont : ('a -> unit) option) (v : 'a) =
match cont with
| None -> ()
- | Some cont -> Queue.push (fun () -> cont v) tasks
+ | Some cont_f -> Queue.push (fun () -> cont_f v) tasks
let new_channel () =
let q = Queue.create () in
@@ -23,9 +23,7 @@ module Seq: S = struct
let put x c =
fun cont ->
Queue.push x c;
- match cont with
- | None -> ()
- | Some cont -> Queue.push cont tasks
+ push_cont cont ()
let rec get c =
fun cont ->
@@ -62,12 +60,11 @@ module Seq: S = struct
fun cont ->
push_cont cont v
- let bind e f =
- fun cont ->
- Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
- let bind_io e f =
+ let bind (e : 'a process) (f : 'a -> 'b process) : 'b process =
fun cont ->
- Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
+ e (Some (fun (r : 'a) -> f r cont))
+
+ let bind_io = bind
let run e =
let ret = ref None in
diff --git a/src/kahn_sock.ml b/src/kahn_sock.ml
index 89ee65c..bce6cf2 100644
--- a/src/kahn_sock.ml
+++ b/src/kahn_sock.ml
@@ -1,115 +1,120 @@
-Random.self_init ()
-
-type ident = (int * int * int * int)
-let gen_ident () =
- Random.int 1000000000, Random.int 1000000000,
- Random.int 1000000000, Random.int 1000000000
-
-module Sock : Kahn.S = struct
-
- (* L'idée :
- - L'ensemble des noeuds qui font du calcul est un arbre.
- Le premier noeud lancé est la racine de l'arbre ; tous les
- noeuds qui se connectent par la suite se connectent à un
- noeud déjà présent et sont donc son fils.
- - Les processus sont des fermetures de type unit -> unit,
- transmises par des canaux
- - Un noeud de calcul est un processus ocaml avec un seul
- thread. Le parallélisme est coopératif (penser à faire
- des binds assez souvent).
- - Les noeuds publient régulièrement leur load, ie le nombre
- de processus en attente et qui ne sont pas en train
- d'attendre des données depuis un canal. Si un noeud a un
- voisin dont le load est plus faible que le sien d'une
- quantité plus grande que 2, il délègue une tâche.
- - Le noeud racine délègue toutes ses tâches et sert uniquement
- pour les entrées-sorties
- - Comportement indéterminé lorsqu'un noeud se déconnecte
- (des processus peuvent disparaître, le réseau est cassé...)
- - Les canaux sont identifiés par le type ident décrit
- ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le
- monde le sait. Lorsque quelqu'un lit sur un canal, tout le
- monde le sait. (on n'est pas capable de déterminer
- quel est le noeud propriétaire du processus devant lire
- le message) Les communications sont donc coûteuses.
- - On garantit que si chaque canal est lu par un processus
- et écrit par un autre, alors l'ordre des messages est
- conservé. On ne garantit pas l'ordre s'il y a plusieurs
- écrivains, et on est à peu près sûrs que le réseau va
- planter s'il y a plusieurs lecteurs.
- *)
+open Kahn
+open Unix
+
+(* make_addr : string -> int -> sockaddr *)
+let make_addr host port =
+ let host = gethostbyname host in
+ ADDR_INET(host.h_addr_list.(Random.int (Array.length host.h_addr_list)), port)
+
+module Sock: S = struct
+
+ let kahn_port = 8197
type 'a process = (('a -> unit) option) -> unit
- type 'a in_port = ident
- type 'a out_port = ident
- type task = unit -> unit
+ type 'a channel = int
+ type 'a in_port = 'a channel
+ type 'a out_port = 'a channel
+ type task = unit -> unit
let tasks = Queue.create ()
- let read_wait_tasks = Hashtbl.create 42
- let channels = Hashtbl.create 42
-
- type host_id = string
- type message = host_id * message_data
- (* message contains sender ID *)
- and message_data =
+ let socket_to_srv = ref None
+
+ type cli_msg =
+ | Hello
+ | Put of int * string
+ | Get of int * (string -> task)
+ | AskTask
+ | GiveTask of task
+ | GiveIOTask of task
+ | FinalResult of string
+ type srv_msg =
| Hello
- | LoadAdvert of host_id * int
- (* Host X has N tasks waiting *)
- | Delegate of task
- (* I need you to do this for me *)
- | SendChan of ident * string
- (* Put message in buffer *)
- | RecvChan of ident
- (* Read message from buffer (everybody clear it from
- memory !) *)
- | IOWrite of string
- | Bye
-
- let peers = Hashtbl.create 12 (* host_id -> in_chan * out_chan *)
- let parent = ref "" (* id of parent *)
- let myself = ref ""
-
- let tell peer msg =
- let _, o = Hashtbl.find peers peer in
- Marshall.to_channel o msg
-
- let tell_all msg =
- Hashtbl.iter peers
- (fun _ (_, o) -> Marshall.to_channel o msg)
-
- let tell_all_except peer msg =
- Hashtbl.iter peers
- (fun k (_, o) -> if k <> peer then
- Marshall.to_channel o msg)
-
- let io_read () = ""
- let io_write msg =
- tell !parent (!myself, IOWrite msg)
-
- let new_channel () =
- let x = gen_ident () in x, x
-
- let put port x =
- fun cont ->
- tell_all (!myself, SendChan(port, Marshal.to_string x));
- match cont with
- | Some cont -> Queue.push cont tasks
- | None -> ()
-
- let rec get port =
- fun cont ->
- try
- let p = Hashtbl.find channels port in
- let v = Queue.pop p in
- tell_all (!myself, RecvChan port)
- match cont with
- | None -> ()
- | Some -> Queue.push (fun () -> cont v) tasks
- with _ -> (* no message in queue *)
- Hashtbl.add read_wait_tasks
- port (fun () -> get port cont)
+ | GiveTask of task
+ | PleaseWait
+ | FinalResult of string
+
+
+ let rec tell_server (msg : cli_msg) =
+ match !socket_to_srv with
+ | Some s -> Marshal.to_channel s msg [Marshal.Closures]; flush s
+ | None -> handle_msg_server (fun _ -> assert false) msg
+
+ and handle_msg_server (reply_fun : srv_msg -> unit) = function
+ | Hello -> reply_fun Hello
+ | _ -> () (* TODO *)
+
+ and client host =
+ (* Initialize socket *)
+ let sock = socket PF_INET SOCK_STREAM 0 in
+ connect sock (make_addr host kahn_port);
+ let i, o = in_channel_of_descr sock, out_channel_of_descr sock in
+ socket_to_srv := Some o;
+ let get_msg () = Marshal.from_channel i in
+ (* Initialize protocol *)
+ tell_server Hello;
+ assert (get_msg() = Hello);
+ (* Loop *)
+ let rec loop () =
+ tell_server AskTask;
+ match get_msg () with
+ | Hello -> assert false
+ | GiveTask task -> task (); loop ()
+ | PleaseWait -> sleep 2; loop ()
+ | FinalResult s -> Marshal.from_string s
+ in
+ let result = loop() in
+ shutdown sock SHUTDOWN_ALL;
+ result
+
+ and server e =
+ (* Initialize task list *)
+ push_task (fun () -> e None);
+
+ (* Initialize socket *)
+ let sock = socket PF_INET SOCK_STREAM 0 in
+ bind sock (make_addr "0.0.0.0" kahn_port);
+ listen sock 10;
+ let stop_srv _ =
+ Format.eprintf "Shutdown server...@.";
+ shutdown sock SHUTDOWN_ALL;
+ exit 0
+ in
+ Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
+ Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv);
+
+ (* Loop *)
+ let clients = ref [] in
+ while true do
+ let fds = List.map (fun (i, o, a) -> descr_of_in_channel i) !clients in
+ match select (sock::fds) [] [] (-1.0) with
+ | s::_, _, _ when s = sock ->
+ (* New client ! *)
+ let fd, addr = accept sock in
+ clients :=
+ (in_channel_of_descr fd,
+ out_channel_of_descr fd,
+ addr)::!clients
+ | s::_, _, _ ->
+ (* Client sent something *)
+ let i, o, a = List.find
+ (fun (i, _, _) -> descr_of_in_channel i = s) !clients in
+ let msg = Marshal.from_channel i in
+ handle_msg_server
+ (fun m -> Marshal.to_channel o m [Marshall.Closures]; flush o)
+ msg
+ | _ -> assert false
+ done
+
+ let srv = ref ""
+ let set_var v s = v := s
+ let run e =
+ Arg.parse [] (set_var srv) "usage: kahn [server_addr]";
+ if !srv = "" then
+ server e
+ else
+ client !sr
end
diff --git a/src/kahn_sock_0.ml b/src/kahn_sock_0.ml
new file mode 100644
index 0000000..89ee65c
--- /dev/null
+++ b/src/kahn_sock_0.ml
@@ -0,0 +1,115 @@
+Random.self_init ()
+
+type ident = (int * int * int * int)
+let gen_ident () =
+ Random.int 1000000000, Random.int 1000000000,
+ Random.int 1000000000, Random.int 1000000000
+
+module Sock : Kahn.S = struct
+
+ (* L'idée :
+ - L'ensemble des noeuds qui font du calcul est un arbre.
+ Le premier noeud lancé est la racine de l'arbre ; tous les
+ noeuds qui se connectent par la suite se connectent à un
+ noeud déjà présent et sont donc son fils.
+ - Les processus sont des fermetures de type unit -> unit,
+ transmises par des canaux
+ - Un noeud de calcul est un processus ocaml avec un seul
+ thread. Le parallélisme est coopératif (penser à faire
+ des binds assez souvent).
+ - Les noeuds publient régulièrement leur load, ie le nombre
+ de processus en attente et qui ne sont pas en train
+ d'attendre des données depuis un canal. Si un noeud a un
+ voisin dont le load est plus faible que le sien d'une
+ quantité plus grande que 2, il délègue une tâche.
+ - Le noeud racine délègue toutes ses tâches et sert uniquement
+ pour les entrées-sorties
+ - Comportement indéterminé lorsqu'un noeud se déconnecte
+ (des processus peuvent disparaître, le réseau est cassé...)
+ - Les canaux sont identifiés par le type ident décrit
+ ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le
+ monde le sait. Lorsque quelqu'un lit sur un canal, tout le
+ monde le sait. (on n'est pas capable de déterminer
+ quel est le noeud propriétaire du processus devant lire
+ le message) Les communications sont donc coûteuses.
+ - On garantit que si chaque canal est lu par un processus
+ et écrit par un autre, alors l'ordre des messages est
+ conservé. On ne garantit pas l'ordre s'il y a plusieurs
+ écrivains, et on est à peu près sûrs que le réseau va
+ planter s'il y a plusieurs lecteurs.
+ *)
+
+ type 'a process = (('a -> unit) option) -> unit
+
+ type 'a in_port = ident
+ type 'a out_port = ident
+
+ type task = unit -> unit
+
+ let tasks = Queue.create ()
+ let read_wait_tasks = Hashtbl.create 42
+
+ let channels = Hashtbl.create 42
+
+ type host_id = string
+ type message = host_id * message_data
+ (* message contains sender ID *)
+ and message_data =
+ | Hello
+ | LoadAdvert of host_id * int
+ (* Host X has N tasks waiting *)
+ | Delegate of task
+ (* I need you to do this for me *)
+ | SendChan of ident * string
+ (* Put message in buffer *)
+ | RecvChan of ident
+ (* Read message from buffer (everybody clear it from
+ memory !) *)
+ | IOWrite of string
+ | Bye
+
+ let peers = Hashtbl.create 12 (* host_id -> in_chan * out_chan *)
+ let parent = ref "" (* id of parent *)
+ let myself = ref ""
+
+ let tell peer msg =
+ let _, o = Hashtbl.find peers peer in
+ Marshall.to_channel o msg
+
+ let tell_all msg =
+ Hashtbl.iter peers
+ (fun _ (_, o) -> Marshall.to_channel o msg)
+
+ let tell_all_except peer msg =
+ Hashtbl.iter peers
+ (fun k (_, o) -> if k <> peer then
+ Marshall.to_channel o msg)
+
+ let io_read () = ""
+ let io_write msg =
+ tell !parent (!myself, IOWrite msg)
+
+ let new_channel () =
+ let x = gen_ident () in x, x
+
+ let put port x =
+ fun cont ->
+ tell_all (!myself, SendChan(port, Marshal.to_string x));
+ match cont with
+ | Some cont -> Queue.push cont tasks
+ | None -> ()
+
+ let rec get port =
+ fun cont ->
+ try
+ let p = Hashtbl.find channels port in
+ let v = Queue.pop p in
+ tell_all (!myself, RecvChan port)
+ match cont with
+ | None -> ()
+ | Some -> Queue.push (fun () -> cont v) tasks
+ with _ -> (* no message in queue *)
+ Hashtbl.add read_wait_tasks
+ port (fun () -> get port cont)
+
+end
diff --git a/src/primes.ml b/src/primes.ml
index 11f5387..b9a57ed 100644
--- a/src/primes.ml
+++ b/src/primes.ml
@@ -23,9 +23,7 @@ module Primes (K : Kahn.S) = struct
in loop()
let rec primes (qi : int in_port) : unit process =
- bind_io
- (get qi)
- (fun v ->
+ (get qi) >>= (fun v ->
if v <> -1 then
begin
Format.printf "%d@." v;
@@ -36,7 +34,7 @@ module Primes (K : Kahn.S) = struct
let main : unit process =
(delay new_channel ()) >>=
- (fun (q_in, q_out) -> doco [ integers 1000 q_out ; primes q_in ])
+ (fun (q_in, q_out) -> doco [ integers 2000 q_out ; primes q_in ])
end