summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/kahn_seq.ml19
-rw-r--r--src/kahn_stdio.ml50
-rw-r--r--src/manager.ml10
-rw-r--r--src/primes.ml2
-rw-r--r--src/proto.ml3
5 files changed, 32 insertions, 52 deletions
diff --git a/src/kahn_seq.ml b/src/kahn_seq.ml
index 177d6dd..8aff905 100644
--- a/src/kahn_seq.ml
+++ b/src/kahn_seq.ml
@@ -1,7 +1,7 @@
open Kahn
module Seq: S = struct
- type 'a process = (('a -> unit) option) -> unit
+ type 'a process = ('a -> unit) -> unit
type 'a channel = 'a Queue.t
type 'a in_port = 'a channel
@@ -11,10 +11,8 @@ module Seq: S = struct
let tasks = Queue.create ()
- let push_cont (cont : ('a -> unit) option) (v : 'a) =
- match cont with
- | None -> ()
- | Some cont_f -> Queue.push (fun () -> cont_f v) tasks
+ let push_cont (cont : 'a -> unit) (v : 'a) =
+ Queue.push (fun () -> cont v) tasks
let new_channel () =
let q = Queue.create () in
@@ -55,20 +53,19 @@ module Seq: S = struct
let doco l =
fun cont ->
- List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l;
- push_cont cont ()
+ List.iter (fun proc -> Queue.push (fun () -> proc (fun () -> ())) tasks) l;
+ cont ()
let return v =
- fun cont ->
- push_cont cont v
+ fun cont -> cont v
let bind (e : 'a process) (f : 'a -> 'b process) : 'b process =
fun cont ->
- e (Some (fun (r : 'a) -> f r cont))
+ e (fun (r : 'a) -> f r cont)
let run e =
let ret = ref None in
- e (Some (fun v -> ret := Some v));
+ e (fun v -> ret := Some v);
while not (Queue.is_empty tasks) do
let task = Queue.pop tasks in
task ()
diff --git a/src/kahn_stdio.ml b/src/kahn_stdio.ml
index 1bed0a1..4c8c976 100644
--- a/src/kahn_stdio.ml
+++ b/src/kahn_stdio.ml
@@ -7,7 +7,7 @@ open Proto
module ProtoKahn: S = struct
- type 'a process = (('a -> unit) option) -> unit
+ type 'a process = ('a -> unit) -> unit
type 'a channel = id
type 'a in_port = 'a channel
@@ -18,37 +18,25 @@ module ProtoKahn: S = struct
let task_desc t = Marshal.to_string t [Marshal.Closures]
- let send_task t is_io =
- send (GiveTask(task_desc t, is_io))
+ let send_task t =
+ send (GiveTask(task_desc t))
let new_channel () =
let x = new_id() in x, x
- let push_cont cont arg is_io =
- match cont with
- | None -> ()
- | Some cont ->
- send_task (fun () -> cont arg) is_io
-
let put v prt =
fun cont ->
send (Put(prt, Marshal.to_string v []));
- push_cont cont () false
+ cont ()
let get prt =
fun cont ->
- send (Get(prt,
- task_desc
- (fun s -> match cont with
- | None -> ()
- | Some cont -> cont (Marshal.from_string s 0))
- )
- )
+ send (Get(prt, task_desc (fun s -> cont (Marshal.from_string s 0))))
let output s = send (Output s)
- let select pl = fun cont -> assert false
- let select_default = fun cont -> assert false
+ let select pl = assert false (* Not Implemented *)
+ let select_default pl = assert false (* Not Implemented *)
let doco plist =
fun cont ->
@@ -56,26 +44,20 @@ module ProtoKahn: S = struct
List.iter
(fun p ->
send_task
- (fun () -> p
- (Some (fun () -> send (Put(f_ch_id, ""))))
- )
- false
- ) plist;
+ (fun () -> p (fun () -> send (Put(f_ch_id, "")))))
+ plist;
let rec push_x = function
- | 0 -> push_cont cont () false
+ | 0 -> cont ()
| n -> send (Get(f_ch_id, task_desc (fun s -> push_x (n-1))))
in push_x (List.length plist)
let return v =
- fun cont ->
- match cont with
- | None -> ()
- | Some cont -> cont v
+ fun cont -> cont v
let bind a f =
fun cont ->
- a (Some (fun va ->
- let b = (f va) in
- b cont))
+ a (fun va -> f va cont)
+
+ (* Main function *)
let origin = ref false
let dbg_out = ref false
@@ -106,7 +88,7 @@ module ProtoKahn: S = struct
send Hello;
if read () <> Hello then raise (ProtocolError "Server did not say Hello correctly.");
(* Start task if necessary *)
- if !origin then proc (Some (fun r -> send (FinalResult (Marshal.to_string r []))));
+ if !origin then proc (fun r -> send (FinalResult (Marshal.to_string r [])));
(* While there are things to do... *)
let result = ref None in
while !result = None do
@@ -114,7 +96,7 @@ module ProtoKahn: S = struct
send RequestTask;
dbg "Reading...";
match read() with
- | GiveTask(td, _) ->
+ | GiveTask(td) ->
dbg "Got task!";
let t : task = Marshal.from_string td 0 in
Format.eprintf "%s[%s@?" cseq ncseq;
diff --git a/src/manager.ml b/src/manager.ml
index 1e6a0df..9498d8c 100644
--- a/src/manager.ml
+++ b/src/manager.ml
@@ -21,7 +21,7 @@ let pool_count = ref 0
(* Server data structures *)
type task_el =
- | Task of task_descr * bool
+ | Task of task_descr
| MsgTask of string * msg_task_descr
type client_status =
@@ -82,7 +82,7 @@ let push_task server task =
c.send
(match task with
| MsgTask(a, b) -> GiveMsgTask(a, b)
- | Task(a, b) -> GiveTask(a, b))
+ | Task(a) -> GiveTask(a))
let get_task server =
Queue.pop server.tasks
@@ -172,7 +172,7 @@ let rec server_run server =
cli.status <- Waiting
else cli.send (match Queue.pop server.tasks with
| MsgTask(a, b) -> GiveMsgTask(a, b)
- | Task(a, b) -> GiveTask(a,b))
+ | Task(a) -> GiveTask(a))
| Some r ->
cli.send(FinalResult r);
client_disconnect server cli
@@ -197,9 +197,9 @@ let rec server_run server =
c.send(FinalResult x);
client_disconnect server c)
!p
- | GiveTask(a, b) ->
+ | GiveTask(a) ->
dbg "got Task";
- push_task server (Task(a, b))
+ push_task server (Task(a))
| GiveMsgTask(a, b) ->
dbg "got MsgTask";
push_task server (MsgTask(a, b))
diff --git a/src/primes.ml b/src/primes.ml
index a91b11c..6975b9e 100644
--- a/src/primes.ml
+++ b/src/primes.ml
@@ -41,7 +41,7 @@ module Primes (K : Kahn.S) = struct
end
-module Eng = Kahn_stdio.ProtoKahn
+module Eng = Kahn_seq.Seq
module P = Primes(Eng)
let () =
diff --git a/src/proto.ml b/src/proto.ml
index 1f6d8e6..df41944 100644
--- a/src/proto.ml
+++ b/src/proto.ml
@@ -4,6 +4,7 @@ open Util
type task = unit -> unit
type msg_task = string -> unit
+
type task_descr = string
type msg_task_descr = string
@@ -15,7 +16,7 @@ type message =
| Get of id * msg_task_descr
| Put of id * string
| RequestTask
- | GiveTask of task_descr * bool
+ | GiveTask of task_descr
| GiveMsgTask of string * msg_task_descr
| FinalResult of string