diff options
-rw-r--r-- | src/kahn_seq.ml | 19 | ||||
-rw-r--r-- | src/kahn_stdio.ml | 50 | ||||
-rw-r--r-- | src/manager.ml | 10 | ||||
-rw-r--r-- | src/primes.ml | 2 | ||||
-rw-r--r-- | src/proto.ml | 3 |
5 files changed, 32 insertions, 52 deletions
diff --git a/src/kahn_seq.ml b/src/kahn_seq.ml index 177d6dd..8aff905 100644 --- a/src/kahn_seq.ml +++ b/src/kahn_seq.ml @@ -1,7 +1,7 @@ open Kahn module Seq: S = struct - type 'a process = (('a -> unit) option) -> unit + type 'a process = ('a -> unit) -> unit type 'a channel = 'a Queue.t type 'a in_port = 'a channel @@ -11,10 +11,8 @@ module Seq: S = struct let tasks = Queue.create () - let push_cont (cont : ('a -> unit) option) (v : 'a) = - match cont with - | None -> () - | Some cont_f -> Queue.push (fun () -> cont_f v) tasks + let push_cont (cont : 'a -> unit) (v : 'a) = + Queue.push (fun () -> cont v) tasks let new_channel () = let q = Queue.create () in @@ -55,20 +53,19 @@ module Seq: S = struct let doco l = fun cont -> - List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l; - push_cont cont () + List.iter (fun proc -> Queue.push (fun () -> proc (fun () -> ())) tasks) l; + cont () let return v = - fun cont -> - push_cont cont v + fun cont -> cont v let bind (e : 'a process) (f : 'a -> 'b process) : 'b process = fun cont -> - e (Some (fun (r : 'a) -> f r cont)) + e (fun (r : 'a) -> f r cont) let run e = let ret = ref None in - e (Some (fun v -> ret := Some v)); + e (fun v -> ret := Some v); while not (Queue.is_empty tasks) do let task = Queue.pop tasks in task () diff --git a/src/kahn_stdio.ml b/src/kahn_stdio.ml index 1bed0a1..4c8c976 100644 --- a/src/kahn_stdio.ml +++ b/src/kahn_stdio.ml @@ -7,7 +7,7 @@ open Proto module ProtoKahn: S = struct - type 'a process = (('a -> unit) option) -> unit + type 'a process = ('a -> unit) -> unit type 'a channel = id type 'a in_port = 'a channel @@ -18,37 +18,25 @@ module ProtoKahn: S = struct let task_desc t = Marshal.to_string t [Marshal.Closures] - let send_task t is_io = - send (GiveTask(task_desc t, is_io)) + let send_task t = + send (GiveTask(task_desc t)) let new_channel () = let x = new_id() in x, x - let push_cont cont arg is_io = - match cont with - | None -> () - | Some cont -> - send_task (fun () -> cont arg) is_io - let put v prt = fun cont -> send (Put(prt, Marshal.to_string v [])); - push_cont cont () false + cont () let get prt = fun cont -> - send (Get(prt, - task_desc - (fun s -> match cont with - | None -> () - | Some cont -> cont (Marshal.from_string s 0)) - ) - ) + send (Get(prt, task_desc (fun s -> cont (Marshal.from_string s 0)))) let output s = send (Output s) - let select pl = fun cont -> assert false - let select_default = fun cont -> assert false + let select pl = assert false (* Not Implemented *) + let select_default pl = assert false (* Not Implemented *) let doco plist = fun cont -> @@ -56,26 +44,20 @@ module ProtoKahn: S = struct List.iter (fun p -> send_task - (fun () -> p - (Some (fun () -> send (Put(f_ch_id, "")))) - ) - false - ) plist; + (fun () -> p (fun () -> send (Put(f_ch_id, ""))))) + plist; let rec push_x = function - | 0 -> push_cont cont () false + | 0 -> cont () | n -> send (Get(f_ch_id, task_desc (fun s -> push_x (n-1)))) in push_x (List.length plist) let return v = - fun cont -> - match cont with - | None -> () - | Some cont -> cont v + fun cont -> cont v let bind a f = fun cont -> - a (Some (fun va -> - let b = (f va) in - b cont)) + a (fun va -> f va cont) + + (* Main function *) let origin = ref false let dbg_out = ref false @@ -106,7 +88,7 @@ module ProtoKahn: S = struct send Hello; if read () <> Hello then raise (ProtocolError "Server did not say Hello correctly."); (* Start task if necessary *) - if !origin then proc (Some (fun r -> send (FinalResult (Marshal.to_string r [])))); + if !origin then proc (fun r -> send (FinalResult (Marshal.to_string r []))); (* While there are things to do... *) let result = ref None in while !result = None do @@ -114,7 +96,7 @@ module ProtoKahn: S = struct send RequestTask; dbg "Reading..."; match read() with - | GiveTask(td, _) -> + | GiveTask(td) -> dbg "Got task!"; let t : task = Marshal.from_string td 0 in Format.eprintf "%s[%s@?" cseq ncseq; diff --git a/src/manager.ml b/src/manager.ml index 1e6a0df..9498d8c 100644 --- a/src/manager.ml +++ b/src/manager.ml @@ -21,7 +21,7 @@ let pool_count = ref 0 (* Server data structures *) type task_el = - | Task of task_descr * bool + | Task of task_descr | MsgTask of string * msg_task_descr type client_status = @@ -82,7 +82,7 @@ let push_task server task = c.send (match task with | MsgTask(a, b) -> GiveMsgTask(a, b) - | Task(a, b) -> GiveTask(a, b)) + | Task(a) -> GiveTask(a)) let get_task server = Queue.pop server.tasks @@ -172,7 +172,7 @@ let rec server_run server = cli.status <- Waiting else cli.send (match Queue.pop server.tasks with | MsgTask(a, b) -> GiveMsgTask(a, b) - | Task(a, b) -> GiveTask(a,b)) + | Task(a) -> GiveTask(a)) | Some r -> cli.send(FinalResult r); client_disconnect server cli @@ -197,9 +197,9 @@ let rec server_run server = c.send(FinalResult x); client_disconnect server c) !p - | GiveTask(a, b) -> + | GiveTask(a) -> dbg "got Task"; - push_task server (Task(a, b)) + push_task server (Task(a)) | GiveMsgTask(a, b) -> dbg "got MsgTask"; push_task server (MsgTask(a, b)) diff --git a/src/primes.ml b/src/primes.ml index a91b11c..6975b9e 100644 --- a/src/primes.ml +++ b/src/primes.ml @@ -41,7 +41,7 @@ module Primes (K : Kahn.S) = struct end -module Eng = Kahn_stdio.ProtoKahn +module Eng = Kahn_seq.Seq module P = Primes(Eng) let () = diff --git a/src/proto.ml b/src/proto.ml index 1f6d8e6..df41944 100644 --- a/src/proto.ml +++ b/src/proto.ml @@ -4,6 +4,7 @@ open Util type task = unit -> unit type msg_task = string -> unit + type task_descr = string type msg_task_descr = string @@ -15,7 +16,7 @@ type message = | Get of id * msg_task_descr | Put of id * string | RequestTask - | GiveTask of task_descr * bool + | GiveTask of task_descr | GiveMsgTask of string * msg_task_descr | FinalResult of string |