summaryrefslogblamecommitdiff
path: root/src/kahn.ml
blob: a02ee24757c8ea2fd63f48f20de314559d4a8218 (plain) (tree)
1
2
3
4
5
6
7
8
                   


                        
 

                                    
 


                                                         
 
                                                   
 



                                                                



                           
























                                                                                                    




                     









































                                                                        


                      




























































                                                                                         


   

                       














































                                                                               
   
module type S = sig
	type 'a process
	type 'a in_port
	type 'a out_port

	val io_read: unit -> string
	val io_write: string -> unit

	val new_channel: unit -> 'a in_port * 'a out_port
	val put: 'a -> 'a out_port -> unit process
	val get: 'a in_port -> 'a process

	val doco: unit process list -> unit process

	val return: 'a -> 'a process
	val bind: 'a process -> ('a -> 'b process) -> 'b process

	val run: 'a process -> 'a
end

module Lib (K : S) = struct

	let ( >>= ) x f = K.bind x f

	let delay f x =
		K.bind (K.return ()) (fun () -> K.return (f x))

	let par_map f l =
		let rec build_workers l (ports, workers) =
			match l with
			| [] -> (ports, workers)
			| x :: l ->
					let qi, qo = K.new_channel () in
					build_workers
						l
						(qi :: ports,
						 ((delay f x) >>= (fun v -> K.put v qo)) :: workers)
		in
		let ports, workers = build_workers l ([], []) in
		let rec collect l acc qo =
			match l with
			| [] -> K.put acc qo
			| qi :: l -> (K.get qi) >>= (fun v -> collect l (v :: acc) qo)
		in
		let qi, qo = K.new_channel () in
		K.run
			((K.doco ((collect ports [] qo) :: workers)) >>= (fun _ -> K.get qi))

end


module Th: S = struct
	type 'a process = (unit -> 'a)

	type 'a channel = { q: 'a Queue.t ; m: Mutex.t; }
	type 'a in_port = 'a channel
	type 'a out_port = 'a channel

	let new_channel () =
		let q = { q = Queue.create (); m = Mutex.create (); } in
		q, q

	let io_read () = ""
	let io_write s = print_string s; flush stdout

	let put v c () =
		Mutex.lock c.m;
		Queue.push v c.q;
		Mutex.unlock c.m;
		Thread.yield ()

	let rec get c () =
		try
			Mutex.lock c.m;
			let v = Queue.pop c.q in
			Mutex.unlock c.m;
			v
		with Queue.Empty ->
			Mutex.unlock c.m;
			Thread.yield ();
			get c ()

	let doco l () =
		let ths = List.map (fun f -> Thread.create f ()) l in
		List.iter (fun th -> Thread.join th) ths

	let return v = (fun () -> v)

	let bind e e' () =
		let v = e () in
		Thread.yield ();
		e' v ()

	let run e = e ()
end

module Seq: S = struct
	type 'a process = (('a -> unit) option) -> unit

	type 'a channel = 'a Queue.t
	type 'a in_port = 'a channel
	type 'a out_port = 'a channel

	type task = unit -> unit

	let tasks = Queue.create ()

	let io_read () = ""
	let io_write s = print_string s; flush stdout

	let new_channel () =
		let q = Queue.create () in
		q, q

	let put x c =
		fun cont ->
			Queue.push x c;
			match cont with
			| None -> ()
			| Some cont ->	Queue.push cont tasks

	let rec get c =
		fun cont ->
			try
				let v = Queue.pop c in
				match cont with
				| None -> ()
				| Some cont -> Queue.push (fun () -> cont v) tasks
			with Queue.Empty ->
				Queue.push (fun () -> get c cont) tasks

	let doco l =
		fun cont ->
			List.iter (fun proc -> Queue.push (fun () -> proc None) tasks) l;
			match cont with
			| None -> ()
			| Some cont -> Queue.push cont tasks

	let return v =
		fun cont ->
			match cont with
			| None -> ()
			| Some cont -> Queue.push (fun () -> cont v) tasks

	let bind e f =
		fun cont ->
			Queue.push (fun () -> e (Some (fun r -> f r cont))) tasks
		
	let run e =
		let ret = ref None in
		e (Some (fun v -> ret := Some v));
		while not (Queue.is_empty tasks) do
			let task = Queue.pop tasks in
			task ()
		done;
		match !ret with
		| Some k -> k
		| None -> assert false

end


module Pipe: S = struct
	type 'a process = unit -> 'a

	type 'a in_port = in_channel
	type 'a out_port = out_channel

	let children = ref []

	let io_read () = ""
	let io_write s = print_string s; flush stdout

	let new_channel =
		fun () ->
			let i, o = Unix.pipe () in
			Unix.in_channel_of_descr i, Unix.out_channel_of_descr o

	let get c =
		fun () -> Marshal.from_channel c

	let put x c =
		fun () -> Marshal.to_channel c x []


	let return v =
		fun () -> v

	let bind e f =
		fun () -> f (e ()) ()

	let run p =
		let v = p() in
		List.iter
			(fun x -> try ignore(Unix.waitpid [] x) with _ -> ())
			!children;
		v

	let doco l =
		fun () ->
			List.iter (fun p ->
				let i = Unix.fork () in
				if i = 0 then begin
					children := [];
					run p;
					exit 0
				end else begin
					children := i::!children
				end)
			l
end