410013796724260
• Webmoney
R335386147728
Z369087728698
Описание и пример FutureTaskОдной из «неприятных» задач многопоточного приложения является определение состояния потока (работает или нет, реагирует на внешние воздействия или нет) и останов потока. Если рассматривать обычный поток Thread, то с ним могут возникнуть такие проблемы. Ведь в Java (JDK 1.5 и выше) нет механизма останова потока. Как известно метод Thread.stop() объявлен как Deprecated поскольку «не потокобезопасен», а безопасная инструкция Thread.interrupt() только сообщает потоку о необходимости остановки. Но если данное сообщение проигнорировано, т.е. разработчик не вставил обработку, то и поток не остановится. И вот здесь на помощь разработчику приходит пакет Concurrent со своими интерфейсами java.util.concurrent.Callable, java.util.concurrent.Future. Данные интерфейсы (подробно представленные на сайте здесь ), а также их различные реализации позволяют решать задачи «внешнего» управления потоками, т.е. не в методе выполнения потока run() или call(). В данной статье будет рассмотрен класс FutureTask и приведен пример контроля состояния потоков и останова одного потока из другого. Класс FutureTask<V>Класс-оболочка FutureTask базируется на конкретной реализации интерфейса Future. Чтобы создать реализацию данного класса необходим объект Callable; после этого можно использовать Java Thread Pool Executor для асинхронной обработки. Таким образом, FutureTask представляет удобный механизм для превращения Callable одновременно в Future и Runnable, реализуя оба интерфейса. Объект класса FutureTask может быть передан на выполнение классу, реализующему интерфейс Executor, либо запущен в отдельном потоке, как класс, реализующий интерфейс Runnable. Если посмотреть в исходники класса FutureTask, то можно увидеть, что он реализует интерфейс RunnableFuture, который, в свою очередь, наследует свойства интерфейсов Runnable и Future<V>. Конструкторы класса FutureTask<V>Класс FutureTask<V> содержит следующие два конструктора : /* * Конструктор создания FutureTask, который * после старта выполнит callable */ FutureTask(Callable<V> callable) /* * Конструктор создания FutureTask, который после старта * выполнит runnable и при успешном завершении вернет * объект result */ FutureTask(Runnable runnable, V result) Методы класса FutureTask<V>
Пример использования FutureTaskРассмотрим простой пример использования FutureTask, в котором создается пул из 3-х потоков, реализующих интерфейс Callable в виде класса CallableDelay. Основная идея примера связана с проверками выполняемых потоков и отмены выполнения задачи одного из потоков. Конструктор класса CallableDelay в качестве параметров получает временно́й размер задержки delay и идентификатор потока. В зависимости от значения идентификатора потока в методе call() выполняется соответствующее количество циклов с заданной задержкой, после чего поток завершает работу. Второй поток на первом цикле прерывает выполнение 3-го потока вызовом метода cancel. Метод call потока возвращает текстовый объект String в виде наименования потока. Метод areTasksDone() проверяет завершение выполнения всех задач/потоков вызовом методов isDone() объектов futureTask. Если выполнение всех потоков завершены, то сервис executor останавливает свою работу методом shutdown(). В конструкторе примера создаются два массива из объектов типа CallableDelay и FutureTask. После этого формируется пул для трех потоков сервиса типа ExecutorService и потоки стартуют методом execute сервиса executor. В цикле выполняются различного рода проверки : завершения работы потока методом isDone(), ожидания завершения потока методом get() с временны́ми параметрами и отмены выполнения потока методом isCancelled(). import java.util.concurrent.TimeUnit; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.concurrent.ExecutionException; public class FutureTaskExample { CallableDelay[] callable = null; FutureTask<String>[] futureTask = null; ExecutorService executor = null; private final int THREAD_COUNT = 3; //----------------------------------------------------- class CallableDelay implements Callable<String> { private long delay; private int idx ; private int cycle; public CallableDelay(int delay, int idx) { this.delay = delay; this.idx = idx; this.cycle = idx; } @Override public String call() throws Exception { while (cycle > 0) { Thread.sleep(delay); cycle--; if ((idx == 2) && (cycle > 0)) futureTask[futureTask.length - 1] .cancel(true); } /* * Идентификатор и наименование потока, * выполняющего данную callable задачу */ return ""+ idx + ". " + Thread.currentThread().getName(); } } //----------------------------------------------------- private boolean areTasksDone() { boolean isDone = true; for (int i = 0; i < THREAD_COUNT; i++) { if (!futureTask[i].isDone()) { isDone = false; break; } } return isDone; } //----------------------------------------------------- FutureTaskExample () { callable = new CallableDelay[THREAD_COUNT]; futureTask = new FutureTask [THREAD_COUNT]; // Сервис исполнения executor=Executors.newFixedThreadPool(THREAD_COUNT); for (int i = 0; i < THREAD_COUNT; i++) { callable [i] = new CallableDelay(1000, (i + 1)); futureTask[i] = new FutureTask<String>( callable[i]); executor.execute(futureTask[i]); } // Цикл работы executor'а while (true) { try { if (areTasksDone()) { // Завершение работы executor'а executor.shutdown(); System.out.println("\nexecutor shutdown"); return; } // Проверка завершения выполнения задачи 1-м // потоком if (!futureTask[0].isDone()) System.out.println( "1-ый поток завершен : " + futureTask[0].get()); System.out.println( "Ожидание завершения 2-го потока"); String txt = futureTask[1].get(200L, TimeUnit.MILLISECONDS); if(txt != null) System.out.println( "2-ой поток завершен : " + txt); System.out.println( "Проверка завершения 3-го потока"); if (futureTask[2].isCancelled()) System.out.println( "3-ой поток был прерван ..."); else if (!futureTask[2].isDone()) { txt = futureTask[2].get(); System.out.println( "3-ий поток завершен : "+txt); } Thread.sleep(200); } catch (InterruptedException | ExecutionException e) { System.err.println(e.getMessage()); } catch(TimeoutException e){ /* * 2-ой поток вызывает TimeoutException, * если задача не завершена за указанное * время */ System.err.println("TimeoutException"); } } } //----------------------------------------------------- public static void main(String[] args) { new FutureTaskExample(); } } Результат работы примераПри выполнении задачи информационные сообщения, представленные ниже, выводятся в консоль. Согласно последовательности вывода сообщений можно сказать, что при вызове метода isDone() объекта FutureTask 1-го потока программа перешла в режим ожидания завершения работы потока. После завершения выполнения 1-го потока программа перешла к проверке 2-го потока методом get() с временно́й задержкой.Так как за предоставленное время поток не смог завершить работу, то был вызван TimeoutException и цикл проверки повторился снова. Только после завершения работы 2-го потока программа перешла к проверке отмены/завершения 3-го потока. Метод isCancelled() подтвердил, что выполнение потока было прервано. Только после этого метод areTasksDone() подтвердил, что все потоки завершили выполнение и цикл проверок был прерван, сервис executor завершил выполнение методом shutdown(). 1-ый поток завершен : 1. pool-1-thread-1 Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока 2-ой поток завершен : 2. pool-1-thread-2 Проверка завершения 3-го потока 3-ий поток был прерван ... executor shutdown Именование потоковСтандартная схема именования потока соответствует формату pool-N-thread-M, где N обозначает последовательный номер пула, а M – порядковый номер потока в пуле. Так наименование pool-1-thread-2 означает второй поток в первом пуле жизненного цикла JVM. Каждый раз, когда создается новый пул, глобальный счетчик N инкрементится. |