summaryrefslogtreecommitdiff
path: root/src/kahn.ml
blob: a02ee24757c8ea2fd63f48f20de314559d4a8218 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
(** Common interface of the process-network back-ends defined in this file
    (each implementation module is sealed with this signature). *)
module type S = sig
	(** A process that yields a value of type ['a] when executed with {!run}. *)
	type 'a process
	(** Receiving end of a channel carrying values of type ['a]. *)
	type 'a in_port
	(** Sending end of a channel carrying values of type ['a]. *)
	type 'a out_port

	(** Read a string from the outside world.
	    NOTE(review): every implementation visible in this file is a stub
	    returning the empty string. *)
	val io_read: unit -> string
	(** Write a string to the outside world; the implementations visible in
	    this file print it on standard output and flush. *)
	val io_write: string -> unit

	(** [new_channel ()] creates a fresh channel and returns its two ends
	    as [(input, output)]. *)
	val new_channel: unit -> 'a in_port * 'a out_port
	(** [put v port] is the process that sends [v] on [port]. *)
	val put: 'a -> 'a out_port -> unit process
	(** [get port] is the process that receives the next value from [port]. *)
	val get: 'a in_port -> 'a process

	(** [doco ps] is the process that launches all the processes of [ps].
	    NOTE(review): whether it also waits for them to finish before
	    continuing differs between the implementations in this file. *)
	val doco: unit process list -> unit process

	(** [return v] is the process that immediately yields [v]. *)
	val return: 'a -> 'a process
	(** [bind p f] runs [p], then runs the process [f v] where [v] is the
	    value produced by [p]. *)
	val bind: 'a process -> ('a -> 'b process) -> 'b process

	(** [run p] executes [p] and returns the value it produces. *)
	val run: 'a process -> 'a
end

(** Utilities written only against the abstract interface [S], so they can be
    instantiated with any back-end [K]. *)
module Lib (K : S) = struct

	(** Infix notation for [K.bind]. *)
	let ( >>= ) = K.bind

	(** [delay f x] is a process yielding [f x]; the application [f x] sits
	    inside the function handed to [K.bind], so it is not evaluated when
	    the process is merely constructed. *)
	let delay f x =
		K.return () >>= fun () -> K.return (f x)

	(** [par_map f l] applies [f] to every element of [l], one process per
	    element, and returns the results in the same order as [l].
	    The whole network is executed with [K.run] before returning. *)
	let par_map f l =
		(* For each element: a private channel plus a worker that computes
		   the image of the element and sends it on that channel. *)
		let spawn (inputs, jobs) x =
			let rx, tx = K.new_channel () in
			(rx :: inputs,
			 ((delay f x) >>= (fun y -> K.put y tx)) :: jobs)
		in
		(* Both lists come out in reverse order with respect to [l]. *)
		let inputs, jobs = List.fold_left spawn ([], []) l in
		(* Read the ports one after the other; consing onto [results] undoes
		   the reversal above, so the final list follows the order of [l]. *)
		let rec gather pending results out =
			match pending with
			| rx :: rest ->
					K.get rx >>= fun y -> gather rest (y :: results) out
			| [] -> K.put results out
		in
		let result_in, result_out = K.new_channel () in
		let gatherer = gather inputs [] result_out in
		K.run (K.doco (gatherer :: jobs) >>= fun _ -> K.get result_in)

end


(** Back-end based on system threads: every process launched by [doco] runs
    in its own thread and channels are mutex-protected queues. *)
module Th: S = struct
	(* A process is a suspended computation, executed by applying it to (). *)
	type 'a process = (unit -> 'a)

	(* [q] holds the pending values and [m] protects it. [nonempty] is
	   signalled on every push so that readers can sleep instead of polling. *)
	type 'a channel = { q: 'a Queue.t ; m: Mutex.t; nonempty: Condition.t; }
	type 'a in_port = 'a channel
	type 'a out_port = 'a channel

	(* Both ends of a channel are the same shared record. *)
	let new_channel () =
		let ch =
			{ q = Queue.create ();
			  m = Mutex.create ();
			  nonempty = Condition.create (); }
		in
		ch, ch

	(* Stub: always returns the empty string. *)
	let io_read () = ""
	(* Print on standard output and flush immediately. *)
	let io_write s = print_string s; flush stdout

	(* [put v c] appends [v] to the channel, wakes up one waiting reader (if
	   any), then yields so that other threads get a chance to run. *)
	let put v c () =
		Mutex.lock c.m;
		Queue.push v c.q;
		Condition.signal c.nonempty;
		Mutex.unlock c.m;
		Thread.yield ()

	(* [get c] blocks until a value is available and removes it from the
	   channel. The emptiness test is re-run after every wake-up because
	   another reader may have consumed the value in the meantime. *)
	let get c () =
		Mutex.lock c.m;
		while Queue.is_empty c.q do
			(* Atomically releases [c.m] while sleeping and re-acquires it
			   before returning. *)
			Condition.wait c.nonempty c.m
		done;
		let v = Queue.pop c.q in
		Mutex.unlock c.m;
		v

	(* [doco l] starts one thread per process and returns once all of them
	   have terminated. *)
	let doco l () =
		let ths = List.map (fun f -> Thread.create f ()) l in
		List.iter (fun th -> Thread.join th) ths

	let return v = (fun () -> v)

	(* Run [e], yield to the other threads, then run the process built by
	   [e'] from the result of [e]. *)
	let bind e e' () =
		let v = e () in
		Thread.yield ();
		e' v ()

	(* Execute the process on the calling thread. *)
	let run e = e ()
end

(** Sequential back-end: processes are written in continuation-passing style
    and interleaved by a single global queue of pending tasks drained by
    [run]. *)
module Seq: S = struct
	(* A process receives an optional continuation to be given its result;
	   [None] means that nobody is interested in the result. *)
	type 'a process = (('a -> unit) option) -> unit

	type 'a channel = 'a Queue.t
	type 'a in_port = 'a channel
	type 'a out_port = 'a channel

	type task = unit -> unit

	(* Global run queue shared by every process of this back-end. *)
	let tasks : task Queue.t = Queue.create ()

	(* Stub: always returns the empty string. *)
	let io_read () = ""
	(* Print on standard output and flush immediately. *)
	let io_write s = print_string s; flush stdout

	(* Both ends of a channel are the same queue. *)
	let new_channel () =
		let q = Queue.create () in
		q, q

	(* [put x c] stores [x] in the channel and schedules the continuation. *)
	let put x c =
		fun cont ->
			Queue.push x c;
			match cont with
			| None -> ()
			| Some cont ->	Queue.push cont tasks

	(* [get c] takes the next value of the channel and schedules the
	   continuation with it. When the channel is empty the whole [get] is
	   re-scheduled at the end of the run queue, to be retried later. *)
	let rec get c =
		fun cont ->
			try
				let v = Queue.pop c in
				match cont with
				| None -> ()
				| Some cont -> Queue.push (fun () -> cont v) tasks
			with Queue.Empty ->
				Queue.push (fun () -> get c cont) tasks

	(* [doco l] schedules every process of [l] and resumes its own
	   continuation only after ALL of them have terminated, like the
	   thread-based back-end does by joining its threads. Each child is
	   given a continuation that decrements a shared counter; the child
	   bringing the counter to zero schedules the continuation of [doco].
	   An empty list terminates immediately. *)
	let doco l =
		fun cont ->
			let resume () =
				match cont with
				| None -> ()
				| Some cont -> Queue.push cont tasks
			in
			match l with
			| [] -> resume ()
			| _ ->
					let remaining = ref (List.length l) in
					let child_done () =
						decr remaining;
						if !remaining = 0 then resume ()
					in
					List.iter
						(fun proc ->
							Queue.push (fun () -> proc (Some child_done)) tasks)
						l

	(* [return v] schedules the continuation with [v]. *)
	let return v =
		fun cont ->
			match cont with
			| None -> ()
			| Some cont -> Queue.push (fun () -> cont v) tasks

	(* [bind e f] schedules [e] with a continuation that feeds its result to
	   [f] and runs the resulting process with the original continuation. *)
	let bind e f =
		fun cont ->
			Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
		
	(* [run e] starts [e] with a continuation recording its result, then
	   executes pending tasks until the run queue is empty.
	   The final [assert false] is reached only if the queue drained without
	   the continuation of [e] ever being called. *)
	let run e =
		let ret = ref None in
		e (Some (fun v -> ret := Some v));
		while not (Queue.is_empty tasks) do
			let task = Queue.pop tasks in
			task ()
		done;
		match !ret with
		| Some k -> k
		| None -> assert false

end


(** Back-end based on Unix processes: [doco] forks one child per process and
    channels are Unix pipes carrying marshalled values. *)
module Pipe: S = struct
	(* A process is a suspended computation, executed by applying it to (). *)
	type 'a process = unit -> 'a

	(* The type parameter is phantom: the pipe itself carries untyped bytes. *)
	type 'a in_port = in_channel
	type 'a out_port = out_channel

	(* Pids of the children forked by this process and not yet waited for. *)
	let children = ref []

	(* Stub: always returns the empty string. *)
	let io_read () = ""
	(* Print on standard output and flush immediately. *)
	let io_write s = print_string s; flush stdout

	(* Create a pipe and wrap both ends into buffered channels. *)
	let new_channel =
		fun () ->
			let i, o = Unix.pipe () in
			Unix.in_channel_of_descr i, Unix.out_channel_of_descr o

	(* [get c] blocks until a complete marshalled value can be read. *)
	let get c =
		fun () -> Marshal.from_channel c

	(* [put x c] marshals [x] on the pipe. The explicit flush is required:
	   [c] is a buffered channel, so without it the value could stay in this
	   process' buffer while the reader blocks, and a later fork would
	   duplicate the unflushed bytes into the child. *)
	let put x c =
		fun () ->
			Marshal.to_channel c x [];
			flush c


	let return v =
		fun () -> v

	let bind e f =
		fun () -> f (e ()) ()

	(* [run p] executes [p] in the current process, then reaps every child
	   forked so far before returning the result of [p]. The list of
	   children is cleared first so that no pid is ever waited for twice. *)
	let run p =
		let v = p() in
		let rec reap pid =
			try ignore (Unix.waitpid [] pid) with
			(* interrupted by a signal: the child is still there, retry *)
			| Unix.Unix_error (Unix.EINTR, _, _) -> reap pid
			(* best effort: e.g. the child has already been collected *)
			| Unix.Unix_error _ -> ()
		in
		let pids = !children in
		children := [];
		List.iter reap pids;
		v

	(* [doco l] forks one child per process of [l] and records their pids;
	   it does NOT wait for them here, they are reaped by [run]. Waiting
	   here could block forever on a child stuck writing to a full pipe
	   that only the parent's continuation would drain. *)
	let doco l =
		fun () ->
			List.iter (fun p ->
				let pid = Unix.fork () in
				if pid = 0 then begin
					(* The pids inherited from the parent are not our children. *)
					children := [];
					(* Never let an exception escape from the child: it would
					   unwind into the copy of the parent's call stack and the
					   child would carry on executing the parent's code. *)
					let status =
						try (let () = run p in 0)
						with e ->
							prerr_endline
								("Pipe.doco: child process failed: "
								 ^ Printexc.to_string e);
							1
					in
					exit status
				end else begin
					children := pid :: !children
				end)
			l
end