summaryrefslogtreecommitdiff
path: root/src/kahn_sock.ml
diff options
context:
space:
mode:
Diffstat (limited to 'src/kahn_sock.ml')
-rw-r--r--src/kahn_sock.ml215
1 files changed, 110 insertions, 105 deletions
diff --git a/src/kahn_sock.ml b/src/kahn_sock.ml
index 89ee65c..bce6cf2 100644
--- a/src/kahn_sock.ml
+++ b/src/kahn_sock.ml
@@ -1,115 +1,120 @@
-Random.self_init ()
-
-type ident = (int * int * int * int)
-let gen_ident () =
- Random.int 1000000000, Random.int 1000000000,
- Random.int 1000000000, Random.int 1000000000
-
-module Sock : Kahn.S = struct
-
- (* L'idée :
- - L'ensemble des noeuds qui font du calcul est un arbre.
- Le premier noeud lancé est la racine de l'arbre ; tous les
- noeuds qui se connectent par la suite se connectent à un
- noeud déjà présent et sont donc son fils.
- - Les processus sont des fermetures de type unit -> unit,
- transmises par des canaux
- - Un noeud de calcul est un processus ocaml avec un seul
- thread. Le parallélisme est coopératif (penser à faire
- des binds assez souvent).
- - Les noeuds publient régulièrement leur load, ie le nombre
- de processus en attente et qui ne sont pas en train
- d'attendre des données depuis un canal. Si un noeud a un
- voisin dont le load est plus faible que le sien d'une
- quantité plus grande que 2, il délègue une tâche.
- - Le noeud racine délègue toutes ses tâches et sert uniquement
- pour les entrées-sorties
- - Comportement indéterminé lorsqu'un noeud se déconnecte
- (des processus peuvent disparaître, le réseau est cassé...)
- - Les canaux sont identifiés par le type ident décrit
- ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le
- monde le sait. Lorsque quelqu'un lit sur un canal, tout le
- monde le sait. (on n'est pas capable de déterminer
- quel est le noeud propriétaire du processus devant lire
- le message) Les communications sont donc coûteuses.
- - On garantit que si chaque canal est lu par un processus
- et écrit par un autre, alors l'ordre des messages est
- conservé. On ne garantit pas l'ordre s'il y a plusieurs
- écrivains, et on est à peu près sûrs que le réseau va
- planter s'il y a plusieurs lecteurs.
- *)
+open Kahn
+open Unix
+
+(* make_addr : string -> int -> sockaddr *)
+let make_addr host port =
+ let host = gethostbyname host in
+ ADDR_INET(host.h_addr_list.(Random.int (Array.length host.h_addr_list)), port)
+
+module Sock: S = struct
+
+ let kahn_port = 8197
type 'a process = (('a -> unit) option) -> unit
- type 'a in_port = ident
- type 'a out_port = ident
- type task = unit -> unit
+ type 'a channel = int
+ type 'a in_port = 'a channel
+ type 'a out_port = 'a channel
+ type task = unit -> unit
let tasks = Queue.create ()
- let read_wait_tasks = Hashtbl.create 42
- let channels = Hashtbl.create 42
-
- type host_id = string
- type message = host_id * message_data
- (* message contains sender ID *)
- and message_data =
+ let socket_to_srv = ref None
+
+ type cli_msg =
+ | Hello
+ | Put of int * string
+ | Get of int * (string -> task)
+ | AskTask
+ | GiveTask of task
+ | GiveIOTask of task
+ | FinalResult of string
+ type srv_msg =
| Hello
- | LoadAdvert of host_id * int
- (* Host X has N tasks waiting *)
- | Delegate of task
- (* I need you to do this for me *)
- | SendChan of ident * string
- (* Put message in buffer *)
- | RecvChan of ident
- (* Read message from buffer (everybody clear it from
- memory !) *)
- | IOWrite of string
- | Bye
-
- let peers = Hashtbl.create 12 (* host_id -> in_chan * out_chan *)
- let parent = ref "" (* id of parent *)
- let myself = ref ""
-
- let tell peer msg =
- let _, o = Hashtbl.find peers peer in
- Marshall.to_channel o msg
-
- let tell_all msg =
- Hashtbl.iter peers
- (fun _ (_, o) -> Marshall.to_channel o msg)
-
- let tell_all_except peer msg =
- Hashtbl.iter peers
- (fun k (_, o) -> if k <> peer then
- Marshall.to_channel o msg)
-
- let io_read () = ""
- let io_write msg =
- tell !parent (!myself, IOWrite msg)
-
- let new_channel () =
- let x = gen_ident () in x, x
-
- let put port x =
- fun cont ->
- tell_all (!myself, SendChan(port, Marshal.to_string x));
- match cont with
- | Some cont -> Queue.push cont tasks
- | None -> ()
-
- let rec get port =
- fun cont ->
- try
- let p = Hashtbl.find channels port in
- let v = Queue.pop p in
- tell_all (!myself, RecvChan port)
- match cont with
- | None -> ()
- | Some -> Queue.push (fun () -> cont v) tasks
- with _ -> (* no message in queue *)
- Hashtbl.add read_wait_tasks
- port (fun () -> get port cont)
+ | GiveTask of task
+ | PleaseWait
+ | FinalResult of string
+
+
+ let rec tell_server (msg : cli_msg) =
+ match !socket_to_srv with
+ | Some s -> Marshal.to_channel s msg [Marshal.Closures]; flush s
+ | None -> handle_msg_server (fun _ -> assert false) msg
+
+ and handle_msg_server (reply_fun : srv_msg -> unit) = function
+ | Hello -> reply_fun Hello
+ | _ -> () (* TODO *)
+
+ and client host =
+ (* Initialize socket *)
+ let sock = socket PF_INET SOCK_STREAM 0 in
+ connect sock (make_addr host kahn_port);
+ let i, o = in_channel_of_descr sock, out_channel_of_descr sock in
+ socket_to_srv := Some o;
+ let get_msg () = Marshal.from_channel i in
+ (* Initialize protocol *)
+ tell_server Hello;
+ assert (get_msg() = Hello);
+ (* Loop *)
+ let rec loop () =
+ tell_server AskTask;
+ match get_msg () with
+ | Hello -> assert false
+ | GiveTask task -> task (); loop ()
+ | PleaseWait -> sleep 2; loop ()
+ | FinalResult s -> Marshal.from_string s
+ in
+ let result = loop() in
+ shutdown sock SHUTDOWN_ALL;
+ result
+
+ and server e =
+ (* Initialize task list *)
+ push_task (fun () -> e None);
+
+ (* Initialize socket *)
+ let sock = socket PF_INET SOCK_STREAM 0 in
+ bind sock (make_addr "0.0.0.0" kahn_port);
+ listen sock 10;
+ let stop_srv _ =
+ Format.eprintf "Shutdown server...@.";
+ shutdown sock SHUTDOWN_ALL;
+ exit 0
+ in
+ Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv);
+ Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv);
+
+ (* Loop *)
+ let clients = ref [] in
+ while true do
+ let fds = List.map (fun (i, o, a) -> descr_of_in_channel i) !clients in
+ match select (sock::fds) [] [] (-1.0) with
+ | s::_, _, _ when s = sock ->
+ (* New client ! *)
+ let fd, addr = accept sock in
+ clients :=
+ (in_channel_of_descr fd,
+ out_channel_of_descr fd,
+ addr)::!clients
+ | s::_, _, _ ->
+ (* Client sent something *)
+ let i, o, a = List.find
+ (fun (i, _, _) -> descr_of_in_channel i = s) !clients in
+ let msg = Marshal.from_channel i in
+ handle_msg_server
+ (fun m -> Marshal.to_channel o m [Marshall.Closures]; flush o)
+ msg
+ | _ -> assert false
+ done
+
+ let srv = ref ""
+ let set_var v s = v := s
+ let run e =
+ Arg.parse [] (set_var srv) "usage: kahn [server_addr]";
+ if !srv = "" then
+ server e
+ else
+ client !sr
end