summaryrefslogtreecommitdiff
path: root/src/kahn.ml
blob: a458724cf362f7cb3b45c6460d61757678435650 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
(** Signature of a process-network implementation: abstract processes that
    exchange values through typed channels, a channel being a pair made of
    an input port and an output port.
    NOTE(review): the file is named kahn.ml, so this is presumably the
    interface of a Kahn process network — confirm. *)
module type S = sig
	(** A process whose result has type ['a]. The representation is left to
	    each implementation. *)
	type 'a process
	(** Port from which values of type ['a] are obtained (see [get]). *)
	type 'a in_port
	(** Port to which values of type ['a] are handed (see [put]). *)
	type 'a out_port

	(** [new_channel ()] returns an input port and an output port carrying
	    the same element type; presumably the two ends of one fresh
	    channel. *)
	val new_channel: unit -> 'a in_port * 'a out_port
	(** [put v port] is a [unit] process built from a value and an output
	    port; presumably it sends [v] on the channel. Whether it can block
	    is implementation-specific — TODO confirm. *)
	val put: 'a -> 'a out_port -> unit process
	(** [get port] is a process whose result is a value taken from [port];
	    presumably it waits until a value is available — TODO confirm. *)
	val get: 'a in_port -> 'a process

	(** [output s] takes a string and returns [unit] directly (not a
	    process). Presumably it prints [s] in a way suited to the
	    implementation — TODO confirm. *)
	val output: string -> unit

	(** [select choices] takes a list of (input port, handler) pairs and
	    yields a process of the handlers' result type. Presumably it reads
	    from one port that has a value available and applies the paired
	    handler — TODO confirm. *)
	val select: ('a in_port * ('a -> 'b)) list -> 'b process
	(** Same arguments as [select] plus a thunk of the same result type.
	    Presumably the thunk is used when no port has a value available —
	    TODO confirm. *)
	val select_default: ('a in_port * ('a -> 'b)) list -> (unit -> 'b) -> 'b process

	(** [doco ps] combines a list of [unit] processes into a single [unit]
	    process. Presumably the processes run concurrently and the result
	    finishes once all of them have finished — TODO confirm. *)
	val doco: unit process list -> unit process

	(** [return v] is a process whose result is [v]. *)
	val return: 'a -> 'a process
	(** [bind p k] sequences [p] with the continuation [k], which receives
	    the result of [p] and yields the process to continue with. *)
	val bind: 'a process -> ('a -> 'b process) -> 'b process

	(** [run p] executes [p] and returns its result as a plain value. *)
	val run: 'a process -> 'a
end

(** Helpers built on top of any implementation [K] of the signature [S]. *)
module Lib (K : S) = struct

	(** Infix form of [K.bind]. *)
	let ( >>= ) = K.bind

	(** [delay f x] is a process whose result is [f x]. The application
	    [f x] sits inside the continuation handed to [K.bind], so it is
	    evaluated only when [K] invokes that continuation. *)
	let delay f x =
		K.return () >>= fun () -> K.return (f x)

	(** [par_map f l] returns the list of [f x] for every [x] in [l], in
	    the same order as [l]. One worker process per element computes
	    [f x] and puts it on its own channel; a collector process reads
	    every channel and puts the assembled list on a result channel.
	    Workers and collector are combined with [K.doco], and the whole
	    network is executed with [K.run]. *)
	let par_map f l =
		(* Create one channel and one worker per element. Both accumulators
		   are built by consing, so they end up in reverse input order. *)
		let spawn (inputs, procs) x =
			let rx, tx = K.new_channel () in
			let worker = delay f x >>= fun v -> K.put v tx in
			(rx :: inputs, worker :: procs)
		in
		let ports, workers = List.fold_left spawn ([], []) l in
		let result_in, result_out = K.new_channel () in
		(* Read the ports one after another. [ports] is reversed and each
		   value is consed onto [acc], so the final list is back in input
		   order. *)
		let rec gather pending acc =
			match pending with
			| [] -> K.put acc result_out
			| port :: rest ->
					K.get port >>= fun v -> gather rest (v :: acc)
		in
		let collector = gather ports [] in
		(* NOTE(review): the result list is put inside [K.doco] but read
		   only after [K.doco] has completed; this assumes [K.put] does not
		   wait for a matching [K.get] — confirm for each implementation. *)
		let network =
			K.doco (collector :: workers) >>= fun () -> K.get result_in
		in
		K.run network

end