summaryrefslogtreecommitdiff
path: root/src/kahn.ml
blob: 91b251fc19def182c728a73e26ec91223b03bb20 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
module type S = sig
  type 'a process
  type 'a in_port
  type 'a out_port

  val new_channel: unit -> 'a in_port * 'a out_port
  val put: 'a -> 'a out_port -> unit process
  val get: 'a in_port -> 'a process

  val doco: unit process list -> unit process

  val return: 'a -> 'a process
  val bind: 'a process -> ('a -> 'b process) -> 'b process

  val run: 'a process -> 'a
end

module Lib (K : S) = struct

  let ( >>= ) x f = K.bind x f

  let delay f x =
    K.bind (K.return ()) (fun () -> K.return (f x))

  let par_map f l =
    let rec build_workers l (ports, workers) =
      match l with
      | [] -> (ports, workers)
      | x :: l ->
          let qi, qo = K.new_channel () in
          build_workers
            l
            (qi :: ports,
             ((delay f x) >>= (fun v -> K.put v qo)) :: workers)
    in
    let ports, workers = build_workers l ([], []) in
    let rec collect l acc qo =
      match l with
      | [] -> K.put acc qo
      | qi :: l -> (K.get qi) >>= (fun v -> collect l (v :: acc) qo)
    in
    let qi, qo = K.new_channel () in
    K.run
      ((K.doco ((collect ports [] qo) :: workers)) >>= (fun _ -> K.get qi))

end


module Th: S = struct
  type 'a process = (unit -> 'a)

  type 'a channel = { q: 'a Queue.t ; m: Mutex.t; }
  type 'a in_port = 'a channel
  type 'a out_port = 'a channel

  let new_channel () =
    let q = { q = Queue.create (); m = Mutex.create (); } in
    q, q

  let put v c () =
    Mutex.lock c.m;
    Queue.push v c.q;
    Mutex.unlock c.m;
    Thread.yield ()

  let rec get c () =
    try
      Mutex.lock c.m;
      let v = Queue.pop c.q in
      Mutex.unlock c.m;
      v
    with Queue.Empty ->
      Mutex.unlock c.m;
      Thread.yield ();
      get c ()

  let doco l () =
    let ths = List.map (fun f -> Thread.create f ()) l in
    List.iter (fun th -> Thread.join th) ths

  let return v = (fun () -> v)

  let bind e e' () =
    let v = e () in
    Thread.yield ();
    e' v ()

  let run e = e ()
end

module Seq: S = struct
  type 'a process = (('a -> unit) option) -> unit

  type 'a channel = 'a Queue.t
  type 'a in_port = 'a channel
  type 'a out_port = 'a channel

  type task = unit -> unit

  let tasks = Queue.create ()

  let new_channel () =
    let q = Queue.create () in
    q, q

  let put x c =
    fun cont ->
      Queue.push x c;
      match cont with
      | None -> ()
      | Some cont ->  Queue.push cont tasks

  let rec get c =
    fun cont ->
      try
        let v = Queue.pop c in
        match cont with
        | None -> ()
        | Some cont -> Queue.push (fun () -> cont v) tasks
      with Queue.Empty ->
        Queue.push (fun () -> get c cont) tasks

  let doco l =
    fun cont ->
      List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l;
      match cont with
      | None -> ()
      | Some cont -> Queue.push cont tasks

  let return v =
    fun cont ->
      match cont with
      | None -> ()
      | Some cont -> Queue.push (fun () -> cont v) tasks

  let bind e f =
    fun cont ->
      Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
    
  let run e =
    let ret = ref None in
    e (Some (fun v -> ret := Some v));
    while not (Queue.is_empty tasks) do
      let task = Queue.pop tasks in
      task ()
    done;
    match !ret with
    | Some k -> k
    | None -> assert false

end