Описание и пример FutureTask

Одной из «неприятных» задач многопоточного приложения является определение состояния потока (работает или нет, реагирует на внешние воздействия или нет) и останов потока. Если рассматривать обычный поток Thread, то с ним могут возникнуть такие проблемы. Ведь в Java (JDK 1.5 и выше) нет механизма останова потока. Как известно метод Thread.stop() объявлен как Deprecated поскольку «не потокобезопасен», а безопасная инструкция Thread.interrupt() только сообщает потоку о необходимости остановки. Но если данное сообщение проигнорировано, т.е. разработчик не вставил обработку, то и поток не остановится.

И вот здесь на помощь разработчику приходит пакет Concurrent со своими интерфейсами java.util.concurrent.Callable, java.util.concurrent.Future. Данные интерфейсы (подробно представленные на сайте здесь ), а также их различные реализации позволяют решать задачи «внешнего» управления потоками, т.е. не в методе выполнения потока run() или call(). В данной статье будет рассмотрен класс FutureTask и приведен пример контроля состояния потоков и останова одного потока из другого.

Класс FutureTask<V>

Класс-оболочка FutureTask базируется на конкретной реализации интерфейса Future. Чтобы создать реализацию данного класса необходим объект Callable; после этого можно использовать Java Thread Pool Executor для асинхронной обработки. Таким образом, FutureTask представляет удобный механизм для превращения Callable одновременно в Future и Runnable, реализуя оба интерфейса. Объект класса FutureTask может быть передан на выполнение классу, реализующему интерфейс Executor, либо запущен в отдельном потоке, как класс, реализующий интерфейс Runnable.

Если посмотреть в исходники класса FutureTask, то можно увидеть, что он реализует интерфейс RunnableFuture, который, в свою очередь, наследует свойства интерфейсов Runnable и Future<V>.

Конструкторы класса FutureTask<V>

Класс FutureTask<V> содержит следующие два конструктора :

/* 
 * Конструктор создания FutureTask, который
 * после старта выполнит callable
*/
FutureTask(Callable<V> callable)

/*
 * Конструктор создания FutureTask, который после старта выполнит
 * runnable и при успешном завершении вернет объект result
*/
FutureTask(Runnable runnable, V result)

Методы класса FutureTask<V>

МетодОписание
V get() получение результата выполнения потока; вызов метода блокирует дальнейшее выполнения до окончания вычислений
V get(long timeout, TimeUnit unit) получение результата до окончания вычислений или до истечения указанного интервала времени; если в течение указанного времени вычисления не завершились, то вызывается исключение TimeoutException
boolean cancel(boolean mayInterrupt) отмена выполнения задачи; если задача уже стартована и параметр mayInterrupt равен true, то она прерывается, в противном случае, если вычисления еще не начаты, то они и не начнутся. При успешной отмене выполнения задачи метод возвращает значение true
boolean isCancelled() метод возвращает true, если задача была отменена до ее нормального завершения
boolean isDone() метод возвращает true, если выполнение задачи завершено, прервано или если в процессе ее выполнения возникло исключение

Пример использования FutureTask

Рассмотрим простой пример использования FutureTask, в котором создается пул из 3-х потоков, реализующих интерфейс Callable в виде класса CallableDelay. Основная идея примера связана с проверками выполняемых потоков и отмены выполнения задачи одного из потоков.

Конструктор класса CallableDelay в качестве параметров получает временно́й размер задержки delay и идентификатор потока. В зависимости от значения идентификатора потока в методе call() выполняется соответствующее количество циклов с заданной задержкой, после чего поток завершает работу. Второй поток на первом цикле прерывает выполнение 3-го потока вызовом метода cancel. Метод call потока возвращает текстовый объект String в виде наименования потока.

Метод areTasksDone() проверяет завершение выполнения всех задач/потоков вызовом методов isDone() объектов futureTask. Если выполнение всех потоков завершены, то сервис executor останавливает свою работу методом shutdown().

В конструкторе примера создаются два массива из объектов типа CallableDelay и FutureTask. После этого формируется пул для трех потоков сервиса типа ExecutorService и потоки стартуют методом execute сервиса executor. В цикле выполняются различного рода проверки : завершения работы потока методом isDone(), ожидания завершения потока методом get() с временны́ми параметрами и отмены выполнения потока методом isCancelled().

import java.util.concurrent.TimeUnit;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutionException;

public class FutureTaskExample
{
    CallableDelay[]      callable     = null;
    FutureTask<String>[] futureTask   = null;
    ExecutorService      executor     = null;
    private final int    THREAD_COUNT = 3;    
    //---------------------------------------------------------------
    class CallableDelay implements Callable<String>
    {
        private  long  delay;
        private  int   idx  ;
        private  int   cycle;

