summaryrefslogtreecommitdiff
path: root/src/kahn_stdio.ml
diff options
context:
space:
mode:
authorAlex AUVOLAT <alex.auvolat@ens.fr>2014-05-24 23:25:07 +0200
committerAlex AUVOLAT <alex.auvolat@ens.fr>2014-05-24 23:25:07 +0200
commit6e750a757ef6fb1f41cf4c2fe39edba834b76858 (patch)
treef339630beb8a9a1a6f3544b40547ce9c83f23a91 /src/kahn_stdio.ml
parentc5e69a904e79e807c5b075c08ce82183133e7b4c (diff)
downloadSystemeReseaux-Projet-6e750a757ef6fb1f41cf4c2fe39edba834b76858.tar.gz
SystemeReseaux-Projet-6e750a757ef6fb1f41cf4c2fe39edba834b76858.zip
./manager -local-proc 4 ./example.native does what expected.
Diffstat (limited to 'src/kahn_stdio.ml')
-rw-r--r--src/kahn_stdio.ml131
1 files changed, 131 insertions, 0 deletions
diff --git a/src/kahn_stdio.ml b/src/kahn_stdio.ml
new file mode 100644
index 0000000..a149742
--- /dev/null
+++ b/src/kahn_stdio.ml
@@ -0,0 +1,131 @@
+open Unix
+
+open Util
+open Kahn
+open Proto
+
+
+module ProtoKahn: S = struct
+
+ type 'a process = (('a -> unit) option) -> unit
+
+ type 'a channel = id
+ type 'a in_port = 'a channel
+ type 'a out_port = 'a channel
+
+ let send m = Marshal.to_channel Pervasives.stdout m []; flush Pervasives.stdout
+ let read () : message = read_one_msg stdin
+
+ let task_desc t = Marshal.to_string t [Marshal.Closures]
+
+ let send_task t is_io =
+ send (GiveTask(task_desc t, is_io))
+
+ let new_channel () =
+ let x = new_id() in x, x
+
+ let push_cont cont arg is_io =
+ match cont with
+ | None -> ()
+ | Some cont ->
+ send_task (fun () -> cont arg) is_io
+
+ let put v prt =
+ fun cont ->
+ send (Put(prt, Marshal.to_string v []));
+ push_cont cont () false
+
+ let get prt =
+ fun cont ->
+ send (Get(prt,
+ task_desc
+ (fun s -> match cont with
+ | None -> ()
+ | Some cont -> cont (Marshal.from_string s 0))
+ )
+ )
+
+ let select pl = fun cont -> assert false
+ let select_default = fun cont -> assert false
+
+ let doco plist =
+ fun cont ->
+ let f_ch_id = new_id () in
+ List.iter
+ (fun p ->
+ send_task
+ (fun () -> p
+ (Some (fun () -> send (Put(f_ch_id, ""))))
+ )
+ false
+ ) plist;
+ let rec push_x = function
+ | 0 -> push_cont cont () false
+ | n -> send (Get(f_ch_id, task_desc (fun s -> push_x (n-1))))
+ in push_x (List.length plist)
+
+ let return v =
+ fun cont ->
+ match cont with
+ | None -> ()
+ | Some cont -> cont v
+ let bind a f =
+ fun cont ->
+ a (Some (fun va ->
+ let b = (f va) in
+ b cont))
+ let bind_io a f =
+ fun cont ->
+ a (Some (fun va ->
+ send_task
+ (fun () ->
+ let b = f va in
+ send_task (fun () -> b cont) false)
+ true))
+
+ let origin = ref false
+ let dbg_out = ref false
+ let dbg x = if !dbg_out then Format.eprintf "(cli) %s@." x
+
+ let parse_args () =
+ let usage = "Usage: ./program [options]" in
+ let options = [
+ "-org", Arg.Set origin, "Launch root process";
+ "-dbg", Arg.Set dbg_out, "Show debug output";
+ ] in
+ Arg.parse options (fun _ -> assert false) usage
+
+ let run proc =
+ Random.self_init();
+ parse_args();
+ (* Initialize protocol *)
+ send Hello;
+ if read () <> Hello then raise (ProtocolError "Server did not say Hello correctly.");
+ (* Start task if necessary *)
+ if !origin then proc (Some (fun r -> send (FinalResult (Marshal.to_string r []))));
+ (* While there are things to do... *)
+ let result = ref None in
+ while !result = None do
+ dbg "Requesting task...";
+ send RequestTask;
+ dbg "Reading...";
+ match read() with
+ | GiveTask(td, _) ->
+ dbg "Got task!";
+ let t : task = Marshal.from_string td 0 in
+ t();
+ | GiveMsgTask(msg, td) ->
+ dbg "Got msg task!";
+ let t : msg_task = Marshal.from_string td 0 in
+ t msg;
+ | FinalResult(x) ->
+ dbg "Got result!";
+ result := Some (Marshal.from_string x 0)
+ | _ -> raise (ProtocolError "Invalid message in main loop.")
+ done;
+ (* Return result *)
+ match !result with
+ | None -> assert false
+ | Some r -> r
+
+end