From c6313368d3719e507eba4216a4fc4ea9f30db576 Mon Sep 17 00:00:00 2001 From: Alex AUVOLAT Date: Fri, 21 Mar 2014 13:57:16 +0100 Subject: Retab files ; start work on networked implementation. --- src/kahnsock.ml | 142 ++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 106 insertions(+), 36 deletions(-) (limited to 'src/kahnsock.ml') diff --git a/src/kahnsock.ml b/src/kahnsock.ml index 6cdfadd..89ee65c 100644 --- a/src/kahnsock.ml +++ b/src/kahnsock.ml @@ -2,44 +2,114 @@ Random.self_init () type ident = (int * int * int * int) let gen_ident () = - Random.int 1000000000, Random.int 1000000000, - Random.int 1000000000, Random.int 1000000000 + Random.int 1000000000, Random.int 1000000000, + Random.int 1000000000, Random.int 1000000000 module Sock : Kahn.S = struct - (* L'idée : - - L'ensemble des noeuds qui font du calcul est un arbre. - Le premier noeud lancé est la racine de l'arbre ; tous les - noeuds qui se connectent par la suite se connectent à un - noeud déjà présent et sont donc son fils. - - Les processus sont des fermetures de type unit -> unit, - transmises par des canaux - - Un noeud de calcul est un processus ocaml avec un seul - thread. Le parallélisme est coopératif (penser à faire - des binds assez souvent). - - Les noeuds publient régulièrement leur load, ie le nombre - de processus en attente et qui ne sont pas en train - d'attendre des données depuis un canal. Si un noeud a un - voisin dont le load est plus faible que le sien d'une - quantité plus grande que 2, il délègue une tâche. - - Le noeud racine délègue toutes ses tâches et sert uniquement - pour les entrées-sorties - - Comportement indéterminé lorsqu'un noeud se déconnecte - (des processus peuvent disparaître, le réseau est cassé...) - - Les canaux sont identifiés par le type ident décrit - ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le - monde le sait. Lorsque quelqu'un lit sur un canal, tout le - monde le sait. (on n'est pas capable de déterminer - quel est le noeud propriétaire du processus devant lire - le message) Les communications sont donc coûteuses. - *) - - type 'a process = (unit -> 'a) - - type 'a in_port = ident - type 'a out_port = ident - - let cin = (0, 0, 0, 0) - let cout = (0, 0, 0, 1) + (* L'idée : + - L'ensemble des noeuds qui font du calcul est un arbre. + Le premier noeud lancé est la racine de l'arbre ; tous les + noeuds qui se connectent par la suite se connectent à un + noeud déjà présent et sont donc son fils. + - Les processus sont des fermetures de type unit -> unit, + transmises par des canaux + - Un noeud de calcul est un processus ocaml avec un seul + thread. Le parallélisme est coopératif (penser à faire + des binds assez souvent). + - Les noeuds publient régulièrement leur load, ie le nombre + de processus en attente et qui ne sont pas en train + d'attendre des données depuis un canal. Si un noeud a un + voisin dont le load est plus faible que le sien d'une + quantité plus grande que 2, il délègue une tâche. + - Le noeud racine délègue toutes ses tâches et sert uniquement + pour les entrées-sorties + - Comportement indéterminé lorsqu'un noeud se déconnecte + (des processus peuvent disparaître, le réseau est cassé...) + - Les canaux sont identifiés par le type ident décrit + ci-dessus. Lorsque quelqu'un écrit sur un canal, tout le + monde le sait. Lorsque quelqu'un lit sur un canal, tout le + monde le sait. (on n'est pas capable de déterminer + quel est le noeud propriétaire du processus devant lire + le message) Les communications sont donc coûteuses. + - On garantit que si chaque canal est lu par un processus + et écrit par un autre, alors l'ordre des messages est + conservé. On ne garantit pas l'ordre s'il y a plusieurs + écrivains, et on est à peu près sûrs que le réseau va + planter s'il y a plusieurs lecteurs. + *) + + type 'a process = (('a -> unit) option) -> unit + + type 'a in_port = ident + type 'a out_port = ident + + type task = unit -> unit + + let tasks = Queue.create () + let read_wait_tasks = Hashtbl.create 42 + + let channels = Hashtbl.create 42 + + type host_id = string + type message = host_id * message_data + (* message contains sender ID *) + and message_data = + | Hello + | LoadAdvert of host_id * int + (* Host X has N tasks waiting *) + | Delegate of task + (* I need you to do this for me *) + | SendChan of ident * string + (* Put message in buffer *) + | RecvChan of ident + (* Read message from buffer (everybody clear it from + memory !) *) + | IOWrite of string + | Bye + + let peers = Hashtbl.create 12 (* host_id -> in_chan * out_chan *) + let parent = ref "" (* id of parent *) + let myself = ref "" + + let tell peer msg = + let _, o = Hashtbl.find peers peer in + Marshall.to_channel o msg + + let tell_all msg = + Hashtbl.iter peers + (fun _ (_, o) -> Marshall.to_channel o msg) + + let tell_all_except peer msg = + Hashtbl.iter peers + (fun k (_, o) -> if k <> peer then + Marshall.to_channel o msg) + + let io_read () = "" + let io_write msg = + tell !parent (!myself, IOWrite msg) + + let new_channel () = + let x = gen_ident () in x, x + + let put port x = + fun cont -> + tell_all (!myself, SendChan(port, Marshal.to_string x)); + match cont with + | Some cont -> Queue.push cont tasks + | None -> () + + let rec get port = + fun cont -> + try + let p = Hashtbl.find channels port in + let v = Queue.pop p in + tell_all (!myself, RecvChan port) + match cont with + | None -> () + | Some -> Queue.push (fun () -> cont v) tasks + with _ -> (* no message in queue *) + Hashtbl.add read_wait_tasks + port (fun () -> get port cont) end -- cgit v1.2.3