        public CallableDelay(int delay, int idx)
        {
            this.delay = delay;
            this.idx   = idx;
            this.cycle = idx;
        }
        @Override
        public String call() throws Exception 
        {
            while (cycle > 0) {
                Thread.sleep(delay);
                cycle--;
                if ((idx == 2) && (cycle > 0))
                    futureTask[futureTask.length - 1].cancel(true);
            }
            /*
             *  Идентификатор и наименование потока, выполняющего
             *  данную callable задачу
             */
            return ""+ idx +". "+ Thread.currentThread().getName();
        }
    }
    //---------------------------------------------------------------
    private boolean areTasksDone()
    {
        boolean isDone = true;
        for (int i = 0; i < THREAD_COUNT; i++) {
            if (!futureTask[i].isDone()) {
                isDone = false;
                break;
            }
        }
        return isDone;
    }
    //---------------------------------------------------------------
    FutureTaskExample ()
    {
        callable   = new CallableDelay[THREAD_COUNT];
        futureTask = new FutureTask   [THREAD_COUNT];

        // Сервис исполнения
        executor = Executors.newFixedThreadPool(THREAD_COUNT);
        
        for (int i = 0; i < THREAD_COUNT; i++) {
            callable  [i] = new CallableDelay(1000, (i + 1));
            futureTask[i] = new FutureTask<String>(callable[i]);

            executor.execute(futureTask[i]);
        }
        // Цикл работы executor'а
        while (true) {
            try {
                if (areTasksDone()) {
                    // Завершение работы executor'а
                    executor.shutdown();
                    System.out.println("\nexecutor shutdown");
                    return;
                }
                // Проверка завершения выполнения задачи 1-м потоком
                if (!futureTask[0].isDone())
                    System.out.println("1-ый поток завершен : " 
                                              + futureTask[0].get());
                 
                System.out.println("Ожидание завершения 2-го потока");
                String txt = futureTask[1].get(200L, 
                                               TimeUnit.MILLISECONDS);
                if(txt != null)
                    System.out.println("2-ой поток завершен : "+txt);

                System.out.println("Проверка завершения 3-го потока");
                if (futureTask[2].isCancelled())
                    System.out.println("3-ой поток был прерван ...");
                else if (!futureTask[2].isDone()) {
                    txt = futureTask[2].get();
               	    System.out.println("3-ий поток завершен : "+txt);
                }
                Thread.sleep(200);
            } catch (InterruptedException | ExecutionException e) {
                System.err.println(e.getMessage());
            } catch(TimeoutException e){
                /*
                 *  2-ой поток вызывает TimeoutException, если задача
                 *  не завершена за указанное время
                 */
                System.err.println("TimeoutException");
            }
        }
    }
    //---------------------------------------------------------------
    public static void main(String[] args) 
    {
        new FutureTaskExample();
    }
}

Результат работы примера

При выполнении задачи информационные сообщения, представленные ниже, выводятся в консоль. Согласно последовательности вывода сообщений можно сказать, что при вызове метода isDone() объекта FutureTask 1-го потока программа перешла в режим ожидания завершения работы потока.

После завершения выполнения 1-го потока программа перешла к проверке 2-го потока методом get() с временно́й задержкой.Так как за предоставленное время поток не смог завершить работу, то был вызван TimeoutException и цикл проверки повторился снова.

Только после завершения работы 2-го потока программа перешла к проверке отмены/завершения 3-го потока. Метод isCancelled() подтвердил, что выполнение потока было прервано. Только после этого метод areTasksDone() подтвердил, что все потоки завершили выполнение и цикл проверок был прерван, сервис executor завершил выполнение методом shutdown().


1-ый поток завершен : 1. pool-1-thread-1
Ожидание завершения 2-го потока
TimeoutException

Ожидание завершения 2-го потока
TimeoutException

Ожидание завершения 2-го потока
TimeoutException

Ожидание завершения 2-го потока
TimeoutException

Ожидание завершения 2-го потока
2-ой поток завершен : 2. pool-1-thread-2
Проверка завершения 3-го потока
3-ий поток был прерван ...

executor shutdown
 

Именование потоков

Стандартная схема именования потока соответствует формату pool-N-thread-M, где N обозначает последовательный номер пула, а M – порядковый номер потока в пуле. Так наименование pool-1-thread-2 означает второй поток в первом пуле жизненного цикла JVM. Каждый раз, когда создается новый пул, глобальный счетчик N инкрементится.

  Рейтинг@Mail.ru