From c5e69a904e79e807c5b075c08ce82183133e7b4c Mon Sep 17 00:00:00 2001 From: Alex AUVOLAT Date: Tue, 20 May 2014 11:14:01 +0200 Subject: Stuff. --- src/kahn_sock.ml | 215 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 110 insertions(+), 105 deletions(-) (limited to 'src/kahn_sock.ml') diff --git a/src/kahn_sock.ml b/src/kahn_sock.ml index 89ee65c..bce6cf2 100644 --- a/src/kahn_sock.ml +++ b/src/kahn_sock.ml @@ -1,115 +1,120 @@ -Random.self_init () - -type ident = (int * int * int * int) -let gen_ident () = - Random.int 1000000000, Random.int 1000000000, - Random.int 1000000000, Random.int 1000000000 - -module Sock : Kahn.S = struct - - (* L'idée : - - L'ensemble des noeuds qui font du calcul est un arbre. - Le premier noeud lancé est la racine de l'arbre ; tous les - noeuds qui se connectent par la suite se connectent à un - noeud déjà présent et sont donc son fils. - - Les processus sont des fermetures de type unit -> unit, - transmises par des canaux - - Un noeud de calcul est un processus ocaml avec un seul - thread. Le parallélisme est coopératif (penser à faire - des binds assez souvent). - - Les noeuds publient régulièrement leur load, ie le nombre - de processus en attente et qui ne sont pas en train - d'attendre des données depuis un canal. Si un noeud a un - voisin dont le load est plus faible que le sien d'une - quantité plus grande que 2, il délègue une tâche. - - Le noeud racine délègue toutes ses tâches et sert uniquement - pour les entrées-sorties - - Comportement indéterminé lorsqu'un noeud se déconnecte - (des processus peuvent disparaître, le réseau est cassé...) - - Les canaux sont identifiés par le type ident décrit - ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le - monde le sait. Lorsque quelqu'un lit sur un canal, tout le - monde le sait. (on n'est pas capable de déterminer - quel est le noeud propriétaire du processus devant lire - le message) Les communications sont donc coûteuses. - - On garantit que si chaque canal est lu par un processus - et écrit par un autre, alors l'ordre des messages est - conservé. On ne garantit pas l'ordre s'il y a plusieurs - écrivains, et on est à peu près sûrs que le réseau va - planter s'il y a plusieurs lecteurs. - *) +open Kahn +open Unix + +(* make_addr : string -> int -> sockaddr *) +let make_addr host port = + let host = gethostbyname host in + ADDR_INET(host.h_addr_list.(Random.int (Array.length host.h_addr_list)), port) + +module Sock: S = struct + + let kahn_port = 8197 type 'a process = (('a -> unit) option) -> unit - type 'a in_port = ident - type 'a out_port = ident - type task = unit -> unit + type 'a channel = int + type 'a in_port = 'a channel + type 'a out_port = 'a channel + type task = unit -> unit let tasks = Queue.create () - let read_wait_tasks = Hashtbl.create 42 - let channels = Hashtbl.create 42 - - type host_id = string - type message = host_id * message_data - (* message contains sender ID *) - and message_data = + let socket_to_srv = ref None + + type cli_msg = + | Hello + | Put of int * string + | Get of int * (string -> task) + | AskTask + | GiveTask of task + | GiveIOTask of task + | FinalResult of string + type srv_msg = | Hello - | LoadAdvert of host_id * int - (* Host X has N tasks waiting *) - | Delegate of task - (* I need you to do this for me *) - | SendChan of ident * string - (* Put message in buffer *) - | RecvChan of ident - (* Read message from buffer (everybody clear it from - memory !) *) - | IOWrite of string - | Bye - - let peers = Hashtbl.create 12 (* host_id -> in_chan * out_chan *) - let parent = ref "" (* id of parent *) - let myself = ref "" - - let tell peer msg = - let _, o = Hashtbl.find peers peer in - Marshall.to_channel o msg - - let tell_all msg = - Hashtbl.iter peers - (fun _ (_, o) -> Marshall.to_channel o msg) - - let tell_all_except peer msg = - Hashtbl.iter peers - (fun k (_, o) -> if k <> peer then - Marshall.to_channel o msg) - - let io_read () = "" - let io_write msg = - tell !parent (!myself, IOWrite msg) - - let new_channel () = - let x = gen_ident () in x, x - - let put port x = - fun cont -> - tell_all (!myself, SendChan(port, Marshal.to_string x)); - match cont with - | Some cont -> Queue.push cont tasks - | None -> () - - let rec get port = - fun cont -> - try - let p = Hashtbl.find channels port in - let v = Queue.pop p in - tell_all (!myself, RecvChan port) - match cont with - | None -> () - | Some -> Queue.push (fun () -> cont v) tasks - with _ -> (* no message in queue *) - Hashtbl.add read_wait_tasks - port (fun () -> get port cont) + | GiveTask of task + | PleaseWait + | FinalResult of string + + + let rec tell_server (msg : cli_msg) = + match !socket_to_srv with + | Some s -> Marshal.to_channel s msg [Marshal.Closures]; flush s + | None -> handle_msg_server (fun _ -> assert false) msg + + and handle_msg_server (reply_fun : srv_msg -> unit) = function + | Hello -> reply_fun Hello + | _ -> () (* TODO *) + + and client host = + (* Initialize socket *) + let sock = socket PF_INET SOCK_STREAM 0 in + connect sock (make_addr host kahn_port); + let i, o = in_channel_of_descr sock, out_channel_of_descr sock in + socket_to_srv := Some o; + let get_msg () = Marshal.from_channel i in + (* Initialize protocol *) + tell_server Hello; + assert (get_msg() = Hello); + (* Loop *) + let rec loop () = + tell_server AskTask; + match get_msg () with + | Hello -> assert false + | GiveTask task -> task (); loop () + | PleaseWait -> sleep 2; loop () + | FinalResult s -> Marshal.from_string s + in + let result = loop() in + shutdown sock SHUTDOWN_ALL; + result + + and server e = + (* Initialize task list *) + push_task (fun () -> e None); + + (* Initialize socket *) + let sock = socket PF_INET SOCK_STREAM 0 in + bind sock (make_addr "0.0.0.0" kahn_port); + listen sock 10; + let stop_srv _ = + Format.eprintf "Shutdown server...@."; + shutdown sock SHUTDOWN_ALL; + exit 0 + in + Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv); + Sys.set_signal Sys.sigterm (Sys.Signal_handle stop_srv); + + (* Loop *) + let clients = ref [] in + while true do + let fds = List.map (fun (i, o, a) -> descr_of_in_channel i) !clients in + match select (sock::fds) [] [] (-1.0) with + | s::_, _, _ when s = sock -> + (* New client ! *) + let fd, addr = accept sock in + clients := + (in_channel_of_descr fd, + out_channel_of_descr fd, + addr)::!clients + | s::_, _, _ -> + (* Client sent something *) + let i, o, a = List.find + (fun (i, _, _) -> descr_of_in_channel i = s) !clients in + let msg = Marshal.from_channel i in + handle_msg_server + (fun m -> Marshal.to_channel o m [Marshall.Closures]; flush o) + msg + | _ -> assert false + done + + let srv = ref "" + let set_var v s = v := s + let run e = + Arg.parse [] (set_var srv) "usage: kahn [server_addr]"; + if !srv = "" then + server e + else + client !sr end -- cgit v1.2.3