Java - Détection et gestion des fils suspendus

Par Alex. C. Punnen

Architecte - Nokia Siemens Networks

Bangalore

Les threads suspendus sont un défi courant dans le développement de logiciels qui doivent s'interfacer avec des appareils propriétaires à l'aide d'interfaces propriétaires ou standardisées telles que SNMP, Q3 ou Telnet. Ce problème ne se limite pas à la gestion du réseau, mais se produit dans un large éventail de domaines tels que les serveurs Web, les processus appelant des appels de procédure à distance, etc.

Un thread qui lance une demande à un appareil a besoin d'un mécanisme pour détecter au cas où l'appareil ne répondrait pas ou ne répondrait que partiellement. Dans certains cas où un tel blocage est détecté, une action spécifique doit être entreprise. L'action spécifique peut être soit une nouvelle tentative, soit une notification à l'utilisateur final de l'échec de la tâche ou une autre option de récupération. Dans certains cas où un grand nombre de tâches doit être déclenché vers un grand nombre d'éléments de réseau par un composant, la détection des threads suspendus est importante afin qu'elle ne devienne pas un goulot d'étranglement pour d'autres traitements de tâches. La gestion des threads bloqués comporte donc deux aspects: les performances et la notification .

Pour l' aspect notification, nous pouvons personnaliser le modèle Java Observer pour qu'il s'intègre dans le monde multithread.

Personnalisation du modèle Java Observer pour les systèmes multithread

En raison des tâches suspendues, l'utilisation de la ThreadPoolclasse Java avec une stratégie d'adaptation est la première solution qui me vient à l'esprit. Cependant, l'utilisation de Java ThreadPooldans le contexte de certains threads suspendus au hasard sur une période de temps donne un comportement indésirable basé sur la stratégie particulière utilisée, comme la famine de threads dans le cas d'une stratégie de pool de threads fixe. Ceci est principalement dû au fait que Java ThreadPooln'a pas de mécanisme pour détecter un blocage de thread.

Nous pourrions essayer un pool de threads mis en cache, mais il présente également des problèmes. Si le taux de déclenchement des tâches est élevé et que certains threads se bloquent, le nombre de threads peut augmenter, provoquant éventuellement une privation de ressources et des exceptions de mémoire insuffisante. Ou nous pourrions utiliser une ThreadPoolstratégie personnalisée invoquant un CallerRunsPolicy. Dans ce cas également, un thread bloqué peut éventuellement entraîner le blocage de tous les threads. (Le thread principal ne doit jamais être l'appelant, car il est possible que toute tâche passée au thread principal se bloque, provoquant l'arrêt de tout.)

Donc, quelle est la solution? Je vais démontrer un modèle ThreadPool pas si simple qui ajuste la taille du pool en fonction du taux de tâches et en fonction du nombre de threads suspendus. Passons d'abord au problème de la détection des fils suspendus.

Détection des fils suspendus

La figure 1 montre une abstraction du motif:

