Описание и пример ExecutorService

В многопоточный пакет concurrent для управления потоками включено средство, называемое сервисом исполнения ExecutorService. Данное средство служит альтернативой классу Thread, предназначенному для управления потоками. В основу сервиса исполнения положен интерфейс Executor, в котором определен один метод :

void execute(Runnable thread);

При вызове метода execute исполняется поток thread. То есть, метод execute запускает указанный поток на исполнение. Следующий код показывает, как вместо обычного старта потока Thread.start() можно запустить поток с использованием сервиса исполнения :

// Вместо следующего кода
new Thread(new RunnableTask()).start();

// можно использовать 
ExecutorService executor;
. . .
executor.execute(new CallableSample1());
Future<String> f1 = executor.submit(new CallableSample2());

При запуске задач с помощью Executor пакета java.util.concurrent не требуется прибегать к низкоуровневой поточной функциональности класса Thread, достаточно создать объект типа ExecutorService с нужными свойствами и передать ему на исполнение задачу типа Callable. Впоследствии можно легко просмотреть результат выполнения этой задачи с помощью объекта Future.

Интерфейс ExecutorService расширяет свойства Executor, дополняя его методами управления исполнением и контроля. Так в интерфейс ExecutorService включен метод shutdown(), позволяющий останавливать все потоки исполнения, находящиеся под управлением экземпляра ExecutorService. Также в интерфейсе ExecutorService определяются методы, которые запускают потоки исполнения FutureTask, возвращающие результаты и позволяющие определять статус остановки.

Методы интерфейса ExecutorService

МетодОписание
boolean awaitTermination(long timeout, TimeUnit unit) Блокировка до тех пор, пока все задачи не завершат выполнение после запроса на завершение работы или пока не наступит тайм-аут или не будет прерван текущий поток, в зависимости от того, что произойдет раньше
List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) Выполнение задач с возвращением списка задач с их статусом и результатами завершения
List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) Выполнение задач с возвращением списка задач с их статусом и результатами завершения в течение заданного времени
T invokeAny(Collection<? extends Callable<T>> tasks) Выполнение задач с возвращением результата успешно выполненной задачи (т. е. без создания исключения), если таковые имеются
T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) Выполнение задач в течение заданного времени с возвращением результата успешно выполненной задачи (т. е. без создания исключения), если таковые имеются
boolean isShutdown() Возвращает true, если исполнитель сервиса остановлен (shutdown)
boolean isTerminated() Возвращает true, если все задачи исполнителя сервиса завершены по команде остановки (shutdown)
void shutdown() Упорядоченное завершение работы, при котором ранее отправленные задачи выполняются, а новые задачи не принимаются
List<Runnable> shutdownNow() Остановка всех активно выполняемых задач, остановка обработки ожидающих задач, возвращение списка задач, ожидающих выполнения
Future<T> submit(Callable<T> task) Завершение выполнения задачи, возвращающей результат в виде объекта Future
Future<?> submit(Runnable task) Завершение выполнения задачи, возвращающей объект Future, представляющий данную задачу
Future<T> submit(Runnable task, T result) Завершение выполнения задачи, возвращающей объект Future, представляющий данную задачу

Наибольший интерес в интерфейсе ExecutorService представляет метод submit(), который ставит задачу в очередь на выполнение. В качестве входного параметра данный метод принимает объект типа Callable или Runnable, а возвращает параметризованный объект типа Future, который можно использовать для доступа к результату выполнения задачи. Метод call соответствующего Callable-объекта возвращает объект Future. С использованием объекта Future можно определить завершение выполнения задачи (метод isDone()) и получить доступ к результату (метод get) или исключительной ситуации, если в процессе выполнения задачи произошла ошибка.

Стоит обратить внимание на метод shutdown(), который выполняет остановку объекта ExecutorService. Поскольку потоки в объекте ExecutorService не останавливаются сами, как обычно, поэтому их необходимо явно остановить с помощью данного метода; при этом, если в ExecutorService находятся невыполненные задачи, то потоки будут остановлены только, когда завершится последняя задача.

Пакет Concurrent включает интерфейс ScheduledExecutorService, расширяющий интерфейс ExecutorService для поддержки планирования потоков исполнения. Кроме этого, в пакет включены три предопределенных класса исполнителей: ThreadPoolExecutor, ScheduledThreadPoolExecutor и ForkJoinPool.

ThreadPoolExecutor реализует интерфейс ExecutorService и обеспечивает поддержку управляемого пула потоков исполнения. Класс ScheduledThreadPoolExecutor реализует интерфейс ScheduledExecutorService для поддержки планирования пула потоков исполнения. А класс ForkJoinPool реализует интерфейс ExecutorService и применяется в каркасе Fork/Join Framework.

Пример использования ExecutorService

