(* Source blob: a02ee24757c8ea2fd63f48f20de314559d4a8218 *)
(** Common interface of the process/channel back-ends defined in this file. *)
module type S = sig
  (** A computation that yields a value of type ['a] when executed by
      {!run}. *)
  type 'a process

  (** Reading end of a channel carrying values of type ['a]. *)
  type 'a in_port

  (** Writing end of a channel carrying values of type ['a]. *)
  type 'a out_port

  (** {2 Monadic core} *)

  (** [return v] is the process whose result is [v]. *)
  val return : 'a -> 'a process

  (** [bind p f] sequences [p] with [f]: the result of [p] is handed to [f]
      and the process [f] returns provides the final result. *)
  val bind : 'a process -> ('a -> 'b process) -> 'b process

  (** [run p] executes [p] and returns its result. *)
  val run : 'a process -> 'a

  (** {2 Channels and composition} *)

  (** [new_channel ()] creates a channel and returns its two ends. *)
  val new_channel : unit -> 'a in_port * 'a out_port

  (** [put v o] is the process that sends [v] on [o]. *)
  val put : 'a -> 'a out_port -> unit process

  (** [get i] is the process that receives one value from [i]. *)
  val get : 'a in_port -> 'a process

  (** [doco ps] is the process that launches every process of [ps]; how they
      are scheduled is up to the implementation. *)
  val doco : unit process list -> unit process

  (** {2 Console I/O} *)

  (** [io_read ()] returns a string of input. *)
  val io_read : unit -> string

  (** [io_write s] outputs [s]. *)
  val io_write : string -> unit
end
(** Helpers written once against the abstract interface [S] and usable with
    any back-end [K]. *)
module Lib (K : S) = struct
  (** Infix form of [K.bind]. *)
  let ( >>= ) x f = K.bind x f

  (** [delay f x] wraps the application [f x] in a process: the call sits
      inside the function handed to [K.bind], so [K] decides when it is
      actually evaluated. *)
  let delay f x = K.return () >>= fun () -> K.return (f x)

  (** [par_map f l] maps [f] over [l] using one worker process per element
      and returns the results in the order of [l].

      Each worker sends its result on a private channel; a collector process
      reads those channels and sends the assembled list on a final channel,
      which is read once [K.doco] has been sequenced. The whole pipeline is
      executed with [K.run]. *)
  let par_map f l =
    (* One channel and one worker per element. Both lists are consed, so they
       end up in the reverse order of [l]. *)
    let spawn (ports, workers) x =
      let qi, qo = K.new_channel () in
      let worker = delay f x >>= fun v -> K.put v qo in
      (qi :: ports, worker :: workers)
    in
    let ports, workers = List.fold_left spawn ([], []) l in
    let qi, qo = K.new_channel () in
    (* [ports] is reversed and the results are consed again while reading,
       which restores the original order of [l]. *)
    let rec collect acc = function
      | [] -> K.put acc qo
      | port :: rest -> K.get port >>= fun v -> collect (v :: acc) rest
    in
    let collector = collect [] ports in
    K.run (K.doco (collector :: workers) >>= fun _ -> K.get qi)
end
(** Implementation of [S] on top of system threads ([Thread], [Mutex],
    [Condition]). A process is a thunk; a channel is an unbounded queue
    protected by a mutex. *)
module Th: S = struct
  type 'a process = (unit -> 'a)

  (* [nonempty] is signalled by [put] so that [get] can sleep while the queue
     is empty instead of polling it. *)
  type 'a channel = { q: 'a Queue.t; m: Mutex.t; nonempty: Condition.t; }
  type 'a in_port = 'a channel
  type 'a out_port = 'a channel

  (** Both ends of a channel are the same shared record. *)
  let new_channel () =
    let c =
      { q = Queue.create ();
        m = Mutex.create ();
        nonempty = Condition.create (); }
    in
    c, c

  (** Stub: always returns the empty string. *)
  let io_read () = ""

  (** Prints [s] on standard output and flushes it. *)
  let io_write s = print_string s; flush stdout

  (** [put v c] enqueues [v] on [c], wakes one waiting reader, then yields. *)
  let put v c () =
    Mutex.lock c.m;
    Queue.push v c.q;
    Condition.signal c.nonempty;
    Mutex.unlock c.m;
    Thread.yield ()

  (** [get c] blocks until [c] holds a value, then dequeues and returns it. *)
  let get c () =
    Mutex.lock c.m;
    (* Re-check after every wake-up: another reader may have taken the value,
       and condition variables allow spurious wake-ups. *)
    while Queue.is_empty c.q do
      Condition.wait c.nonempty c.m
    done;
    let v = Queue.pop c.q in
    Mutex.unlock c.m;
    v

  (** [doco l] runs every process of [l] in its own thread and returns once
      all of them have been joined. *)
  let doco l () =
    let ths = List.map (fun f -> Thread.create f ()) l in
    List.iter (fun th -> Thread.join th) ths

  let return v = (fun () -> v)

  (** Runs [e], yields to other threads, then runs the process [e' v]. *)
  let bind e e' () =
    let v = e () in
    Thread.yield ();
    e' v ()

  let run e = e ()
end
(** Single-threaded implementation of [S]. A process takes an optional
    continuation; every step is pushed on the global [tasks] queue, which
    [run] drains in FIFO order. *)
module Seq: S = struct
  type 'a process = (('a -> unit) option) -> unit
  type 'a channel = 'a Queue.t
  type 'a in_port = 'a channel
  type 'a out_port = 'a channel
  type task = unit -> unit

  (* Pending steps of all live processes. *)
  let tasks : task Queue.t = Queue.create ()

  (** Stub: always returns the empty string. *)
  let io_read () = ""

  (** Prints [s] on standard output and flushes it. *)
  let io_write s = print_string s; flush stdout

  (** Both ends of a channel are the same queue. *)
  let new_channel () =
    let q = Queue.create () in
    q, q

  (** [put x c] enqueues [x] on [c] and schedules the continuation, if any. *)
  let put x c =
    fun cont ->
      Queue.push x c;
      match cont with
      | None -> ()
      | Some cont -> Queue.push cont tasks

  (** [get c] takes a value from [c] and schedules the continuation with it.
      If [c] is empty, the [get] itself is rescheduled to be retried later. *)
  let rec get c =
    fun cont ->
      try
        let v = Queue.pop c in
        begin match cont with
          | None -> ()
          | Some cont -> Queue.push (fun () -> cont v) tasks
        end
      with Queue.Empty ->
        Queue.push (fun () -> get c cont) tasks

  (** [doco l] schedules every process of [l]. The continuation is scheduled
      only after all of them have completed (immediately when [l] is empty),
      which matches the join performed by the thread-based back-end. *)
  let doco l =
    fun cont ->
      match cont, l with
      | None, _ ->
        (* Nobody waits for completion: just launch the processes. *)
        List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l
      | Some cont, [] -> Queue.push cont tasks
      | Some cont, _ ->
        let pending = ref (List.length l) in
        (* Called once by each sub-process when it finishes; the last one
           to finish releases the continuation of [doco]. *)
        let finished () =
          decr pending;
          if !pending = 0 then Queue.push cont tasks
        in
        List.iter
          (fun proc -> Queue.push (fun () -> proc (Some finished)) tasks)
          l

  let return v =
    fun cont ->
      match cont with
      | None -> ()
      | Some cont -> Queue.push (fun () -> cont v) tasks

  (** [bind e f] schedules [e] with a continuation that runs [f] on the
      result of [e], itself continued by [cont]. *)
  let bind e f =
    fun cont ->
      Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks

  (** [run e] starts [e], executes queued tasks until none is left, and
      returns the value [e] passed to its continuation. The final
      [assert false] is reached if the queue empties without [e] having
      produced a value. *)
  let run e =
    let ret = ref None in
    e (Some (fun v -> ret := Some v));
    while not (Queue.is_empty tasks) do
      let task = Queue.pop tasks in
      task ()
    done;
    match !ret with
    | Some k -> k
    | None -> assert false
end
(** Implementation of [S] on top of Unix processes: [doco] forks one child
    per process and channels are pipes carrying marshalled values. *)
module Pipe: S = struct
  type 'a process = unit -> 'a
  type 'a in_port = in_channel
  type 'a out_port = out_channel

  (* Pids forked by [doco] in the current process and not yet reaped. *)
  let children = ref []

  (** Stub: always returns the empty string. *)
  let io_read () = ""

  (** Prints [s] on standard output and flushes it. *)
  let io_write s = print_string s; flush stdout

  (** [new_channel ()] creates a pipe and wraps its two descriptors in
      buffered channels. *)
  let new_channel () =
    let i, o = Unix.pipe () in
    Unix.in_channel_of_descr i, Unix.out_channel_of_descr o

  (** [get c] reads one marshalled value from [c]. *)
  let get c =
    fun () -> Marshal.from_channel c

  (** [put x c] marshals [x] on [c] and flushes the channel. Without the
      flush the bytes may stay in the writer's buffer, leaving a reader in
      another process blocked while the writer is still alive. *)
  let put x c =
    fun () ->
      Marshal.to_channel c x [];
      flush c

  let return v =
    fun () -> v

  let bind e f =
    fun () -> f (e ()) ()

  (** [run p] executes [p], then reaps every child recorded in [children]
      and returns the result of [p]. *)
  let run p =
    let v = p () in
    let pids = !children in
    (* Forget the pids being reaped so a later [run] does not wait on them
       again. *)
    children := [];
    List.iter
      (fun pid ->
         (* Best effort: a failing [waitpid] is ignored, but only
            [Unix_error] is silenced. *)
         try ignore (Unix.waitpid [] pid) with Unix.Unix_error _ -> ())
      pids;
    v

  (** [doco l] forks one child per process of [l]. Each child runs its
      process with [run] and exits with status 0. The parent records the
      pids and returns without waiting; reaping happens in [run]. *)
  let doco l =
    fun () ->
      List.iter (fun p ->
          let pid = Unix.fork () in
          if pid = 0 then begin
            (* The child starts with no children of its own. *)
            children := [];
            run p;
            exit 0
          end else begin
            children := pid :: !children
          end)
        l
end
|