summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile19
-rw-r--r--src/example.ml32
-rw-r--r--src/kahn.ml152
-rw-r--r--src/primes.ml42
4 files changed, 245 insertions, 0 deletions
diff --git a/src/Makefile b/src/Makefile
new file mode 100644
index 0000000..ed9aa69
--- /dev/null
+++ b/src/Makefile
@@ -0,0 +1,19 @@
+OCAMLBUILD=ocamlbuild -classic-display \
+ -tags annot,debug,thread \
+ -libs unix
+TARGET=native
+
+primes:
+ $(OCAMLBUILD) primes.$(TARGET)
+
+example:
+ $(OCAMLBUILD) example.$(TARGET)
+
+
+clean:
+ $(OCAMLBUILD) -clean
+
+realclean: clean
+ rm -f *~
+
+cleanall: realclean
diff --git a/src/example.ml b/src/example.ml
new file mode 100644
index 0000000..a22f1b9
--- /dev/null
+++ b/src/example.ml
@@ -0,0 +1,32 @@
+module Example (K : Kahn.S) = struct
+ module K = K
+ module Lib = Kahn.Lib(K)
+ open Lib
+
+ let integers nmax (qo : int K.out_port) : unit K.process =
+ let rec loop n =
+ if n > nmax then
+ K.put (-1) qo
+ else
+ (K.put n qo) >>= (fun () -> loop (n + 1))
+ in
+ loop 2
+
+ let output (qi : int K.in_port) : unit K.process =
+ let rec loop () =
+ (K.get qi) >>= (fun v ->
+ if v <> -1 then
+ begin Format.printf "%d@." v; loop () end
+ else K.return ())
+ in
+ loop ()
+
+ let main : unit K.process =
+ (delay K.new_channel ()) >>=
+ (fun (q_in, q_out) -> K.doco [ integers 10000 q_out ; output q_in ])
+
+end
+
+module E = Example(Kahn.Seq)
+
+let () = E.K.run E.main
diff --git a/src/kahn.ml b/src/kahn.ml
new file mode 100644
index 0000000..91b251f
--- /dev/null
+++ b/src/kahn.ml
@@ -0,0 +1,152 @@
+module type S = sig
+ type 'a process
+ type 'a in_port
+ type 'a out_port
+
+ val new_channel: unit -> 'a in_port * 'a out_port
+ val put: 'a -> 'a out_port -> unit process
+ val get: 'a in_port -> 'a process
+
+ val doco: unit process list -> unit process
+
+ val return: 'a -> 'a process
+ val bind: 'a process -> ('a -> 'b process) -> 'b process
+
+ val run: 'a process -> 'a
+end
+
+module Lib (K : S) = struct
+
+ let ( >>= ) x f = K.bind x f
+
+ let delay f x =
+ K.bind (K.return ()) (fun () -> K.return (f x))
+
+ let par_map f l =
+ let rec build_workers l (ports, workers) =
+ match l with
+ | [] -> (ports, workers)
+ | x :: l ->
+ let qi, qo = K.new_channel () in
+ build_workers
+ l
+ (qi :: ports,
+ ((delay f x) >>= (fun v -> K.put v qo)) :: workers)
+ in
+ let ports, workers = build_workers l ([], []) in
+ let rec collect l acc qo =
+ match l with
+ | [] -> K.put acc qo
+ | qi :: l -> (K.get qi) >>= (fun v -> collect l (v :: acc) qo)
+ in
+ let qi, qo = K.new_channel () in
+ K.run
+ ((K.doco ((collect ports [] qo) :: workers)) >>= (fun _ -> K.get qi))
+
+end
+
+
+module Th: S = struct
+ type 'a process = (unit -> 'a)
+
+ type 'a channel = { q: 'a Queue.t ; m: Mutex.t; }
+ type 'a in_port = 'a channel
+ type 'a out_port = 'a channel
+
+ let new_channel () =
+ let q = { q = Queue.create (); m = Mutex.create (); } in
+ q, q
+
+ let put v c () =
+ Mutex.lock c.m;
+ Queue.push v c.q;
+ Mutex.unlock c.m;
+ Thread.yield ()
+
+ let rec get c () =
+ try
+ Mutex.lock c.m;
+ let v = Queue.pop c.q in
+ Mutex.unlock c.m;
+ v
+ with Queue.Empty ->
+ Mutex.unlock c.m;
+ Thread.yield ();
+ get c ()
+
+ let doco l () =
+ let ths = List.map (fun f -> Thread.create f ()) l in
+ List.iter (fun th -> Thread.join th) ths
+
+ let return v = (fun () -> v)
+
+ let bind e e' () =
+ let v = e () in
+ Thread.yield ();
+ e' v ()
+
+ let run e = e ()
+end
+
+module Seq: S = struct
+ type 'a process = (('a -> unit) option) -> unit
+
+ type 'a channel = 'a Queue.t
+ type 'a in_port = 'a channel
+ type 'a out_port = 'a channel
+
+ type task = unit -> unit
+
+ let tasks = Queue.create ()
+
+ let new_channel () =
+ let q = Queue.create () in
+ q, q
+
+ let put x c =
+ fun cont ->
+ Queue.push x c;
+ match cont with
+ | None -> ()
+ | Some cont -> Queue.push cont tasks
+
+ let rec get c =
+ fun cont ->
+ try
+ let v = Queue.pop c in
+ match cont with
+ | None -> ()
+ | Some cont -> Queue.push (fun () -> cont v) tasks
+ with Queue.Empty ->
+ Queue.push (fun () -> get c cont) tasks
+
+ let doco l =
+ fun cont ->
+ List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l;
+ match cont with
+ | None -> ()
+ | Some cont -> Queue.push cont tasks
+
+ let return v =
+ fun cont ->
+ match cont with
+ | None -> ()
+ | Some cont -> Queue.push (fun () -> cont v) tasks
+
+ let bind e f =
+ fun cont ->
+ Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
+
+ let run e =
+ let ret = ref None in
+ e (Some (fun v -> ret := Some v));
+ while not (Queue.is_empty tasks) do
+ let task = Queue.pop tasks in
+ task ()
+ done;
+ match !ret with
+ | Some k -> k
+ | None -> assert false
+
+end
+
diff --git a/src/primes.ml b/src/primes.ml
new file mode 100644
index 0000000..0911f31
--- /dev/null
+++ b/src/primes.ml
@@ -0,0 +1,42 @@
+module Primes (K : Kahn.S) = struct
+ module K = K
+ module Lib = Kahn.Lib(K)
+ open K
+ open Lib
+
+ let integers nmax (qo : int out_port) : unit process =
+ let rec loop n =
+ if n > nmax then
+ put (-1) qo
+ else
+ (put n qo) >>= (fun () -> loop (n+1))
+ in
+ loop 2
+
+ let filter n (qi : int in_port) (qo : int out_port) : unit process =
+ let rec loop () =
+ (get qi) >>= (fun v ->
+ if v <> -1 then
+ (if v mod n = 0 then return () else put v qo) >>= loop
+ else
+ put v qo)
+ in loop()
+
+ let rec primes (qi : int in_port) : unit process =
+ (get qi) >>= (fun v ->
+ if v <> -1 then begin
+ Format.printf "%d@." v;
+ (delay new_channel ()) >>=
+ (fun (qi2, qo2) -> doco [ filter v qi qo2 ; primes qi2 ])
+ end else return ())
+
+ let main : unit process =
+ (delay new_channel ()) >>=
+ (fun (q_in, q_out) -> doco [ integers 5000 q_out ; primes q_in ])
+
+end
+
+module P = Primes(Kahn.Seq)
+
+let () = P.K.run P.main
+