Рассмотрим пример использования ExecutorService. В примере создадим фиксированный пул из двух потоков исполнения executor и четыре потока. Имплементации потоков в качестве параметра принимают объект синхронизации потоков CountDownLatch, так называемую «защелку», и текстовую строку. Сервис executor стартует все 4 потока на исполнение. Таким образом, четыре потока должны совместно использовать пул из двух потоков. Первые попавшие в пул потоки приступают к исполнению. Оставшие переходят в режим ожидания и вступают в работу по мере освобождения пула.

«Защелки» CountDownLatch используются для того, чтобы раньше времени «не выскочить» на команду завершения выполнения shutdown. Методы защелок await тормозят этот выход, переводя программу в ожидание завершения работы потока. После того, как все задачи будут выполнены, пул закрывается и программа завершает свою работу.

Вызов метода shutdown очень важен. Если его не использовать, то программа не смогла бы завершиться, поскольку исполнитель оставался бы активным. В этом можно убедиться, закомментировав вызов метода shutdown.

Результаты выполнения примера представлены ниже.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;

public class ExecutorServiceExample
{
    SimpleDateFormat     sdf   = null;
    private  final  int  COUNT = 5;

    ExecutorServiceExample()
    {
        sdf = new SimpleDateFormat("HH:mm:ss.S");

        CountDownLatch cdl1 = new CountDownLatch(COUNT);
        CountDownLatch cdl2 = new CountDownLatch(COUNT);
        CountDownLatch cdl3 = new CountDownLatch(COUNT);
        CountDownLatch cdl4 = new CountDownLatch(COUNT);

        ExecutorService executor;
        executor = Executors.newFixedThreadPool(2);

        System.out.println("Запуск потоков");
        executor.execute(new MyThread(cdl1, "Thread.1"));
        executor.execute(new MyThread(cdl2, "Thread.2"));
        executor.execute(new MyThread(cdl3, "Thread.3"));
        executor.execute(new MyThread(cdl4, "Thread.4"));

        try {
            cdl1.await();
            cdl2.await();
            cdl3.await();
            cdl4.await();
        } catch(InterruptedException exc) { }

        executor.shutdown();
        System.out.println("Завершение потоков");
    }
    //-------------------------------------------------
    void printMessage(final String templ)
    {
        String text = sdf.format(new Date())+" : "+templ;
        System.out.println(text);
    }
    //-------------------------------------------------
    class MyThread implements Runnable 
    {
        String         name;
        CountDownLatch latch;
        MyThread(CountDownLatch c, String n) 
        {
            latch = c;
            name = n;
            new Thread(this);
        }

        public void run() 
        {
            try {
                for(int i = 0; i < COUNT; i++) {
                    printMessage(name + " - " + i);
                    latch.countDown();
                    Thread.sleep((long)(Math.random()*1500));
                }
                printMessage(name + " completed");
            } catch (InterruptedException e) {}
        }
    }
    //-------------------------------------------------
   public static void main(String args[]) 
    {
        new ExecutorServiceExample(); 
    }
}

Вывод результатов деятельности потоков в консоль

В методе run в цикле в консоль выводится текст строки, номер цикла и увеличивается счетчик «защелки». Между циклами – небольшая задержка. После завершения всех циклов в консоль выводится соответствующее сообщение.

Метод «защелки» countDown увеличивает счетчик. При достижении счетчиком порогового значения (COUNT) метод await «снимает защелку». После того, как «защелки» всех потоков сняты, пул закрывается.

Обратите внимание, что, как только открылась последняя защелка, сначала сервис исполнения успел завершить работу, а после этого поток. Попробуйте самостоятельно закомментировать код с ожиданием открытия защелок (методы await) и запустить пример на исполнения. Вы должны увидеть, что строки запуска и завершения потоков окажутся рядом, но сервис executor все равно выполнит задачи всех четырех потоков.


Запуск потоков
14:46:54.048 : Thread.2 - 0
14:46:54.048 : Thread.1 - 0
14:46:55.070 : Thread.2 - 1
14:46:55.118 : Thread.1 - 1
14:46:55.286 : Thread.2 - 2
14:46:56.078 : Thread.1 - 2
14:46:56.112 : Thread.1 - 3
14:46:56.152 : Thread.2 - 3
14:46:56.261 : Thread.2 - 4
14:46:56.356 : Thread.2 completed
14:46:56.357 : Thread.3 - 0
14:46:56.850 : Thread.3 - 1
14:46:57.632 : Thread.1 - 4
14:46:58.071 : Thread.3 - 2
14:46:58.468 : Thread.3 - 3
14:46:58.595 : Thread.3 - 4
14:46:59.039 : Thread.1 completed
14:46:59.039 : Thread.4 - 0
14:47:00.023 : Thread.3 completed
14:47:00.488 : Thread.4 - 1
14:47:01.982 : Thread.4 - 2
14:47:03.044 : Thread.4 - 3
14:47:03.975 : Thread.4 - 4
Завершение потоков
14:47:04.679 : Thread.4 completed
 
  Рейтинг@Mail.ru