Il y a deux classes importantes ici: ThreadManageret ManagedThread. Les deux s'étendent de la Threadclasse Java . Le ThreadManagercontient un conteneur qui contient le ManagedThreads. Lorsqu'un nouveau ManagedThreadest créé, il s'ajoute à ce conteneur.

 ThreadHangTester testthread = new ThreadHangTester("threadhangertest",2000,false); testthread.start(); thrdManger.manage(testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start(); 

Le ThreadManagerparcourt cette liste et appelle la méthode ManagedThreads isHung(). Il s'agit essentiellement d'une logique de vérification d'horodatage.

 if(System.currentTimeMillis() - lastprocessingtime.get() > maxprocessingtime ) { logger.debug("Thread is hung"); return true; } 

S'il constate qu'un thread est entré dans une boucle de tâche et n'a jamais mis à jour ses résultats, il utilise un mécanisme de récupération tel que stipulé par le ManageThread.

 while(isRunning) { for (Iterator iterator = managedThreads.iterator(); iterator.hasNext();) { ManagedThreadData thrddata = (ManagedThreadData) iterator.next(); if(thrddata.getManagedThread().isHung()) { logger.warn("Thread Hang detected for ThreadName=" + thrddata.getManagedThread().getName() ); switch (thrddata.getManagedAction()) { case RESTART_THREAD: // The action here is to restart the the thread //remove from the manager iterator.remove(); //stop the processing of this thread if possible thrddata.getManagedThread().stopProcessing(); if(thrddata.getManagedThread().getClass() == ThreadHangTester.class) //To know which type of thread to create { ThreadHangTester newThread =new ThreadHangTester("restarted_ThrdHangTest",5000,true); //Create a new thread newThread.start(); //add it back to be managed manage(newThread, thrddata.getManagedAction(), thrddata.getThreadChecktime()); } break; ......... 

Pour qu'un nouveau ManagedThreadsoit créé et utilisé à la place de celui suspendu, il ne doit contenir aucun état ni aucun conteneur. Pour cela, le conteneur sur lequel les ManagedThreadactes doivent être séparés. Ici, nous utilisons le modèle Singleton basé sur ENUM pour contenir la liste des tâches. Ainsi, le conteneur contenant les tâches est indépendant du thread traitant les tâches. Cliquez sur le lien suivant pour télécharger la source du modèle décrit: Source Java Thread Manager.

Threads suspendus et stratégies Java ThreadPool

Le Java ThreadPooln'a pas de mécanisme pour détecter les threads suspendus. L'utilisation d'une stratégie telle que fixed threadpool ( Executors.newFixedThreadPool()) ne fonctionnera pas car si certaines tâches se bloquent au fil du temps, tous les threads seront finalement dans un état bloqué. Une autre option consiste à utiliser une stratégie ThreadPool mise en cache (Executors.newCachedThreadPool()). Cela pourrait garantir qu'il y aura toujours des threads disponibles pour traiter une tâche, uniquement limités par la mémoire de la VM, le processeur et les limites de thread. Cependant, avec cette stratégie, il n'y a aucun contrôle du nombre de threads créés. Indépendamment du fait qu'un thread de traitement se bloque ou non, l'utilisation de cette stratégie alors que le taux de tâches est élevé entraîne la création d'un grand nombre de threads. Si vous n'avez pas assez de ressources pour la JVM très bientôt, vous atteindrez le seuil de mémoire maximal ou le processeur élevé. Il est assez courant de voir le nombre de threads atteindre des centaines ou des milliers. Même s'ils sont libérés une fois la tâche traitée, parfois pendant la gestion des rafales, le nombre élevé de threads submerge les ressources système.

Une troisième option consiste à utiliser des stratégies ou des politiques personnalisées. Une de ces options est d'avoir un pool de threads qui évolue de 0 à un nombre maximum. Ainsi, même si un thread était suspendu, un nouveau thread serait créé tant que le nombre maximum de threads était atteint:

 execexec = new ThreadPoolExecutor(0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue()); 

Ici, 3 est le nombre maximal de threads et la durée de conservation est définie sur 60 secondes car il s'agit d'un processus gourmand en tâches. Si nous donnons un nombre maximum de threads suffisamment élevé, c'est plus ou moins une politique raisonnable à utiliser dans le contexte des tâches suspendues. Le seul problème est que si les threads suspendus ne sont pas libérés, il y a une légère chance que tous les threads se bloquent à un moment donné. Si le nombre maximal de threads est suffisamment élevé et en supposant que le blocage d'une tâche est un phénomène peu fréquent, cette politique conviendrait parfaitement.

Il aurait été agréable d' ThreadPoolavoir également un mécanisme enfichable de détection des fils suspendus. J'en discuterai plus tard. Bien sûr, si tous les threads sont bloqués, vous pouvez configurer et utiliser la stratégie de tâche rejetée du pool de threads. Si vous ne souhaitez pas supprimer les tâches, vous devrez utiliser le CallerRunsPolicy:

 execexec = new ThreadPoolExecutor(0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue() new ThreadPoolExecutor.CallerRunsPolicy()); 

Dans ce cas, si un blocage de thread provoquait le rejet d'une tâche, cette tâche serait donnée au thread appelant à traiter. Il y a toujours une chance que cette tâche soit trop suspendue. Dans ce cas, tout le processus se figerait. Il vaut donc mieux ne pas ajouter une telle politique dans ce contexte.

 public class NotificationProcessor implements Runnable { private final NotificationOriginator notificationOrginator; boolean isRunning = true; private final ExecutorService execexec; AlarmNotificationProcessor(NotificationOriginator norginator) { //ctor // execexec = Executors.newCachedThreadPool();// Too many threads // execexec = Executors.newFixedThreadPool(2);//, no hang tasks detection execexec = new ThreadPoolExecutor(0, 4, 250, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); } public void run() { while (isRunning) { try { final Task task = TaskQueue.INSTANCE.getTask(); Runnable thisTrap= new Runnable() { public void run() { ++alarmid; notificaionOrginator.notify(new OctetString(), // Task processing nbialarmnew.getOID(), nbialarmnew.createVariableBindingPayload()); É........}} ; execexec.execute(thisTrap); } 

Un ThreadPool personnalisé avec détection de blocage

Une bibliothèque de pool de threads avec la capacité de détection et de gestion des blocages de tâches serait formidable. J'en ai développé un et je vais le démontrer ci-dessous. Il s'agit en fait d'un port d'un pool de threads C ++ que j'ai conçu et utilisé il y a quelque temps (voir les références). Fondamentalement, cette solution utilise le modèle de commande et le modèle de chaîne de responsabilité. Cependant, implémenter le modèle Command en Java sans l'aide du support d'objet Function est un peu difficile. Pour cela, j'ai dû modifier légèrement l'implémentation pour utiliser la réflexion Java. Notez que le contexte dans lequel ce modèle a été conçu était celui où un pool de threads devait être installé / branché sans modifier aucune des classes existantes.(Je pense que le seul grand avantage de la programmation orientée objet est qu'elle nous donne un moyen de concevoir des classes de manière à utiliser efficacement le principe ouvert fermé. Cela est particulièrement vrai de l'ancien code complexe et pourrait être moins pertinent pour développement de nouveaux produits.) J'ai donc utilisé la réflexion au lieu d'utiliser une interface pour implémenter le modèle de commande. Le reste du code pourrait être porté sans changement majeur car presque toutes les primitives de synchronisation et de signalisation des threads sont disponibles à partir de Java 1.5.Le reste du code pourrait être porté sans changement majeur car presque toutes les primitives de synchronisation et de signalisation des threads sont disponibles à partir de Java 1.5.Le reste du code pourrait être porté sans changement majeur car presque toutes les primitives de synchronisation et de signalisation des threads sont disponibles à partir de Java 1.5.

 public class Command { private Object[ ]argParameter; ........ //Ctor for a method with two args Command(T pObj, String methodName, long timeout, String key, int arg1, int arg2) { m_objptr = pObj; m_methodName = mthodName; m_timeout = timeout; m_key = key; argParameter = new Object[2]; argParameter[0] = arg1; argParameter[1] = arg2; } // Calls the method of the object void execute() { Class klass = m_objptr.getClass(); Class[] paramTypes = new Class[]{int.class, int.class}; try { Method methodName = klass.getMethod(m_methodName, paramTypes); //System.out.println("Found the method--> " + methodName); if (argParameter.length == 2) { methodName.invoke(m_objptr, (Object) argParameter[0], (Object) argParameter[1]); } 

Exemple d'utilisation de ce modèle:

 public class CTask {.. public int DoSomething(int a, int b) {...} } 

Command cmd4 = new Command(task4, "DoMultiplication", 1, "key2",2,5);

Maintenant, nous avons deux classes plus importantes ici. L'un est la ThreadChainclasse, qui implémente le modèle de chaîne de responsabilité:

 public class ThreadChain implements Runnable { public ThreadChain(ThreadChain p, ThreadPool pool, String name) { AddRef(); deleteMe = false; busy = false; //--> very important next = p; //set the thread chain - note this is like a linked list impl threadpool = pool; //set the thread pool - Root of the threadpool ........ threadId = ++ThreadId; ...... // start the thread thisThread = new Thread(this, name + inttid.toString()); thisThread.start(); } 

Cette classe a deux méthodes principales. L'un est booléen CanHandle()qui est initié par la ThreadPoolclasse et se poursuit ensuite de manière récursive. Cela vérifie si le thread actuel ( ThreadChaininstance actuelle ) est libre de gérer la tâche. S'il est déjà en train de gérer une tâche, il appelle la suivante dans la chaîne.

 public Boolean canHandle() { if (!busy) { //If not busy System.out.println("Can Handle This Event in id=" + threadId); // todo signal an event try { condLock.lock(); condWait.signal(); //Signal the HandleRequest which is waiting for this in the run method ......................................... return true; } ......................................... ///Else see if the next object in the chain is free /// to handle the request return next.canHandle(); 

Notez que HandleRequestest une méthode de ThreadChainqui est invoquée à partir de la Thread run()méthode et attend le signal de la canHandleméthode. Notez également comment la tâche est gérée via le modèle de commande.