From c6313368d3719e507eba4216a4fc4ea9f30db576 Mon Sep 17 00:00:00 2001 From: Alex AUVOLAT Date: Fri, 21 Mar 2014 13:57:16 +0100 Subject: Retab files ; start work on networked implementation. --- src/kahn.ml | 364 +++++++++++++++++++++++++++++++----------------------------- 1 file changed, 188 insertions(+), 176 deletions(-) (limited to 'src/kahn.ml') diff --git a/src/kahn.ml b/src/kahn.ml index 5229f7e..a02ee24 100644 --- a/src/kahn.ml +++ b/src/kahn.ml @@ -1,199 +1,211 @@ module type S = sig - type 'a process - type 'a in_port - type 'a out_port + type 'a process + type 'a in_port + type 'a out_port - val new_channel: unit -> 'a in_port * 'a out_port - val put: 'a -> 'a out_port -> unit process - val get: 'a in_port -> 'a process + val io_read: unit -> string + val io_write: string -> unit - val doco: unit process list -> unit process + val new_channel: unit -> 'a in_port * 'a out_port + val put: 'a -> 'a out_port -> unit process + val get: 'a in_port -> 'a process - val return: 'a -> 'a process - val bind: 'a process -> ('a -> 'b process) -> 'b process + val doco: unit process list -> unit process - val run: 'a process -> 'a + val return: 'a -> 'a process + val bind: 'a process -> ('a -> 'b process) -> 'b process + + val run: 'a process -> 'a end module Lib (K : S) = struct - let ( >>= ) x f = K.bind x f - - let delay f x = - K.bind (K.return ()) (fun () -> K.return (f x)) - - let par_map f l = - let rec build_workers l (ports, workers) = - match l with - | [] -> (ports, workers) - | x :: l -> - let qi, qo = K.new_channel () in - build_workers - l - (qi :: ports, - ((delay f x) >>= (fun v -> K.put v qo)) :: workers) - in - let ports, workers = build_workers l ([], []) in - let rec collect l acc qo = - match l with - | [] -> K.put acc qo - | qi :: l -> (K.get qi) >>= (fun v -> collect l (v :: acc) qo) - in - let qi, qo = K.new_channel () in - K.run - ((K.doco ((collect ports [] qo) :: workers)) >>= (fun _ -> K.get qi)) + let ( >>= ) x f = K.bind x f + + let delay f x = + K.bind (K.return ()) (fun () -> K.return (f x)) + + let par_map f l = + let rec build_workers l (ports, workers) = + match l with + | [] -> (ports, workers) + | x :: l -> + let qi, qo = K.new_channel () in + build_workers + l + (qi :: ports, + ((delay f x) >>= (fun v -> K.put v qo)) :: workers) + in + let ports, workers = build_workers l ([], []) in + let rec collect l acc qo = + match l with + | [] -> K.put acc qo + | qi :: l -> (K.get qi) >>= (fun v -> collect l (v :: acc) qo) + in + let qi, qo = K.new_channel () in + K.run + ((K.doco ((collect ports [] qo) :: workers)) >>= (fun _ -> K.get qi)) end module Th: S = struct - type 'a process = (unit -> 'a) - - type 'a channel = { q: 'a Queue.t ; m: Mutex.t; } - type 'a in_port = 'a channel - type 'a out_port = 'a channel - - let new_channel () = - let q = { q = Queue.create (); m = Mutex.create (); } in - q, q - - let put v c () = - Mutex.lock c.m; - Queue.push v c.q; - Mutex.unlock c.m; - Thread.yield () - - let rec get c () = - try - Mutex.lock c.m; - let v = Queue.pop c.q in - Mutex.unlock c.m; - v - with Queue.Empty -> - Mutex.unlock c.m; - Thread.yield (); - get c () - - let doco l () = - let ths = List.map (fun f -> Thread.create f ()) l in - List.iter (fun th -> Thread.join th) ths - - let return v = (fun () -> v) - - let bind e e' () = - let v = e () in - Thread.yield (); - e' v () - - let run e = e () + type 'a process = (unit -> 'a) + + type 'a channel = { q: 'a Queue.t ; m: Mutex.t; } + type 'a in_port = 'a channel + type 'a out_port = 'a channel + + let new_channel () = + let q = { q = Queue.create (); m = Mutex.create (); } in + q, q + + let io_read () = "" + let io_write s = print_string s; flush stdout + + let put v c () = + Mutex.lock c.m; + Queue.push v c.q; + Mutex.unlock c.m; + Thread.yield () + + let rec get c () = + try + Mutex.lock c.m; + let v = Queue.pop c.q in + Mutex.unlock c.m; + v + with Queue.Empty -> + Mutex.unlock c.m; + Thread.yield (); + get c () + + let doco l () = + let ths = List.map (fun f -> Thread.create f ()) l in + List.iter (fun th -> Thread.join th) ths + + let return v = (fun () -> v) + + let bind e e' () = + let v = e () in + Thread.yield (); + e' v () + + let run e = e () end module Seq: S = struct - type 'a process = (('a -> unit) option) -> unit - - type 'a channel = 'a Queue.t - type 'a in_port = 'a channel - type 'a out_port = 'a channel - - type task = unit -> unit - - let tasks = Queue.create () - - let new_channel () = - let q = Queue.create () in - q, q - - let put x c = - fun cont -> - Queue.push x c; - match cont with - | None -> () - | Some cont -> Queue.push cont tasks - - let rec get c = - fun cont -> - try - let v = Queue.pop c in - match cont with - | None -> () - | Some cont -> Queue.push (fun () -> cont v) tasks - with Queue.Empty -> - Queue.push (fun () -> get c cont) tasks - - let doco l = - fun cont -> - List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l; - match cont with - | None -> () - | Some cont -> Queue.push cont tasks - - let return v = - fun cont -> - match cont with - | None -> () - | Some cont -> Queue.push (fun () -> cont v) tasks - - let bind e f = - fun cont -> - Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks - - let run e = - let ret = ref None in - e (Some (fun v -> ret := Some v)); - while not (Queue.is_empty tasks) do - let task = Queue.pop tasks in - task () - done; - match !ret with - | Some k -> k - | None -> assert false + type 'a process = (('a -> unit) option) -> unit + + type 'a channel = 'a Queue.t + type 'a in_port = 'a channel + type 'a out_port = 'a channel + + type task = unit -> unit + + let tasks = Queue.create () + + let io_read () = "" + let io_write s = print_string s; flush stdout + + let new_channel () = + let q = Queue.create () in + q, q + + let put x c = + fun cont -> + Queue.push x c; + match cont with + | None -> () + | Some cont -> Queue.push cont tasks + + let rec get c = + fun cont -> + try + let v = Queue.pop c in + match cont with + | None -> () + | Some cont -> Queue.push (fun () -> cont v) tasks + with Queue.Empty -> + Queue.push (fun () -> get c cont) tasks + + let doco l = + fun cont -> + List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l; + match cont with + | None -> () + | Some cont -> Queue.push cont tasks + + let return v = + fun cont -> + match cont with + | None -> () + | Some cont -> Queue.push (fun () -> cont v) tasks + + let bind e f = + fun cont -> + Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks + + let run e = + let ret = ref None in + e (Some (fun v -> ret := Some v)); + while not (Queue.is_empty tasks) do + let task = Queue.pop tasks in + task () + done; + match !ret with + | Some k -> k + | None -> assert false end module Pipe: S = struct - type 'a process = unit -> 'a - - type 'a in_port = in_channel - type 'a out_port = out_channel - - let children = ref [] - - let new_channel = - fun () -> - let i, o = Unix.pipe () in - Unix.in_channel_of_descr i, Unix.out_channel_of_descr o - - let get c = - fun () -> Marshal.from_channel c - - let put x c = - fun () -> Marshal.to_channel c x [] - - - let return v = - fun () -> v - - let bind e f = - fun () -> f (e ()) () - - let run p = - let v = p() in - List.iter - (fun x -> try ignore(Unix.waitpid [] x) with _ -> ()) - !children; - v - - let doco l = - fun () -> - List.iter (fun p -> - let i = Unix.fork () in - if i = 0 then begin - children := []; - run p; - exit 0 - end else begin - children := i::!children - end) - l + type 'a process = unit -> 'a + + type 'a in_port = in_channel + type 'a out_port = out_channel + + let children = ref [] + + let io_read () = "" + let io_write s = print_string s; flush stdout + + let new_channel = + fun () -> + let i, o = Unix.pipe () in + Unix.in_channel_of_descr i, Unix.out_channel_of_descr o + + let get c = + fun () -> Marshal.from_channel c + + let put x c = + fun () -> Marshal.to_channel c x [] + + + let return v = + fun () -> v + + let bind e f = + fun () -> f (e ()) () + + let run p = + let v = p() in + List.iter + (fun x -> try ignore(Unix.waitpid [] x) with _ -> ()) + !children; + v + + let doco l = + fun () -> + List.iter (fun p -> + let i = Unix.fork () in + if i = 0 then begin + children := []; + run p; + exit 0 + end else begin + children := i::!children + end) + l end -- cgit v1.2.3