summaryrefslogtreecommitdiff
path: root/src/poolclient.ml
blob: 00b0d355602a3d159ce0c28393bef67ffeee183d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
open Unix
open Util
open Proto

(* TCP port used to reach the pooling server; can be overridden with the
   -port command-line option. *)
let pool_port = ref 9082
(* Address of the pooling server as given by the anonymous command-line
   argument (passed to make_addr); stays empty until the argument is parsed. *)
let pool_server = ref ""
(* Number of processes announced to the server in the initial PoolProvide
   message; can be overridden with the -provide command-line option. *)
let provide = ref 4

(* [fullfill_request task (addr, port) n] forks [n] child processes. Each
   child opens a TCP connection to [addr]:[port], installs that connection
   as its standard input and standard output, and then replaces itself with
   the executable [task] (run with no extra arguments).

   The parent returns once all forks have been issued; it does not wait for
   the children. An exception raised by [fork] itself propagates to the
   caller. A child that fails to connect or to exec reports the error on
   stderr and terminates with status 127 instead of unwinding into the
   caller's code. *)
let fullfill_request task (addr, port) n =
	for _i = 1 to n do
		Format.eprintf "Spawn %s@." task;
		if fork () = 0 then begin
			(* Child process: it must never return into the caller, which is
			   a copy of the pool client's own control flow. *)
			try
				let sock = socket PF_INET SOCK_STREAM 0 in
				connect sock (make_addr addr port);
				dup2 sock stdin;
				dup2 sock stdout;
				(* The duplicates are all the task needs; do not leak the
				   original descriptor across exec. *)
				if sock <> stdin && sock <> stdout then close sock;
				execv task [|task|]
			with e ->
				Format.eprintf "Cannot spawn %s: %s@." task (Printexc.to_string e);
				exit 127
		end
	done

(* [run_client ()] connects to the pooling server at
   [!pool_server]:[!pool_port], exchanges PoolHello messages, announces
   [!provide] available processes, and then serves requests until the
   socket reports an exceptional condition.

   In the main loop:
   - a PoolRequest (task, addr, n) received from the server is handed to
     [fullfill_request];
   - any other message raises ProtocolError;
   - every child process that has terminated is reaped, and one
     PoolProvide 1 is sent to the server for each of them.

   Raises ProtocolError when the server does not answer PoolHello with
   PoolHello or sends an unexpected message. Unix and I/O errors on the
   server connection propagate to the caller. *)
let run_client () =
	let sock = socket PF_INET SOCK_STREAM 0 in
	(* Spawned tasks must not inherit the connection to the pool server. *)
	set_close_on_exec sock;
	connect sock (make_addr !pool_server !pool_port);
	Format.eprintf "Connected.@.";

	let outc = out_channel_of_descr sock in
	let send m = Marshal.to_channel outc m []; flush outc in

	send PoolHello;
	if read_one_msg sock <> PoolHello then
		raise (ProtocolError "Expected PoolHello reply.");

	Format.eprintf "Provide %d@." !provide;
	send (PoolProvide !provide);

	(* Reap all children that have already exited, giving one slot back to
	   the server per reaped child. ECHILD only means that there is no child
	   at all; any other error is a real failure and is left to propagate. *)
	let rec reap_children () =
		let reaped =
			try fst (waitpid [WNOHANG] (-1)) > 0
			with Unix_error (ECHILD, _, _) -> false
		in
		if reaped then begin
			send (PoolProvide 1);
			reap_children ()
		end
	in

	let cont = ref true in
	while !cont do
		(* The one-second timeout makes sure children are reaped regularly
		   even when the server is silent. *)
		let readable, _, exceptional = select [sock] [] [sock] 1.0 in
		begin match readable, exceptional with
		| _ :: _, _ ->
			begin match read_one_msg sock with
			| PoolRequest(task, addr, n) ->
				fullfill_request task addr n
			| _ -> raise (ProtocolError "Unexpected message.")
			end
		| [], _ :: _ ->
			shutdown sock SHUTDOWN_ALL;
			close sock;
			cont := false
		| [], [] -> ()
		end;
		(* Once the socket is closed there is nobody left to notify. *)
		if !cont then reap_children ()
	done

(* Program entry point: parses the command line and runs the pool client.
   The single anonymous argument is the pooling server address; -port and
   -provide override the corresponding defaults. When no server is given,
   the usage text with the list of options is printed on stderr and the
   program exits with a non-zero status, since a mandatory argument is
   missing. *)
let () =
	let usage = "Usage: ./poolclient [options] server" in
	let options = [
		"-port", Arg.Set_int pool_port, "Set port for pooling server.";
		"-provide", Arg.Set_int provide, "Number of processes to provide.";
	] in
	Arg.parse options (fun s -> pool_server := s) usage;

	if !pool_server = "" then begin
		Arg.usage options usage;
		exit 2
	end;

	run_client ()