Programování

Java - detekce závěsných vláken a manipulace s nimi

Alex. C. Punnen

Architekt - Nokia Siemens Networks

Bangalore

Závěsná vlákna jsou běžnou výzvou při vývoji softwaru, který musí komunikovat s proprietárními zařízeními pomocí proprietárních nebo standardizovaných rozhraní, jako jsou SNMP, Q3 nebo Telnet. Tento problém se neomezuje pouze na správu sítě, ale vyskytuje se v celé řadě oborů, jako jsou webové servery, procesy vyvolávající vzdálená volání procedur atd.

Vlákno, které iniciuje požadavek na zařízení, potřebuje mechanismus k detekci v případě, že zařízení neodpovídá nebo reaguje pouze částečně. V některých případech, kdy je zjištěno takové zablokování, je třeba provést konkrétní akci. Konkrétní akcí může být buď obnovení, nebo informování koncového uživatele o selhání úlohy nebo nějaká jiná možnost obnovení. V některých případech, kdy komponenta musí na velký počet síťových prvků aktivovat velký počet úkolů, je důležitá detekce zavěšení podprocesu, aby se nestala překážkou pro další zpracování úloh. Správa visících vláken má tedy dva aspekty: výkon a oznámení.

Pro aspekt oznámení můžeme přizpůsobit vzor Java Observer tak, aby se vešel do světa s více vlákny.

Přizpůsobení vzoru pozorovatele Java na vícevláknové systémy

Kvůli zavěšení úkolů pomocí Java ThreadPool třída s vhodnou strategií je první řešení, které mě napadne. Nicméně pomocí Java ThreadPool v souvislosti s některými vlákny, které náhodně visí po určitou dobu, způsobují nežádoucí chování založené na konkrétní použité strategii, jako je hladové vlákno v případě pevné strategie fondu vláken. To je způsobeno hlavně skutečností, že Java ThreadPool nemá mechanismus pro detekci zablokování vlákna.

Mohli bychom vyzkoušet fond podprocesů s mezipamětí, ale má také problémy. Pokud je vysoká rychlost spouštění úkolů a některá vlákna zablokují, počet vláken by mohl vystřelit nahoru, což by nakonec způsobilo nedostatek prostředků a výjimky z paměti. Nebo bychom mohli použít vlastní ThreadPool strategie vyvolávající a CallerRunsPolicy. I v tomto případě může zablokování vlákna způsobit, že se nakonec všechna vlákna zablokují. (Hlavní vlákno by nikdy nemělo být volajícím, protože existuje možnost, že jakýkoli úkol předaný hlavnímu vláknu může viset, což způsobí, že se vše zastaví.)

Jaké je tedy řešení? Předvedu ne tak jednoduchý vzor ThreadPool, který upravuje velikost fondu podle rychlosti úkolu a na základě počtu zavěšených vláken. Pojďme nejprve k problému detekce visících vláken.

Zjištění závěsných vláken

Obrázek 1 ukazuje abstrakci vzoru:

