summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/example.ml85
-rw-r--r--src/kahn.ml167
-rw-r--r--src/kahn_pipe.ml75
-rw-r--r--src/kahn_seq.ml83
-rw-r--r--src/kahn_sock.ml (renamed from src/kahnsock.ml)0
-rw-r--r--src/kahn_th.ml45
-rw-r--r--src/primes.ml20
7 files changed, 291 insertions, 184 deletions
diff --git a/src/example.ml b/src/example.ml
index a22f1b9..86c4f5b 100644
--- a/src/example.ml
+++ b/src/example.ml
@@ -3,30 +3,89 @@ module Example (K : Kahn.S) = struct
module Lib = Kahn.Lib(K)
open Lib
- let integers nmax (qo : int K.out_port) : unit K.process =
+ (* First test : distribute calculation of 50 first Fibonacci numbers *)
+
+ let integers first step nmax (qo : int K.out_port) : unit K.process =
let rec loop n =
if n > nmax then
K.put (-1) qo
- else
- (K.put n qo) >>= (fun () -> loop (n + 1))
+ else begin
+ (K.put n qo) >>= (fun () -> loop (n + step))
+ end
in
- loop 2
+ loop first
+
+ let rec fib n =
+ if n < 2 then n
+ else fib (n-1) + fib (n-2)
- let output (qi : int K.in_port) : unit K.process =
+ let rec slow_fib (qi : int K.in_port) (qo : (int * int) K.out_port) : unit K.process =
+ (K.get qi) >>=
+ (fun i ->
+ if i <> -1 then
+ (K.put (i, fib i) qo) >>= (fun () -> slow_fib qi qo)
+ else (K.put (i, i) qo) >>= (fun () -> K.return ()))
+
+ let output (qi : (int * int) K.in_port) : unit K.process =
let rec loop () =
- (K.get qi) >>= (fun v ->
- if v <> -1 then
- begin Format.printf "%d@." v; loop () end
- else K.return ())
+ K.bind_io
+ (K.get qi)
+ (fun (v, s) ->
+ if v <> -1 then
+ begin Format.printf "f(%d) = %d@." v s; loop () end
+ else K.return ())
in
loop ()
let main : unit K.process =
- (delay K.new_channel ()) >>=
- (fun (q_in, q_out) -> K.doco [ integers 10000 q_out ; output q_in ])
+ let max = 4 in
+ let rec aux n =
+ (delay K.new_channel ()) >>= (fun (q_in, q_out) ->
+ (delay K.new_channel ()) >>= (fun (q_in2, q_out2) ->
+ K.doco [ integers n max 50 q_out ; slow_fib q_in q_out2 ; output q_in2 ]))
+ in
+ let rec aux2 n =
+ if n = max then []
+ else aux n :: aux2 (n+1)
+ in
+ (K.return ()) >>= (fun () -> K.doco (aux2 0))
+
+ (* Second test : distribute the calculation of fib 53 *)
+
+ let rec fib_rec n r (qo : int K.out_port) =
+ (K.return ()) >>= (fun () ->
+ if r = 0 then
+ K.put (fib n) qo
+ else
+ (delay K.new_channel ()) >>= (fun (q_in, q_out) ->
+ (delay K.new_channel ()) >>= (fun (q_in2, q_out2) ->
+ K.doco
+ [
+ fib_rec (n-2) (r-1) q_out ;
+ fib_rec (n-1) (r-1) q_out2 ;
+ K.get q_in >>= (fun x ->
+ K.bind_io
+ (K.get q_in2)
+ (fun y ->
+ Format.printf "f(%d) = %d@." n (x+y);
+ K.put (x+y) qo))
+ ]
+ )))
+
+
+ let main2 : unit K.process =
+ (delay K.new_channel()) >>=
+ (fun (qi, qo) ->
+ K.doco
+ [
+ fib_rec 53 7 qo;
+ K.bind_io
+ (K.get qi)
+ (fun v -> Format.printf "Got it! Result is %d@." v; K.return ())
+ ])
end
-module E = Example(Kahn.Seq)
+module E = Example(Kahn_pipe.Pipe)
-let () = E.K.run E.main
+let () = E.K.run E.main2
diff --git a/src/kahn.ml b/src/kahn.ml
index a02ee24..08eac19 100644
--- a/src/kahn.ml
+++ b/src/kahn.ml
@@ -3,17 +3,18 @@ module type S = sig
type 'a in_port
type 'a out_port
- val io_read: unit -> string
- val io_write: string -> unit
-
val new_channel: unit -> 'a in_port * 'a out_port
val put: 'a -> 'a out_port -> unit process
val get: 'a in_port -> 'a process
+ val select: ('a in_port * ('a -> 'b)) list -> 'b process
+ val select_default: ('a in_port * ('a -> 'b)) list -> (unit -> 'b) -> 'b process
+
val doco: unit process list -> unit process
val return: 'a -> 'a process
val bind: 'a process -> ('a -> 'b process) -> 'b process
+ val bind_io: 'a process -> ('a -> 'b process) -> 'b process
val run: 'a process -> 'a
end
@@ -49,163 +50,3 @@ module Lib (K : S) = struct
end
-module Th: S = struct
- type 'a process = (unit -> 'a)
-
- type 'a channel = { q: 'a Queue.t ; m: Mutex.t; }
- type 'a in_port = 'a channel
- type 'a out_port = 'a channel
-
- let new_channel () =
- let q = { q = Queue.create (); m = Mutex.create (); } in
- q, q
-
- let io_read () = ""
- let io_write s = print_string s; flush stdout
-
- let put v c () =
- Mutex.lock c.m;
- Queue.push v c.q;
- Mutex.unlock c.m;
- Thread.yield ()
-
- let rec get c () =
- try
- Mutex.lock c.m;
- let v = Queue.pop c.q in
- Mutex.unlock c.m;
- v
- with Queue.Empty ->
- Mutex.unlock c.m;
- Thread.yield ();
- get c ()
-
- let doco l () =
- let ths = List.map (fun f -> Thread.create f ()) l in
- List.iter (fun th -> Thread.join th) ths
-
- let return v = (fun () -> v)
-
- let bind e e' () =
- let v = e () in
- Thread.yield ();
- e' v ()
-
- let run e = e ()
-end
-
-module Seq: S = struct
- type 'a process = (('a -> unit) option) -> unit
-
- type 'a channel = 'a Queue.t
- type 'a in_port = 'a channel
- type 'a out_port = 'a channel
-
- type task = unit -> unit
-
- let tasks = Queue.create ()
-
- let io_read () = ""
- let io_write s = print_string s; flush stdout
-
- let new_channel () =
- let q = Queue.create () in
- q, q
-
- let put x c =
- fun cont ->
- Queue.push x c;
- match cont with
- | None -> ()
- | Some cont -> Queue.push cont tasks
-
- let rec get c =
- fun cont ->
- try
- let v = Queue.pop c in
- match cont with
- | None -> ()
- | Some cont -> Queue.push (fun () -> cont v) tasks
- with Queue.Empty ->
- Queue.push (fun () -> get c cont) tasks
-
- let doco l =
- fun cont ->
- List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l;
- match cont with
- | None -> ()
- | Some cont -> Queue.push cont tasks
-
- let return v =
- fun cont ->
- match cont with
- | None -> ()
- | Some cont -> Queue.push (fun () -> cont v) tasks
-
- let bind e f =
- fun cont ->
- Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
-
- let run e =
- let ret = ref None in
- e (Some (fun v -> ret := Some v));
- while not (Queue.is_empty tasks) do
- let task = Queue.pop tasks in
- task ()
- done;
- match !ret with
- | Some k -> k
- | None -> assert false
-
-end
-
-
-module Pipe: S = struct
- type 'a process = unit -> 'a
-
- type 'a in_port = in_channel
- type 'a out_port = out_channel
-
- let children = ref []
-
- let io_read () = ""
- let io_write s = print_string s; flush stdout
-
- let new_channel =
- fun () ->
- let i, o = Unix.pipe () in
- Unix.in_channel_of_descr i, Unix.out_channel_of_descr o
-
- let get c =
- fun () -> Marshal.from_channel c
-
- let put x c =
- fun () -> Marshal.to_channel c x []
-
-
- let return v =
- fun () -> v
-
- let bind e f =
- fun () -> f (e ()) ()
-
- let run p =
- let v = p() in
- List.iter
- (fun x -> try ignore(Unix.waitpid [] x) with _ -> ())
- !children;
- v
-
- let doco l =
- fun () ->
- List.iter (fun p ->
- let i = Unix.fork () in
- if i = 0 then begin
- children := [];
- run p;
- exit 0
- end else begin
- children := i::!children
- end)
- l
-end
diff --git a/src/kahn_pipe.ml b/src/kahn_pipe.ml
new file mode 100644
index 0000000..f0bec97
--- /dev/null
+++ b/src/kahn_pipe.ml
@@ -0,0 +1,75 @@
+open Kahn
+
+module Pipe: S = struct
+ type 'a process = unit -> 'a
+
+ type 'a in_port = in_channel
+ type 'a out_port = out_channel
+
+ let new_channel =
+ fun () ->
+ let i, o = Unix.pipe () in
+ Unix.in_channel_of_descr i, Unix.out_channel_of_descr o
+
+ let get c =
+ fun () -> Marshal.from_channel c
+
+ let put x c =
+ fun () ->
+ Marshal.to_channel c x [];
+ flush c
+
+ let try_get block prt_list =
+ let fds = List.map fst prt_list in
+ let fds = List.map Unix.descr_of_in_channel fds in
+ let ok_fds, _, _ = Unix.select fds [] []
+ (if block then -1.0 else 0.0)
+ in
+ match ok_fds with
+ | [] -> None
+ | fd::x ->
+ let chan, f =
+ List.find
+ (fun (s, _) -> Unix.descr_of_in_channel s = fd)
+ prt_list
+ in
+ Some(f (Marshal.from_channel chan))
+
+ let select prt_list =
+ fun () ->
+ match try_get true prt_list with
+ | Some x -> x
+ | None -> assert false
+
+ let select_default prt_list def =
+ fun () ->
+ match try_get false prt_list with
+ | Some x -> x
+ | None -> def ()
+
+ let return v =
+ fun () -> v
+
+ let bind e f =
+ fun () -> f (e ()) ()
+ let bind_io = bind
+
+ let run p =
+ p()
+
+ let doco l =
+ fun () ->
+ let children =
+ List.map
+ (fun p ->
+ match Unix.fork () with
+ | 0 ->
+ run p;
+ exit 0
+ | i -> i)
+ l
+ in
+ List.iter
+ (fun x -> try ignore(Unix.waitpid [] x) with _ -> ())
+ children
+end
diff --git a/src/kahn_seq.ml b/src/kahn_seq.ml
new file mode 100644
index 0000000..7f0eec5
--- /dev/null
+++ b/src/kahn_seq.ml
@@ -0,0 +1,83 @@
+open Kahn
+
+module Seq: S = struct
+ type 'a process = (('a -> unit) option) -> unit
+
+ type 'a channel = 'a Queue.t
+ type 'a in_port = 'a channel
+ type 'a out_port = 'a channel
+
+ type task = unit -> unit
+
+ let tasks = Queue.create ()
+
+ let push_cont (cont : ('a -> unit) option) (v : 'a) =
+ match cont with
+ | None -> ()
+ | Some cont -> Queue.push (fun () -> cont v) tasks
+
+ let new_channel () =
+ let q = Queue.create () in
+ q, q
+
+ let put x c =
+ fun cont ->
+ Queue.push x c;
+ match cont with
+ | None -> ()
+ | Some cont -> Queue.push cont tasks
+
+ let rec get c =
+ fun cont ->
+ try
+ let v = Queue.pop c in push_cont cont v
+ with Queue.Empty ->
+ Queue.push (fun () -> get c cont) tasks
+
+ let rec try_get = function
+ | [] -> None
+ | (prt, f)::q ->
+ try
+ let v = Queue.pop prt in Some (f v)
+ with Queue.Empty -> try_get q
+
+ let rec select prt_list =
+ fun cont ->
+ match try_get prt_list with
+ | Some x -> push_cont cont x
+ | None -> Queue.push (fun () -> select prt_list cont) tasks
+
+ let select_default prt_list def =
+ fun cont ->
+ match try_get prt_list with
+ | Some x -> push_cont cont x
+ | None -> push_cont cont (def())
+
+ let doco l =
+ fun cont ->
+ List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l;
+ push_cont cont ()
+
+ let return v =
+ fun cont ->
+ push_cont cont v
+
+ let bind e f =
+ fun cont ->
+ Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
+ let bind_io e f =
+ fun cont ->
+ Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
+
+ let run e =
+ let ret = ref None in
+ e (Some (fun v -> ret := Some v));
+ while not (Queue.is_empty tasks) do
+ let task = Queue.pop tasks in
+ task ()
+ done;
+ match !ret with
+ | Some k -> k
+ | None -> assert false
+
+end
diff --git a/src/kahnsock.ml b/src/kahn_sock.ml
index 89ee65c..89ee65c 100644
--- a/src/kahnsock.ml
+++ b/src/kahn_sock.ml
diff --git a/src/kahn_th.ml b/src/kahn_th.ml
new file mode 100644
index 0000000..b6fccad
--- /dev/null
+++ b/src/kahn_th.ml
@@ -0,0 +1,45 @@
+
+module Th: S = struct
+ type 'a process = (unit -> 'a)
+
+ type 'a channel = { q: 'a Queue.t ; m: Mutex.t; }
+ type 'a in_port = 'a channel
+ type 'a out_port = 'a channel
+
+ let new_channel () =
+ let q = { q = Queue.create (); m = Mutex.create (); } in
+ q, q
+
+ let io_read () = ""
+ let io_write s = print_string s; flush stdout
+
+ let put v c () =
+ Mutex.lock c.m;
+ Queue.push v c.q;
+ Mutex.unlock c.m;
+ Thread.yield ()
+
+ let rec get c () =
+ try
+ Mutex.lock c.m;
+ let v = Queue.pop c.q in
+ Mutex.unlock c.m;
+ v
+ with Queue.Empty ->
+ Mutex.unlock c.m;
+ Thread.yield ();
+ get c ()
+
+ let doco l () =
+ let ths = List.map (fun f -> Thread.create f ()) l in
+ List.iter (fun th -> Thread.join th) ths
+
+ let return v = (fun () -> v)
+
+ let bind e e' () =
+ let v = e () in
+ Thread.yield ();
+ e' v ()
+
+ let run e = e ()
+end
diff --git a/src/primes.ml b/src/primes.ml
index 924f7d3..11f5387 100644
--- a/src/primes.ml
+++ b/src/primes.ml
@@ -23,20 +23,24 @@ module Primes (K : Kahn.S) = struct
in loop()
let rec primes (qi : int in_port) : unit process =
- (get qi) >>= (fun v ->
- if v <> -1 then begin
- io_write ((string_of_int v)^"\n");
- (delay new_channel ()) >>=
- (fun (qi2, qo2) -> doco [ filter v qi qo2 ; primes qi2 ])
- end else return ())
+ bind_io
+ (get qi)
+ (fun v ->
+ if v <> -1 then
+ begin
+ Format.printf "%d@." v;
+ (delay new_channel ()) >>=
+ (fun (qi2, qo2) -> doco [ filter v qi qo2 ; primes qi2 ])
+ end
+ else return ())
let main : unit process =
(delay new_channel ()) >>=
- (fun (q_in, q_out) -> doco [ integers 2000 q_out ; primes q_in ])
+ (fun (q_in, q_out) -> doco [ integers 1000 q_out ; primes q_in ])
end
-module Eng = Kahn.Seq
+module Eng = Kahn_pipe.Pipe
module P = Primes(Eng)
let () = P.K.run P.main