(* path: root/src/kahn_stdio.ml
   blob: 4c8c9764f2241635520586da868ab6d7438a7f24 *)
open Unix

open Util
open Kahn
open Proto


(* Kahn-network implementation in which this program acts as a client of a
   server it talks to over its standard streams.  Processes are written in
   continuation-passing style; channel operations and sub-tasks are turned
   into protocol messages, and continuations travel as marshalled closures. *)
module ProtoKahn: S = struct

	(* A process takes the continuation to invoke with its result. *)
	type 'a process = ('a -> unit) -> unit

	(* A channel is just an identifier; both ports are that same identifier. *)
	type 'a channel = id
	type 'a in_port = 'a channel
	type 'a out_port = 'a channel

	(* Marshal [m] to standard output and flush right away.
	   [Pervasives.stdout] is fully qualified, presumably because the
	   top-of-file [open Unix] rebinds the bare name [stdout]. *)
	let send m = Marshal.to_channel Pervasives.stdout m []; flush Pervasives.stdout

	(* Read the next protocol message from [stdin] via [read_one_msg]. *)
	let read () : message = read_one_msg stdin

	(* Serialize a closure.  Per the Marshal documentation, a value written
	   with [Marshal.Closures] can only be read back by the very same
	   program binary. *)
	let task_desc t = Marshal.to_string t [Marshal.Closures]

	(* Send the task [t] (a closure) wrapped in a [GiveTask] message. *)
	let send_task t =
		send (GiveTask(task_desc t))

	(* Allocate a fresh id and return it as both ends of the channel. *)
	let new_channel () =
		let x = new_id() in x, x

	(* [put v prt]: send the marshalled value [v] on port [prt], then
	   continue immediately. *)
	let put v prt =
		fun cont ->
			send (Put(prt, Marshal.to_string v []));
			cont ()

	(* [get prt]: send a [Get] request for [prt] carrying a closure that
	   unmarshals a string and feeds it to the continuation.  The
	   continuation is NOT called here; presumably the server hands the
	   closure back later as a [GiveMsgTask] -- confirm against the server. *)
	let get prt =
		fun cont ->
			send (Get(prt, task_desc (fun s -> cont (Marshal.from_string s 0))))

	(* Send the string [s] in an [Output] message. *)
	let output s = send (Output s)

	let select _pl = assert false (* Not Implemented *)
	let select_default _pl = assert false (* Not Implemented *)

	(* [doco plist]: send every process of [plist] as a separate task, each
	   one signalling its completion by a [Put] of the empty string on a
	   private id.  Then issue [Get]s on that id, one at a time, each
	   carrying the closure that issues the next; after [List.length plist]
	   of them the continuation runs.  An empty list continues at once. *)
	let doco plist =
		fun cont ->
			let f_ch_id = new_id () in
			List.iter
				(fun p ->
					send_task
						(fun () -> p (fun () -> send (Put(f_ch_id, "")))))
				plist;
			let rec push_x = function
			| 0 -> cont ()
			| n -> send (Get(f_ch_id, task_desc (fun _ -> push_x (n-1))))
			in push_x (List.length plist)

	(* Monadic unit: pass [v] straight to the continuation. *)
	let return v =
		fun cont -> cont v

	(* Monadic bind: run [a], then run [f] on its result with the same
	   continuation. *)
	let bind a f =
		fun cont ->
			a (fun va -> f va cont)

	(* Main function *)

	(* Command-line state, filled in by [parse_args]. *)
	let origin = ref false
	let dbg_out = ref false
	let color = ref 0

	(* Print a debug line on stderr when [-dbg] was given. *)
	let dbg x = if !dbg_out then Format.eprintf "(cli) %s@." x

	(* Parse the command line.  Positional (anonymous) arguments are not
	   accepted: they are rejected with [Arg.Bad], which [Arg.parse] reports
	   together with the usage string, instead of aborting on an
	   assertion failure. *)
	let parse_args () =
		let usage = "Usage: ./program [options]" in
		let options = [
			"-org", Arg.Set origin, "Launch root process";
			"-dbg", Arg.Set dbg_out, "Show debug output";
			"-col", Arg.Set_int color, "Color for output";
		] in
		let reject_anonymous arg =
			raise (Arg.Bad ("unexpected argument: " ^ arg)) in
		Arg.parse options reject_anonymous usage

	(* [run proc]: perform the Hello handshake, start [proc] if this
	   instance was launched with [-org], then keep requesting and running
	   tasks until a [FinalResult] message arrives; its unmarshalled payload
	   is returned.
	   Raises [ProtocolError] on a bad handshake or an unexpected message. *)
	let run proc =
		parse_args();

		Random.self_init();
		(* Escape code used for the task markers on stderr: a random code in
		   31..36 when [-col] is 0 (the default), otherwise 30 + the value. *)
		let code =
			if !color = 0
			then Random.int 6 + 31
			else 30 + !color in
		let cseq = Format.sprintf "\x1B[%dm" code in
		let ncseq = "\x1B[0m" in

		(* Initialize protocol *)
		send Hello;
		(match read () with
		| Hello -> ()
		| _ -> raise (ProtocolError "Server did not say Hello correctly."));
		(* Start task if necessary *)
		if !origin then proc (fun r -> send (FinalResult (Marshal.to_string r [])));
		(* While there are things to do: ask for a task, run it, ask again.
		   The loop is tail-recursive and ends by returning the result. *)
		let rec loop () =
			dbg "Requesting task...";
			send RequestTask;
			dbg "Reading...";
			match read() with
			| GiveTask(td) ->
				dbg "Got task!";
				let t : task = Marshal.from_string td 0 in
				(* Colored brackets on stderr delimit the task's execution. *)
				Format.eprintf "%s[%s@?" cseq ncseq;
				t();
				Format.eprintf "%s]%s@?" cseq ncseq;
				loop ()
			| GiveMsgTask(msg, td) ->
				dbg "Got msg task!";
				let t : msg_task = Marshal.from_string td 0 in
				Format.eprintf "%s{%s@?" cseq ncseq;
				t msg;
				Format.eprintf "%s}%s@?" cseq ncseq;
				loop ()
			| FinalResult(x) ->
				dbg "Got result!";
				(* Return result *)
				Marshal.from_string x 0
			| _ -> raise (ProtocolError "Invalid message in main loop.")
		in
		loop ()

end