Zde jsou dvě důležité třídy: ThreadManager a ManagedThread. Oba vycházejí z Javy Vlákno třída. The ThreadManager drží nádobu, která drží ManagedThreads. Když nový ManagedThread je vytvořen, přidá se do tohoto kontejneru.

 ThreadHangTester testthread = nový ThreadHangTester ("threadhangertest", 2000, false); testthread.start (); thrdManger.manage (testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start (); 

The ThreadManager iteruje tímto seznamem a volá ManagedThreadje isHung () metoda. V zásadě se jedná o logiku kontroly časového razítka.

 if (System.currentTimeMillis () - lastprocessingtime.get ()> maxprocessingtime) {logger.debug ("Thread is hung"); návrat true; } 

Pokud zjistí, že vlákno přešlo do smyčky úkolů a nikdy neaktualizovalo své výsledky, použije mechanismus obnovy, jak stanoví Správa vlákna.

 while (isRunning) {for (Iterator iterator = managedThreads.iterator (); iterator.hasNext ();) {ManagedThreadData thrddata = (ManagedThreadData) iterator.next (); if (thrddata.getManagedThread (). isHung ()) {logger.warn ("Bylo zjištěno zablokování vlákna pro ThreadName =" + thrddata.getManagedThread (). getName ()); switch (thrddata.getManagedAction ()) {případ RESTART_THREAD: // Zde je třeba restartovat vlákno // odebrat ze správce iterator.remove (); // pokud možno zastavte zpracování tohoto vlákna thrddata.getManagedThread (). stopProcessing (); if (thrddata.getManagedThread (). getClass () == ThreadHangTester.class) // Vědět, jaký typ vlákna vytvořit {ThreadHangTester newThread = new ThreadHangTester ("restarted_ThrdHangTest", 5000, true); // Vytvořit nové vlákno newThread.start (); // přidat zpět ke správě spravovat (newThread, thrddata.getManagedAction (), thrddata.getThreadChecktime ()); } přestávka; ......... 

Pro nový ManagedThread má být vytvořen a použit místo zavěšeného, ​​neměl by obsahovat žádný stát ani žádný kontejner. K tomu kontejner, na kterém ManagedThread činy by měly být odděleny. Tady používáme vzor Singleton založený na ENUM k uložení seznamu úkolů. Kontejner obsahující úkoly je tedy nezávislý na vlákně zpracovávajícím úkoly. Kliknutím na následující odkaz si stáhnete zdroj popsaného vzoru: Zdroj Java Thread Manager.

Závěsná vlákna a strategie Java ThreadPool

Java ThreadPool nemá mechanismus pro detekci visících vláken. Pomocí strategie, jako je pevná skupina vláken (Executors.newFixedThreadPool ()) nebude fungovat, protože pokud některé úkoly přestanou časem fungovat, všechna vlákna budou nakonec ve stavu zablokování. Další možností je použití zásady ThreadPool v mezipaměti (Executors.newCachedThreadPool ()). To by mohlo zajistit, že pro zpracování úlohy budou vždy k dispozici podprocesy, omezené pouze omezeními paměti VM, CPU a podprocesů. S touto zásadou však neexistuje žádná kontrola nad počtem vláken, která se vytvoří. Bez ohledu na to, zda podproces zpracování zablokuje nebo ne, použití této zásady při vysoké rychlosti úkolu vede k vytvoření velkého počtu podprocesů. Pokud nemáte dostatek prostředků pro JVM velmi brzy, narazíte na maximální prahovou hodnotu paměti nebo vysokou CPU. Je docela běžné vidět počet vláken zasažených stovky nebo tisíce. I když jsou vydány, jakmile je úkol zpracován, někdy během zpracování burstu ohromí vysoký počet vláken systémové prostředky.

Třetí možností je použití vlastních strategií nebo zásad. Jednou z takových možností je mít fond podprocesů, který se mění od 0 do určitého maximálního počtu. Takže i kdyby jedno vlákno viselo, nové vlákno by bylo vytvořeno, pokud bylo dosaženo maximálního počtu vláken:

 execexec = new ThreadPoolExecutor (0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue ()); 

Tady 3 je maximální počet vláken a doba udržení naživu je nastavena na 60 sekund, protože se jedná o proces náročný na úkoly. Pokud zadáme dostatečně vysoký maximální počet vláken, jedná se víceméně o rozumnou zásadu, kterou lze použít v kontextu předsazení úkolů. Jediným problémem je, že pokud se závěsná vlákna nakonec neuvolní, existuje malá šance, že by se v určitém okamžiku mohla všechna vlákna zavěsit. Pokud je maximální počet podprocesů dostatečně vysoký a za předpokladu, že zablokování úkolu je neobvyklý jev, pak by se tato zásada hodila k účtu.

Bylo by sladké, kdyby ThreadPool měl také zásuvný mechanismus detekce visících vláken. O jednom takovém designu pojednám později. Samozřejmě, pokud jsou všechna vlákna zamrzlá, můžete nakonfigurovat a použít zásady odmítnuté úlohy fondu vláken. Pokud nechcete zahodit úkoly, které byste museli použít CallerRunsPolicy:

 execexec = new ThreadPoolExecutor (0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue () new ThreadPoolExecutor.CallerRunsPolicy ()); 

V tomto případě, pokud zablokování vlákna způsobilo odmítnutí úkolu, bude tento úkol předán volajícímu vláknu, které má být zpracováno. Vždy existuje šance, že tento úkol bude příliš viset. V takovém případě by celý proces zamrzl. Je tedy lepší v této souvislosti takovou politiku nepřidávat.

 veřejná třída NotificationProcessor implementuje Runnable {private final NotificationOriginator notificationOrginator; boolean isRunning = true; soukromá konečná ExecutorService execexec; AlarmNotificationProcessor (NotificationOriginator norginator) {// ctor // execexec = Executors.newCachedThreadPool (); // Příliš mnoho vláken // execexec = Executors.newFixedThreadPool (2); //, žádná detekce zablokování úloh execexec = nový ThreadPoolExecutor , 250, TimeUnit.MILLISECONDS, new SynchronousQueue (), new ThreadPoolExecutor.CallerRunsPolicy ()); } public void run () {while (isRunning) {try {final Task task = TaskQueue.INSTANCE.getTask (); Runnable thisTrap = new Runnable () {public void run () {++ alarmid; notificaionOrginator.notify (new OctetString (), // Zpracování úkolu nbialarmnew.getOID (), nbialarmnew.createVariableBindingPayload ()); É ........}}; execexec.execute (thisTrap); } 

Vlastní ThreadPool s detekcí zablokování

Knihovna fondů vláken se schopností detekce a zpracování zablokování úloh by byla skvělá. Jednu jsem vyvinul a níže ji předvedu. Toto je vlastně port z fondu podprocesů C ++, který jsem před časem navrhl a použil (viz odkazy). V zásadě toto řešení používá vzor příkazů a vzor řetězce odpovědnosti. Implementovat příkazový vzor v Javě bez pomoci podpory objektů Function je však trochu obtížné. K tomu jsem musel implementaci mírně změnit, abych použil odraz Java. Všimněte si, že v kontextu, ve kterém byl tento vzor navržen, bylo místo, kde musel být fond vláken nainstalován / zapojen, aniž by došlo ke změně kterékoli ze stávajících tříd. (Domnívám se, že jednou velkou výhodou objektově orientovaného programování je, že nám poskytuje způsob, jak navrhovat třídy tak, aby efektivně využívaly principu otevřeného uzavřeného přístupu. To platí zejména pro složitý starý starší kód a může mít menší význam pro nový vývoj produktu.) Proto jsem místo implementace příkazového vzoru použil rozhraní místo rozhraní. Zbytek kódu lze přenést bez větších změn, protože téměř všechny podprocesy synchronizace a signalizace podprocesů jsou k dispozici od verze Java 1.5.

 public class Příkaz {private Object [] argParameter; ........ // Ctor pro metodu se dvěma příkazy args (T pObj, String methodName, long timeout, String key, int arg1, int arg2) {m_objptr = pObj; m_methodName = mthodName; m_timeout = timeout; m_key = klíč; argParameter = nový objekt [2]; argParameter [0] = arg1; argParameter [1] = arg2; } // Volá metodu objektu void execute () {Class klass = m_objptr.getClass (); Třída [] paramTypes = nová Třída [] {int.class, int.class}; try {Method methodName = klass.getMethod (m_methodName, paramTypes); //System.out.println("Found the method -> "+ methodName); if (argParameter.length == 2) {methodName.invoke (m_objptr, (Object) argParameter [0], (Object) argParameter [1]); } 

Příklad použití tohoto vzoru:

 veřejná třída CTask {.. public int DoSomething (int a, int b) {...}} 

Příkaz cmd4 = nový příkaz (task4, "DoMultiplication", 1, "key2", 2,5);

Nyní tu máme další dvě důležité třídy. Jedním z nich je ThreadChain třída, která implementuje vzor Chain of Responsibility:

 public class ThreadChain implements Runnable {public ThreadChain (ThreadChain p, ThreadPool pool, String name) {AddRef (); deleteMe = false; zaneprázdněn = false; // -> velmi důležité next = p; // nastavit řetězec vláken - všimněte si, že je to jako propojený seznam impl threadpool = pool; // set the thread pool - Root of the threadpool ........ threadId = ++ ThreadId; ...... // spustit vlákno thisThread = new Thread (this, name + inttid.toString ()); thisThread.start (); } 

Tato třída má dvě hlavní metody. Jeden je booleovský CanHandle () který je iniciován ThreadPool třídy a poté pokračuje rekurzivně. Tím se zkontroluje, zda je aktuální vlákno (aktuální ThreadChain instance) je při zpracování úkolu zdarma. Pokud již zpracovává úkol, volá další v řetězci.

 public Boolean canHandle () {if (! busy) {// If not busy System.out.println ("Může zpracovat tuto událost v id =" + threadId); // todo signal an event try {condLock.lock (); condWait.signal (); // Signal the HandleRequest which is waiting for this in the run method .................................... ..... návrat true; } ......................................... /// Jinak se podívejte, jestli bude další objekt v řetězci je zdarma /// pro zpracování požadavku return next.canHandle (); 

Všimněte si, že HandleRequest je metoda ThreadChain který je vyvolán z Běh vlákna () metoda a čeká na signál z canHandle metoda. Všimněte si také, jak je úkol řešen pomocí příkazového vzoru.