From 4d3f12f167729ecb5de5b9bf8e18f9eca52beced Mon Sep 17 00:00:00 2001 From: Alex AUVOLAT Date: Sun, 25 May 2014 23:26:38 +0200 Subject: Add README, clean up a little. --- README | 109 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/kahn_seq.ml | 10 ++++- src/manager.ml | 28 +++++++------- src/poolserver.ml | 2 + 4 files changed, 133 insertions(+), 16 deletions(-) create mode 100644 README diff --git a/README b/README new file mode 100644 index 0000000..9928b3e --- /dev/null +++ b/README @@ -0,0 +1,109 @@ +Projet de Système et Réseaux 2014 +Alex AUVOLAT, Jean FABRE-MONPLAISIR + +------------------------------------ + +Description du projet +===================== + +Implémentation des réseaux de Kahn en OCaml pour permettre la programmation +parallèle. Trois implémentations à réaliser : + +- Implémentation séquentielle (parallélisme coopératif mono-threadé) +- Implémentation basée sur les primitives Unix (fork, pipe) +- Implémentation permettant la communication en réseau + + +Commentaires techniques +======================= + +Dans les versions séquentielle et Unix, nous avons implémenté une nouvelle +primitive : select, qui permet de faire un get sur plusieurs caneaux en même +temps et d'exécuter une fonction différente en fonction du premier canal sur +lequel un message arrive. Nous n'avons pas pris le temps d'implémenter cette +fonction dans la version fonctionnant par le réseau. + +Version séquentielle +-------------------- + +Un processus est décrit par le type suivant : + + type 'a process = ('a -> unit) -> unit + +C'est-à-dire qu'un processus renvoyant une valeur de type 'a est une fonction qui +prend comme argument sa continuation et s'exécute à ce moment-là. + +Les fonctions qui exploitent le parallélisme font appel à une file de processus +en attente d'exécution : doco lance des processus en “parallèle” en mettant +les-dits processus dans la file ; get gère l'attente d'un message sur un canal +en mettant en fin de file un processus qui re-tente le get lorsque celui-ci a +échoué car le canal ne contenait aucune donnée - l'éspoir étant qu'un autre +processus se sera exécuté d'ici-là et aura envoyé un message dans le canal. + +Version Unix +------------ + +Toutes les primitives sont fournies d'office par Unix, il n'y a donc presque +rien à faire. Les put/get sont automatiquement gérés par le noyau en ce qui +concerne la bufferisation et la mise en attente d'un processus tant qu'il n'y a +rien à lire ou qu'il n'y a plus de place pour écrire. Le lancement de processus +en parallèle (doco) exploite simplement l'appel système fork, puis waitpid pour +la syncronisation finale. + +Version réseau +-------------- + +Publicité pour la version réseau : nous avons réussi, en mobilisant 5 machines +des salles INFO3 et INFO4 de l'ENS, à calcuer de manière naïve (c'est-à-dire +avec un algorithme exponentiel) le 53e nombre de la suite de Fibonacci, en un +temps record de 16,8 secondes. Ce nombre pouvait être calculé sur une seule +machine, ce qui prenait un peu plus d'une minute dans le cas d'une machine dont +les quatre cœurs étaient exploités (implémentation Unix). En mobilisant plus de +machines, nous pourrions sûrement améliorer encore ce temps. + +L'implémentation réseau est basée sur une version simplifiée de l'implémentation +séquentielle, où un processus participant de l'exécution du réseau se contente +de lire des tâches sur stdin, de les exécuter et d'envoyer des informations sur +stdout (messages envoyés, tâches lancées par doco). + +À cela se rajoute un “manager”, ou gestionnaire, qui s'occupe de multiplexer les +entrées/sorties pour dispatcher les processus disponibles aux différents +processus qui lui sont affectés. + +Les appels système pour la communication étant les même pour le réseau et les +pipes (read/write), le manager peut aussi bien communiquer avec des processus +locaux qu'avec des processus distants via le réseau. + +À cela nous avons rajouté un système de “pool” (pool server/pool client) qui +permet à un certain nombre de machines de se déclarer “disponnibles” pour des +calculs. Le manager peut ensuite demander à la pool de lui donner un certain +nombre de processus pour effectuer des calculs. Le pool client s'occupe +d'initaliser la connection réseau et de rediriger stdin et stdout vers le +réseau, avant de lancer le processus qui effectuera les calculs. + +Les tâches (processus au sens de Kahn) sont des fermetures que l'on serialise +via la bibliothèque Marshall d'OCaml pour être transmis par le réseau. La partie +manager est indépendante de l'application que l'on fait tourner, par contre le +binaire qui effectue les calculs doit être identique sur toutes les machines +participant au calcul pour que des fermetures puissent être transmises via +Marshall sans problème. + +Utilisation de la version réseau : + + tulipier$ ./poolserver.native & + tonka$ ./poolclient.native tulipier + tamier$ ./poolclient.native tulipier + turnep$ ./poolclient.native tulipier + tetragone$ ./poolclient.native tulipier + tulipier$ time ./manager.native -pool-addr tulipier \ + -my-addr tulipier -pool-proc 16 -local-proc 4 ./example.native + +(en supposant que . correspond au même dossier, monté par NFS par exemple, sur +toutes les machines) + +Les temps d'exécution peuvent varier car ils sont fonction de la répartition +entre les machines des tâches qui calculent peu et communiquent beaucoup : +celles-ci ralentissent le système lorsqu'elles sont lancées sur une machine qui +n'est pas celle où tourne le manager. Nous ne pouvons avoir que peu d'influence +là-dessus puisque la répartition des processus est un processus aléatoire. + diff --git a/src/kahn_seq.ml b/src/kahn_seq.ml index 8aff905..c699bbd 100644 --- a/src/kahn_seq.ml +++ b/src/kahn_seq.ml @@ -53,8 +53,14 @@ module Seq: S = struct let doco l = fun cont -> - List.iter (fun proc -> Queue.push (fun () -> proc (fun () -> ())) tasks) l; - cont () + let remain = ref (List.length l) in + List.iter (fun proc -> Queue.push (fun () -> proc (fun () -> remain := !remain - 1)) tasks) l; + let rec wait_x () = + if !remain = 0 then + cont () + else + Queue.push wait_x tasks + in wait_x () let return v = fun cont -> cont v diff --git a/src/manager.ml b/src/manager.ml index 9498d8c..4f49dd9 100644 --- a/src/manager.ml +++ b/src/manager.ml @@ -15,7 +15,7 @@ let my_addr = ref "" let my_port = ref 9011 let pool_addr = ref "" let pool_port = ref 9082 -let pool_count = ref 0 +let pool_proc = ref 0 (* Server data structures *) @@ -45,6 +45,12 @@ type server = { sock: file_descr; } +let shutdown_server server = + dbg "Shutting down server..."; + Hashtbl.iter (fun _ c -> c.disconnect()) server.clients; + if !my_addr <> "" then shutdown server.sock SHUTDOWN_ALL; + close server.sock + let new_server () = let server = { tasks = Queue.create (); @@ -58,11 +64,10 @@ let new_server () = if !my_addr <> "" then begin dbg @@ Format.sprintf "Listening on port %d" !my_port; bind server.sock (make_addr "0.0.0.0" !my_port); - listen server.sock (min 1 !pool_count); + listen server.sock (min 1 !pool_proc); let stop_srv _ = - dbg "Shutting down server..."; - shutdown server.sock SHUTDOWN_ALL; + shutdown_server server; exit 0 in Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv); @@ -84,9 +89,6 @@ let push_task server task = | MsgTask(a, b) -> GiveMsgTask(a, b) | Task(a) -> GiveTask(a)) -let get_task server = - Queue.pop server.tasks - let handle_put server chan msg = if Hashtbl.mem server.tsk_chan chan then let task = Hashtbl.find server.tsk_chan chan in @@ -220,12 +222,9 @@ let rec server_run server = end end -let server_shutdown server = - if !my_addr <> "" then shutdown server.sock SHUTDOWN_ALL (* Main function *) - let parse_args () = let usage = "Usage: ./manager [options] program" in let options = [ @@ -235,7 +234,7 @@ let parse_args () = "-my-port", Arg.Set_int my_port, "Port for me to listen"; "-pool-addr", Arg.Set_string pool_addr, "Pool server to use"; "-pool-port", Arg.Set_int pool_port, "Port on which to connect to pool"; - "-pool-count", Arg.Set_int pool_count, "Number of processes to ask to pool"; + "-pool-proc", Arg.Set_int pool_proc, "Number of processes to ask to pool"; ] in Arg.parse options (fun n -> program := n) usage @@ -260,6 +259,7 @@ let () = let p2m_m, p2m_p = pipe () in match fork() with | 0 -> + close server.sock; close m2p_m; close p2m_m; dup2 m2p_p stdin; @@ -285,7 +285,7 @@ let () = pids := pid :: (!pids) done; - if !pool_addr <> "" && !pool_count > 0 then begin + if !pool_addr <> "" && !pool_proc > 0 then begin let sock = socket PF_INET SOCK_STREAM 0 in connect sock (make_addr !pool_addr !pool_port); let outc = out_channel_of_descr sock in @@ -295,13 +295,13 @@ let () = if read_one_msg sock <> PoolHello then raise (ProtocolError "Expected PoolHello reply."); - send (PoolRequest(!program, (!my_addr, !my_port), !pool_count)); + send (PoolRequest(!program, (!my_addr, !my_port), !pool_proc)); shutdown sock SHUTDOWN_ALL; close sock end; server_run server; - server_shutdown server; + shutdown_server server; List.iter (fun pid -> ignore (waitpid [] pid)) !pids diff --git a/src/poolserver.ml b/src/poolserver.ml index 15a700c..850db3c 100644 --- a/src/poolserver.ml +++ b/src/poolserver.ml @@ -29,7 +29,9 @@ let new_server () = let stop_srv _ = Format.eprintf "Shutting down server...@."; + Hashtbl.iter (fun _ c -> c.disconnect()) server.clients; shutdown server.sock SHUTDOWN_ALL; + close server.sock; exit 0 in Sys.set_signal Sys.sigint (Sys.Signal_handle stop_srv); -- cgit v1.2.3