diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile | 19 | ||||
-rw-r--r-- | src/example.ml | 32 | ||||
-rw-r--r-- | src/kahn.ml | 152 | ||||
-rw-r--r-- | src/primes.ml | 42 |
4 files changed, 245 insertions, 0 deletions
diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..ed9aa69 --- /dev/null +++ b/src/Makefile @@ -0,0 +1,19 @@ +OCAMLBUILD=ocamlbuild -classic-display \ + -tags annot,debug,thread \ + -libs unix +TARGET=native + +primes: + $(OCAMLBUILD) primes.$(TARGET) + +example: + $(OCAMLBUILD) example.$(TARGET) + + +clean: + $(OCAMLBUILD) -clean + +realclean: clean + rm -f *~ + +cleanall: realclean diff --git a/src/example.ml b/src/example.ml new file mode 100644 index 0000000..a22f1b9 --- /dev/null +++ b/src/example.ml @@ -0,0 +1,32 @@ +module Example (K : Kahn.S) = struct + module K = K + module Lib = Kahn.Lib(K) + open Lib + + let integers nmax (qo : int K.out_port) : unit K.process = + let rec loop n = + if n > nmax then + K.put (-1) qo + else + (K.put n qo) >>= (fun () -> loop (n + 1)) + in + loop 2 + + let output (qi : int K.in_port) : unit K.process = + let rec loop () = + (K.get qi) >>= (fun v -> + if v <> -1 then + begin Format.printf "%d@." v; loop () end + else K.return ()) + in + loop () + + let main : unit K.process = + (delay K.new_channel ()) >>= + (fun (q_in, q_out) -> K.doco [ integers 10000 q_out ; output q_in ]) + +end + +module E = Example(Kahn.Seq) + +let () = E.K.run E.main diff --git a/src/kahn.ml b/src/kahn.ml new file mode 100644 index 0000000..91b251f --- /dev/null +++ b/src/kahn.ml @@ -0,0 +1,152 @@ +module type S = sig + type 'a process + type 'a in_port + type 'a out_port + + val new_channel: unit -> 'a in_port * 'a out_port + val put: 'a -> 'a out_port -> unit process + val get: 'a in_port -> 'a process + + val doco: unit process list -> unit process + + val return: 'a -> 'a process + val bind: 'a process -> ('a -> 'b process) -> 'b process + + val run: 'a process -> 'a +end + +module Lib (K : S) = struct + + let ( >>= ) x f = K.bind x f + + let delay f x = + K.bind (K.return ()) (fun () -> K.return (f x)) + + let par_map f l = + let rec build_workers l (ports, workers) = + match l with + | [] -> (ports, workers) + | x :: l -> + let qi, qo = K.new_channel () in + build_workers + l + (qi :: ports, + ((delay f x) >>= (fun v -> K.put v qo)) :: workers) + in + let ports, workers = build_workers l ([], []) in + let rec collect l acc qo = + match l with + | [] -> K.put acc qo + | qi :: l -> (K.get qi) >>= (fun v -> collect l (v :: acc) qo) + in + let qi, qo = K.new_channel () in + K.run + ((K.doco ((collect ports [] qo) :: workers)) >>= (fun _ -> K.get qi)) + +end + + +module Th: S = struct + type 'a process = (unit -> 'a) + + type 'a channel = { q: 'a Queue.t ; m: Mutex.t; } + type 'a in_port = 'a channel + type 'a out_port = 'a channel + + let new_channel () = + let q = { q = Queue.create (); m = Mutex.create (); } in + q, q + + let put v c () = + Mutex.lock c.m; + Queue.push v c.q; + Mutex.unlock c.m; + Thread.yield () + + let rec get c () = + try + Mutex.lock c.m; + let v = Queue.pop c.q in + Mutex.unlock c.m; + v + with Queue.Empty -> + Mutex.unlock c.m; + Thread.yield (); + get c () + + let doco l () = + let ths = List.map (fun f -> Thread.create f ()) l in + List.iter (fun th -> Thread.join th) ths + + let return v = (fun () -> v) + + let bind e e' () = + let v = e () in + Thread.yield (); + e' v () + + let run e = e () +end + +module Seq: S = struct + type 'a process = (('a -> unit) option) -> unit + + type 'a channel = 'a Queue.t + type 'a in_port = 'a channel + type 'a out_port = 'a channel + + type task = unit -> unit + + let tasks = Queue.create () + + let new_channel () = + let q = Queue.create () in + q, q + + let put x c = + fun cont -> + Queue.push x c; + match cont with + | None -> () + | Some cont -> Queue.push cont tasks + + let rec get c = + fun cont -> + try + let v = Queue.pop c in + match cont with + | None -> () + | Some cont -> Queue.push (fun () -> cont v) tasks + with Queue.Empty -> + Queue.push (fun () -> get c cont) tasks + + let doco l = + fun cont -> + List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l; + match cont with + | None -> () + | Some cont -> Queue.push cont tasks + + let return v = + fun cont -> + match cont with + | None -> () + | Some cont -> Queue.push (fun () -> cont v) tasks + + let bind e f = + fun cont -> + Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks + + let run e = + let ret = ref None in + e (Some (fun v -> ret := Some v)); + while not (Queue.is_empty tasks) do + let task = Queue.pop tasks in + task () + done; + match !ret with + | Some k -> k + | None -> assert false + +end + diff --git a/src/primes.ml b/src/primes.ml new file mode 100644 index 0000000..0911f31 --- /dev/null +++ b/src/primes.ml @@ -0,0 +1,42 @@ +module Primes (K : Kahn.S) = struct + module K = K + module Lib = Kahn.Lib(K) + open K + open Lib + + let integers nmax (qo : int out_port) : unit process = + let rec loop n = + if n > nmax then + put (-1) qo + else + (put n qo) >>= (fun () -> loop (n+1)) + in + loop 2 + + let filter n (qi : int in_port) (qo : int out_port) : unit process = + let rec loop () = + (get qi) >>= (fun v -> + if v <> -1 then + (if v mod n = 0 then return () else put v qo) >>= loop + else + put v qo) + in loop() + + let rec primes (qi : int in_port) : unit process = + (get qi) >>= (fun v -> + if v <> -1 then begin + Format.printf "%d@." v; + (delay new_channel ()) >>= + (fun (qi2, qo2) -> doco [ filter v qi qo2 ; primes qi2 ]) + end else return ()) + + let main : unit process = + (delay new_channel ()) >>= + (fun (q_in, q_out) -> doco [ integers 5000 q_out ; primes q_in ]) + +end + +module P = Primes(Kahn.Seq) + +let () = P.K.run P.main + |