410013796724260
• Webmoney
R335386147728
Z369087728698
Описание и пример FutureTaskОдной из «неприятных» задач многопоточного приложения является определение состояния потока (работает или нет, реагирует на внешние воздействия или нет) и останов потока. Если рассматривать обычный поток Thread, то с ним могут возникнуть такие проблемы. Ведь в Java (JDK 1.5 и выше) нет механизма останова потока. Как известно метод Thread.stop() объявлен как Deprecated поскольку «не потокобезопасен», а безопасная инструкция Thread.interrupt() только сообщает потоку о необходимости остановки. Но если данное сообщение проигнорировано, т.е. разработчик не вставил обработку, то и поток не остановится. И вот здесь на помощь разработчику приходит пакет Concurrent со своими интерфейсами java.util.concurrent.Callable, java.util.concurrent.Future. Данные интерфейсы (подробно представленные на сайте здесь ), а также их различные реализации позволяют решать задачи «внешнего» управления потоками, т.е. не в методе выполнения потока run() или call(). В данной статье будет рассмотрен класс FutureTask и приведен пример контроля состояния потоков и останова одного потока из другого. Класс FutureTask<V>Класс-оболочка FutureTask базируется на конкретной реализации интерфейса Future. Чтобы создать реализацию данного класса необходим объект Callable; после этого можно использовать Java Thread Pool Executor для асинхронной обработки. Таким образом, FutureTask представляет удобный механизм для превращения Callable одновременно в Future и Runnable, реализуя оба интерфейса. Объект класса FutureTask может быть передан на выполнение классу, реализующему интерфейс Executor, либо запущен в отдельном потоке, как класс, реализующий интерфейс Runnable. Если посмотреть в исходники класса FutureTask, то можно увидеть, что он реализует интерфейс RunnableFuture, который, в свою очередь, наследует свойства интерфейсов Runnable и Future<V>. Конструкторы класса FutureTask<V>Класс FutureTask<V> содержит следующие два конструктора : /* * Конструктор создания FutureTask, который * после старта выполнит callable */ FutureTask(Callable<V> callable) /* * Конструктор создания FutureTask, который после старта * выполнит runnable и при успешном завершении вернет * объект result */ FutureTask(Runnable runnable, V result) Методы класса FutureTask<V>
Пример использования FutureTaskРассмотрим простой пример использования FutureTask, в котором создается пул из 3-х потоков, реализующих интерфейс Callable в виде класса CallableDelay. Основная идея примера связана с проверками выполняемых потоков и отмены выполнения задачи одного из потоков. Конструктор класса CallableDelay в качестве параметров получает временно́й размер задержки delay и идентификатор потока. В зависимости от значения идентификатора потока в методе call() выполняется соответствующее количество циклов с заданной задержкой, после чего поток завершает работу. Второй поток на первом цикле прерывает выполнение 3-го потока вызовом метода cancel. Метод call потока возвращает текстовый объект String в виде наименования потока. Метод areTasksDone() проверяет завершение выполнения всех задач/потоков вызовом методов isDone() объектов futureTask. Если выполнение всех потоков завершены, то сервис executor останавливает свою работу методом shutdown(). В конструкторе примера создаются два массива из объектов типа CallableDelay и FutureTask. После этого формируется пул для трех потоков сервиса типа ExecutorService и потоки стартуют методом execute сервиса executor. В цикле выполняются различного рода проверки : завершения работы потока методом isDone(), ожидания завершения потока методом get() с временны́ми параметрами и отмены выполнения потока методом isCancelled().
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutionException;
public class FutureTaskExample
{
CallableDelay[] callable = null;
FutureTask<String>[] futureTask = null;
ExecutorService executor = null;
private final int THREAD_COUNT = 3;
//-----------------------------------------------------
class CallableDelay implements Callable<String>
{
private long delay;
private int idx ;
private int cycle;
public CallableDelay(int delay, int idx)
{
this.delay = delay;
this.idx = idx;
this.cycle = idx;
}
@Override
public String call() throws Exception
{
while (cycle > 0) {
Thread.sleep(delay);
cycle--;
if ((idx == 2) && (cycle > 0))
futureTask[futureTask.length - 1]
.cancel(true);
}
/*
* Идентификатор и наименование потока,
* выполняющего данную callable задачу
*/
return ""+ idx + ". " +
Thread.currentThread().getName();
}
}
//-----------------------------------------------------
private boolean areTasksDone()
{
boolean isDone = true;
for (int i = 0; i < THREAD_COUNT; i++) {
if (!futureTask[i].isDone()) {
isDone = false;
break;
}
}
return isDone;
}
//-----------------------------------------------------
FutureTaskExample ()
{
callable = new CallableDelay[THREAD_COUNT];
futureTask = new FutureTask [THREAD_COUNT];
// Сервис исполнения
executor=Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
callable [i] = new CallableDelay(1000, (i + 1));
futureTask[i] = new FutureTask<String>(
callable[i]);
executor.execute(futureTask[i]);
}
// Цикл работы executor'а
while (true) {
try {
if (areTasksDone()) {
// Завершение работы executor'а
executor.shutdown();
System.out.println("\nexecutor shutdown");
return;
}
// Проверка завершения выполнения задачи 1-м
// потоком
if (!futureTask[0].isDone())
System.out.println(
"1-ый поток завершен : "
+ futureTask[0].get());
System.out.println(
"Ожидание завершения 2-го потока");
String txt = futureTask[1].get(200L,
TimeUnit.MILLISECONDS);
if(txt != null)
System.out.println(
"2-ой поток завершен : " + txt);
System.out.println(
"Проверка завершения 3-го потока");
if (futureTask[2].isCancelled())
System.out.println(
"3-ой поток был прерван ...");
else if (!futureTask[2].isDone()) {
txt = futureTask[2].get();
System.out.println(
"3-ий поток завершен : "+txt);
}
Thread.sleep(200);
} catch (InterruptedException |
ExecutionException e) {
System.err.println(e.getMessage());
} catch(TimeoutException e){
/*
* 2-ой поток вызывает TimeoutException,
* если задача не завершена за указанное
* время
*/
System.err.println("TimeoutException");
}
}
}
//-----------------------------------------------------
public static void main(String[] args)
{
new FutureTaskExample();
}
}
Результат работы примераПри выполнении задачи информационные сообщения, представленные ниже, выводятся в консоль. Согласно последовательности вывода сообщений можно сказать, что при вызове метода isDone() объекта FutureTask 1-го потока программа перешла в режим ожидания завершения работы потока. После завершения выполнения 1-го потока программа перешла к проверке 2-го потока методом get() с временно́й задержкой.Так как за предоставленное время поток не смог завершить работу, то был вызван TimeoutException и цикл проверки повторился снова. Только после завершения работы 2-го потока программа перешла к проверке отмены/завершения 3-го потока. Метод isCancelled() подтвердил, что выполнение потока было прервано. Только после этого метод areTasksDone() подтвердил, что все потоки завершили выполнение и цикл проверок был прерван, сервис executor завершил выполнение методом shutdown(). 1-ый поток завершен : 1. pool-1-thread-1 Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока 2-ой поток завершен : 2. pool-1-thread-2 Проверка завершения 3-го потока 3-ий поток был прерван ... executor shutdown Именование потоковСтандартная схема именования потока соответствует формату pool-N-thread-M, где N обозначает последовательный номер пула, а M – порядковый номер потока в пуле. Так наименование pool-1-thread-2 означает второй поток в первом пуле жизненного цикла JVM. Каждый раз, когда создается новый пул, глобальный счетчик N инкрементится. |
