blob: 5229f7eac23d1ad2dd54104832ba4267a3838fa1 (
plain) (
tree)
|
|
module type S = sig
type 'a process
type 'a in_port
type 'a out_port
val new_channel: unit -> 'a in_port * 'a out_port
val put: 'a -> 'a out_port -> unit process
val get: 'a in_port -> 'a process
val doco: unit process list -> unit process
val return: 'a -> 'a process
val bind: 'a process -> ('a -> 'b process) -> 'b process
val run: 'a process -> 'a
end
module Lib (K : S) = struct
let ( >>= ) x f = K.bind x f
let delay f x =
K.bind (K.return ()) (fun () -> K.return (f x))
let par_map f l =
let rec build_workers l (ports, workers) =
match l with
| [] -> (ports, workers)
| x :: l ->
let qi, qo = K.new_channel () in
build_workers
l
(qi :: ports,
((delay f x) >>= (fun v -> K.put v qo)) :: workers)
in
let ports, workers = build_workers l ([], []) in
let rec collect l acc qo =
match l with
| [] -> K.put acc qo
| qi :: l -> (K.get qi) >>= (fun v -> collect l (v :: acc) qo)
in
let qi, qo = K.new_channel () in
K.run
((K.doco ((collect ports [] qo) :: workers)) >>= (fun _ -> K.get qi))
end
module Th: S = struct
type 'a process = (unit -> 'a)
type 'a channel = { q: 'a Queue.t ; m: Mutex.t; }
type 'a in_port = 'a channel
type 'a out_port = 'a channel
let new_channel () =
let q = { q = Queue.create (); m = Mutex.create (); } in
q, q
let put v c () =
Mutex.lock c.m;
Queue.push v c.q;
Mutex.unlock c.m;
Thread.yield ()
let rec get c () =
try
Mutex.lock c.m;
let v = Queue.pop c.q in
Mutex.unlock c.m;
v
with Queue.Empty ->
Mutex.unlock c.m;
Thread.yield ();
get c ()
let doco l () =
let ths = List.map (fun f -> Thread.create f ()) l in
List.iter (fun th -> Thread.join th) ths
let return v = (fun () -> v)
let bind e e' () =
let v = e () in
Thread.yield ();
e' v ()
let run e = e ()
end
module Seq: S = struct
type 'a process = (('a -> unit) option) -> unit
type 'a channel = 'a Queue.t
type 'a in_port = 'a channel
type 'a out_port = 'a channel
type task = unit -> unit
let tasks = Queue.create ()
let new_channel () =
let q = Queue.create () in
q, q
let put x c =
fun cont ->
Queue.push x c;
match cont with
| None -> ()
| Some cont -> Queue.push cont tasks
let rec get c =
fun cont ->
try
let v = Queue.pop c in
match cont with
| None -> ()
| Some cont -> Queue.push (fun () -> cont v) tasks
with Queue.Empty ->
Queue.push (fun () -> get c cont) tasks
let doco l =
fun cont ->
List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l;
match cont with
| None -> ()
| Some cont -> Queue.push cont tasks
let return v =
fun cont ->
match cont with
| None -> ()
| Some cont -> Queue.push (fun () -> cont v) tasks
let bind e f =
fun cont ->
Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
let run e =
let ret = ref None in
e (Some (fun v -> ret := Some v));
while not (Queue.is_empty tasks) do
let task = Queue.pop tasks in
task ()
done;
match !ret with
| Some k -> k
| None -> assert false
end
module Pipe: S = struct
type 'a process = unit -> 'a
type 'a in_port = in_channel
type 'a out_port = out_channel
let children = ref []
let new_channel =
fun () ->
let i, o = Unix.pipe () in
Unix.in_channel_of_descr i, Unix.out_channel_of_descr o
let get c =
fun () -> Marshal.from_channel c
let put x c =
fun () -> Marshal.to_channel c x []
let return v =
fun () -> v
let bind e f =
fun () -> f (e ()) ()
let run p =
let v = p() in
List.iter
(fun x -> try ignore(Unix.waitpid [] x) with _ -> ())
!children;
v
let doco l =
fun () ->
List.iter (fun p ->
let i = Unix.fork () in
if i = 0 then begin
children := [];
run p;
exit 0
end else begin
children := i::!children
end)
l
end
|