summaryrefslogtreecommitdiff
path: root/src/kahn.ml
blob: 5229f7eac23d1ad2dd54104832ba4267a3838fa1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
module type S = sig
  type 'a process
  type 'a in_port
  type 'a out_port

  val new_channel: unit -> 'a in_port * 'a out_port
  val put: 'a -> 'a out_port -> unit process
  val get: 'a in_port -> 'a process

  val doco: unit process list -> unit process

  val return: 'a -> 'a process
  val bind: 'a process -> ('a -> 'b process) -> 'b process

  val run: 'a process -> 'a
end

module Lib (K : S) = struct

  let ( >>= ) x f = K.bind x f

  let delay f x =
    K.bind (K.return ()) (fun () -> K.return (f x))

  let par_map f l =
    let rec build_workers l (ports, workers) =
      match l with
      | [] -> (ports, workers)
      | x :: l ->
          let qi, qo = K.new_channel () in
          build_workers
            l
            (qi :: ports,
             ((delay f x) >>= (fun v -> K.put v qo)) :: workers)
    in
    let ports, workers = build_workers l ([], []) in
    let rec collect l acc qo =
      match l with
      | [] -> K.put acc qo
      | qi :: l -> (K.get qi) >>= (fun v -> collect l (v :: acc) qo)
    in
    let qi, qo = K.new_channel () in
    K.run
      ((K.doco ((collect ports [] qo) :: workers)) >>= (fun _ -> K.get qi))

end


module Th: S = struct
  type 'a process = (unit -> 'a)

  type 'a channel = { q: 'a Queue.t ; m: Mutex.t; }
  type 'a in_port = 'a channel
  type 'a out_port = 'a channel

  let new_channel () =
    let q = { q = Queue.create (); m = Mutex.create (); } in
    q, q

  let put v c () =
    Mutex.lock c.m;
    Queue.push v c.q;
    Mutex.unlock c.m;
    Thread.yield ()

  let rec get c () =
    try
      Mutex.lock c.m;
      let v = Queue.pop c.q in
      Mutex.unlock c.m;
      v
    with Queue.Empty ->
      Mutex.unlock c.m;
      Thread.yield ();
      get c ()

  let doco l () =
    let ths = List.map (fun f -> Thread.create f ()) l in
    List.iter (fun th -> Thread.join th) ths

  let return v = (fun () -> v)

  let bind e e' () =
    let v = e () in
    Thread.yield ();
    e' v ()

  let run e = e ()
end

module Seq: S = struct
  type 'a process = (('a -> unit) option) -> unit

  type 'a channel = 'a Queue.t
  type 'a in_port = 'a channel
  type 'a out_port = 'a channel

  type task = unit -> unit

  let tasks = Queue.create ()

  let new_channel () =
    let q = Queue.create () in
    q, q

  let put x c =
    fun cont ->
      Queue.push x c;
      match cont with
      | None -> ()
      | Some cont ->  Queue.push cont tasks

  let rec get c =
    fun cont ->
      try
        let v = Queue.pop c in
        match cont with
        | None -> ()
        | Some cont -> Queue.push (fun () -> cont v) tasks
      with Queue.Empty ->
        Queue.push (fun () -> get c cont) tasks

  let doco l =
    fun cont ->
      List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l;
      match cont with
      | None -> ()
      | Some cont -> Queue.push cont tasks

  let return v =
    fun cont ->
      match cont with
      | None -> ()
      | Some cont -> Queue.push (fun () -> cont v) tasks

  let bind e f =
    fun cont ->
      Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
    
  let run e =
    let ret = ref None in
    e (Some (fun v -> ret := Some v));
    while not (Queue.is_empty tasks) do
      let task = Queue.pop tasks in
      task ()
    done;
    match !ret with
    | Some k -> k
    | None -> assert false

end


module Pipe: S = struct
    type 'a process = unit -> 'a

    type 'a in_port = in_channel
    type 'a out_port = out_channel

    let children = ref []

    let new_channel =
        fun () ->
            let i, o = Unix.pipe () in
            Unix.in_channel_of_descr i, Unix.out_channel_of_descr o

    let get c =
        fun () -> Marshal.from_channel c

    let put x c =
        fun () -> Marshal.to_channel c x []


    let return v =
        fun () -> v

    let bind e f =
        fun () -> f (e ()) ()

    let run p =
        let v = p() in
        List.iter
            (fun x -> try ignore(Unix.waitpid [] x) with _ -> ())
            !children;
        v

    let doco l =
        fun () ->
            List.iter (fun p ->
                let i = Unix.fork () in
                if i = 0 then begin
                    children := [];
                    run p;
                    exit 0
                end else begin
                    children := i::!children
                end)
            l
end