summaryrefslogtreecommitdiff
path: root/src/kahn_pipe.ml
blob: 2df8bc52e256d81ecfbe096450f960bebe155c2e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
open Kahn

module Pipe: S = struct
    type 'a process = unit -> 'a

    type 'a in_port = in_channel
    type 'a out_port = out_channel

    let new_channel =
        fun () ->
            let i, o = Unix.pipe () in
            Unix.in_channel_of_descr i, Unix.out_channel_of_descr o

    let get (c : 'a in_port) : 'a =
        fun () -> Marshal.from_channel c

    let put x c =
        fun () ->
          Marshal.to_channel c x [];
          flush c
	
	let output s = Format.printf "%s@?" s
    
    let try_get block prt_list =
        let fds = List.map fst prt_list in
        let fds = List.map Unix.descr_of_in_channel fds in
        let ok_fds, _, _ = Unix.select fds [] []
            (if block then -1.0 else 0.0) 
        in
        match ok_fds with
        | [] -> None
        | fd::x ->
            let chan, f =
                List.find
                    (fun (s, _) -> Unix.descr_of_in_channel s = fd)
                    prt_list
            in
                Some(f (Marshal.from_channel chan))
    
    let select prt_list =
        fun () ->
            match try_get true prt_list with
            | Some x -> x
            | None -> assert false

    let select_default prt_list def =
        fun () ->
            match try_get false prt_list with
            | Some x -> x
            | None -> def ()

    let return v =
        fun () -> v

    let bind e f =
        fun () -> f (e ()) ()

    let run p =
        p()

    let doco l =
        fun () ->
          let children =
            List.map
              (fun p ->
                match Unix.fork () with
                | 0 ->
                  run p;
                  exit 0
                | i -> i)
              l
          in
            List.iter
              (fun i -> try ignore(Unix.waitpid [] i) with _ -> ())
              children
end