summaryrefslogtreecommitdiff
path: root/src/kahn_pipe.ml
blob: f0bec979efe67624fb816b224f27e4e4c90ead91 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
open Kahn

module Pipe: S = struct
	type 'a process = unit -> 'a

	type 'a in_port = in_channel
	type 'a out_port = out_channel

	let new_channel =
		fun () ->
			let i, o = Unix.pipe () in
			Unix.in_channel_of_descr i, Unix.out_channel_of_descr o

	let get c =
		fun () -> Marshal.from_channel c

	let put x c =
		fun () ->
      Marshal.to_channel c x [];
      flush c
	
	let try_get block prt_list =
		let fds = List.map fst prt_list in
		let fds = List.map Unix.descr_of_in_channel fds in
		let ok_fds, _, _ = Unix.select fds [] []
			(if block then -1.0 else 0.0) 
		in
		match ok_fds with
		| [] -> None
		| fd::x ->
			let chan, f =
				List.find
					(fun (s, _) -> Unix.descr_of_in_channel s = fd)
					prt_list
			in
				Some(f (Marshal.from_channel chan))
	
	let select prt_list =
		fun () ->
			match try_get true prt_list with
			| Some x -> x
			| None -> assert false

	let select_default prt_list def =
		fun () ->
			match try_get false prt_list with
			| Some x -> x
			| None -> def ()

	let return v =
		fun () -> v

	let bind e f =
		fun () -> f (e ()) ()
	let bind_io = bind

	let run p =
    p()

	let doco l =
		fun () ->
      let children =
        List.map
          (fun p ->
            match Unix.fork () with
            | 0 ->
              run p;
              exit 0
            | i -> i)
          l
      in
        List.iter
          (fun x -> try ignore(Unix.waitpid [] x) with _ -> ())
          children
end