Синхронизаторы пакета concurrent

Объекты синхронизации Synchroniser из пакета java.util.concurrent включают :

Semaphore — объект синхронизации, ограничивающий количество потоков, которые могут «войти» в заданный участок кода;
CountDownLatch — объект синхронизации, разрешающий вход в заданный участок кода при выполнении определенных условий;
CyclicBarrier — объект синхронизации типа «барьер», блокирующий выполнение определенного кода для заданного количества потоков;
Exchanger — объект синхронизации, позволяющий провести обмен данными между двумя потоками;
Phaser — объект синхронизации типа «барьер», но в отличие от CyclicBarrier, предоставляет больше гибкости.

Объект синхронизации Semaphore

Как было отмечено выше, Semaphore ограничивает доступ к определенному участку кода, иначе говоря, к общему ресурсу, в качестве которого могут выступать программые/аппаратные ресурсы или файловая система.

Для управления доступом к общему ресурсу Semaphore использует счетчик. Если значение счетчика больше нуля, то поток исполнения получает разрешение, после чего значение счетчика семафора уменьшается на единицу. При значении счетчика равным нулю очередному потоку исполнения в доступе будет отказано, и он будет заблокирован до тех пор, пока не будут освобождены ресурсы.

Как только один из потоков исполнения освободит ресурсы, т.е. завершит исполнение определенного участка кода, то значение счетчика семафора увеличивается на единицу. Если в это время имеется ожидающий разрешения другой поток исполнения, то он сразу же его получает.

Конструкторы Semaphore

Класс Semaphore имеет два приведенных ниже конструктора :

Semaphore(int permits);
Semaphore(int permits, boolean fair);

Параметр permits определяет исходное значение счетчика разрешений, т.е. количество потоков исполнения, которым может быть одновременно предоставлен доступ к общему ресурсу. По умолчанию ожидающим потокам предоставляется разрешение в неопределенном порядке. Если же использовать второй конструктор и параметру справедливости fair присвоить значение true, то разрешения будут предоставляться ожидающим потокам исполнения в том порядке, в каком они его запрашивали.

Метод получения разрешения acquire

Чтобы получить у семафора разрешение необходимо вызвать у него один из перегруженных методов acquire :

void acquire()           throws InterruptedException
void acquire(int number) throws InterruptedException

Первый метод без параметра запрашивает одно разрешение, а второй в качестве параметра использует количество разрешений. Обычно используется первый метод. Если разрешение не будет получено, то исполнение вызывающего потока будет приостановлено до тех пор, пока не будет получено разрешение, т.е. поток блокируется.

Освобождение ресурса

Чтобы освободить разрешение у семафора следует вызвать у него один из перегруженных методов release :

void release()
void release(int number)

В первом методе освобождается одно разрешение, а во втором — количество разрешений, обозначенное параметром number.

Полное англоязычное описание Semaphore

Пример использования Semaphore

В примере несколько всадников с лошадьми должны пройти контроль перед скачками. Количество контроллеров меньше количества всадников, поэтому некоторые всадники будут дожидаться, пока не освободиться один из контроллеров.

Общий ресурс CONTROL_PLACES, символизирующий контролеров и используемый всеми потоками, выделен оператором synchronized. С помощью семафора осуществляется контроль доступа только одному потоку.

import java.util.concurrent.Semaphore;

public class SemaphoreExample
{
    private static final int COUNT_CONTROL_PLACES = 5;
    private static final int COUNT_RIDERS         = 7;
    // Флаги мест контроля
    private static boolean[] CONTROL_PLACES = null;
    
    // Семафор
    private static Semaphore SEMAPHORE = null;

    public static class Rider implements Runnable 
    {
        private int ruderNum;

        public Rider(int ruderNum)  {
            this.ruderNum = ruderNum;
        }

        @Override
        public void run() {
            System.out.printf("Всадник %d подошел к зоне контроля\n", 
                               ruderNum);
            try {
                // Запрос разрешения
                SEMAPHORE.acquire();
                System.out.printf("\tвсадник %d проверяет наличие 
                                 свободного контроллера\n", ruderNum);
                int controlNum = -1;
                // Ищем свободное место и подходим к контроллеру
                synchronized (CONTROL_PLACES){
                    for (int i = 0; i < COUNT_CONTROL_PLACES; i++)
                        // Есть ли свободные контроллеры?
                        if (CONTROL_PLACES[i]) {
                             // Занимаем место
                            CONTROL_PLACES[i] = false;
                            controlNum = i;
                            System.out.printf("\t\tвсадник %d подошел
                                   к контроллеру %d.\n", ruderNum, i);
                            break;
                        }
                }

                // Время проверки лошади и всадника
                Thread.sleep((int) (Math.random() * 10 + 1) * 1000);

                // Освобождение места контроля
                synchronized (CONTROL_PLACES) {
                    CONTROL_PLACES[controlNum] = true;
                }
                
                // Освобождение ресурса
                SEMAPHORE.release();
                System.out.printf("Всадник %d завершил проверку\n", 
                                   ruderNum);
            } catch (InterruptedException e) {}
        }
    }
    public static void main(String[] args) 
                                     throws InterruptedException
    {
        // Определяем количество мест контроля
        CONTROL_PLACES = new boolean[COUNT_CONTROL_PLACES];
        // Флаги мест контроля [true-свободно, false-занято]
        for (int i = 0; i < COUNT_CONTROL_PLACES; i++)
            CONTROL_PLACES[i] = true;
        /*
         *  Определяем семафор со следующими параметрами : 
         *  - количество разрешений 5
         *  - флаг очередности fair=true (очередь first_in-first_out)
         */
        SEMAPHORE = new Semaphore(CONTROL_PLACES.length, true);
        
        for (int i = 1; i <= COUNT_RIDERS; i++) {
            new Thread(new Rider(i)).start();
            Thread.sleep(400);
        }
    }
}

Результат выполнения примера с семафором

Обратите внимание, что «всадник 3» завершил проверку, после чего «всадник 6» вошел в блокируемый участок кода. А вот «всадник 7» успел раньше вывести сообщение о входе в блокируемый участок кода, чем «всадник 1» сообщил о его «покидании».


Всадник 1 подошел к зоне контроля
	всадник 1 проверяет наличие свободного контроллера
		всадник 1 подошел к контроллеру 0.
Всадник 2 подошел к зоне контроля
	всадник 2 проверяет наличие свободного контроллера
		всадник 2 подошел к контроллеру 1.
Всадник 3 подошел к зоне контроля
	всадник 3 проверяет наличие свободного контроллера
		всадник 3 подошел к контроллеру 2.
Всадник 4 подошел к зоне контроля
	всадник 4 проверяет наличие свободного контроллера
		всадник 4 подошел к контроллеру 3.
Всадник 5 подошел к зоне контроля
	всадник 5 проверяет наличие свободного контроллера
		всадник 5 подошел к контроллеру 4.
Всадник 6 подошел к зоне контроля
Всадник 7 подошел к зоне контроля
Всадник 3 завершил проверку
	всадник 6 проверяет наличие свободного контроллера
		всадник 6 подошел к контроллеру 2.
	всадник 7 проверяет наличие свободного контроллера
Всадник 1 завершил проверку
		всадник 7 подошел к контроллеру 0.
Всадник 2 завершил проверку
Всадник 6 завершил проверку
Всадник 4 завершил проверку
Всадник 5 завершил проверку
Всадник 7 завершил проверку
 

Объект синхронизации CountDownLatch

Объект синхронизации потоков CountDownLatch представляет собой «защелку с обратным отсчетом» : несколько потоков, выполняя определенный код, блокируются до тех пор, пока не будут выполнены заданные условия. Количество условий определяются счетчиком. Как только счетчик обнулится, т.е. будут выполнены все условия, самоблокировки выполняемых потоков снимаются, и они продолжают выполнение кода.

Таким образом, CountDownLatch также, как и Semaphore, работает со счетчиком, обнуление которого снимает самоблокировки выполняемых потоков. Конструктор CountDownLatch :

CountDownLatch(int number);

Параметр number определяет количество событий, которые должны произойти до того момента, когда будет снята самоблокировка.

Метод самоблокировки await

CountDownLatch имеет два перегруженных метода await для самоблокировки :

void await() throws InterruptedException;
boolean await(long wait, TimeUnit unit) throws InterruptedException;

В первом методе ожидание длится до тех пор, пока счетчик CountDownLatch не достигнет нуля. А во втором методе ожидание длится только в течение определенного периода времени, определяемого параметром ожидание wait. Время ожидания указывается в единицах unit объекта перечисления TimeUnit, определяюший временно́е разбиение. Существуют следующие значения данного перечисления :

  • DAYS
  • HOURS
  • MINUTES
  • SECONDS
  • MICROSECONDS
  • MILLISECONDS
  • NANOSECONDS

Метод уменьшения счетчика countDown

Чтобы уменьшить счетчик объекта CountDownLatch следует вызвать метод countDown :

void countDown();

Примером CountDownLatch может служить паром, собирающий определенное количество транспорта и пассажиров, или экскурсовод, собирающий группу из заданного количества туристов.

Полное англоязычное описание CountDownLatch

Пример использования CountDownLatch

В примере несколько всадников должны подъехать к барьеру. Как только все всадники выстроятся перед барьером, будут даны команды «На старт», «Внимание», «Марш». После этого барьер опустится и начнутся скачки. Объект синхронизации CountDownLatch выполняет роль счетчика количества всадников и команд.

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample
{
    // Количество всадников
    private static final int RIDERS_COUNT = 5;
    // Объект синхронизации
    private static CountDownLatch LATCH;
    // Условная длина трассы
    private static final int trackLength = 3000;

    public static class Rider implements Runnable 
    {
        private int riderNumber; // номер всадника
        private int riderSpeed ; // скорость всадника постоянная

        public Rider(int riderNumber, int riderSpeed) 
        {
            this.riderNumber = riderNumber;
            this.riderSpeed  = riderSpeed;
        }

        @Override
        public void run() {
            try {
                System.out.printf(
                          "Всадник %d вышел на старт\n", 
                          riderNumber);
                //  Уменьшаем счетчик на 1
                LATCH.countDown();
                
                // Метод await блокирует поток до тех пор, пока
                // счетчик CountDownLatch не обнулится
                LATCH.await();
                // Ожидание, пока всадник не преодолеет трассу
                Thread.sleep(trackLength / riderSpeed * 10);
                System.out.printf(
                          "Всадник %d финишировал\n", riderNumber);
            } catch (InterruptedException e) {}
        }
    }
    public static Rider createRider(final int number)
    {
        return new Rider(number, (int) (Math.random() * 10 + 5));
    }

    public static void main(String[] args) 
                             throws InterruptedException
    {
        // Определение объекта синхронизации
        LATCH = new CountDownLatch(RIDERS_COUNT + 3);
        // Создание потоков всадников
        for (int i = 1; i <= RIDERS_COUNT; i++) {
            new Thread (createRider(i)).start();
            Thread.sleep(1000);
        }

        // Проверка наличия всех всадников на старте
        while (LATCH.getCount() > 3)
            Thread.sleep(100);

        Thread.sleep(1000);
        System.out.println("На старт!");
        LATCH.countDown();  // Уменьшаем счетчик на 1
        Thread.sleep(1000);
        System.out.println("Внимание!");
        LATCH.countDown(); // Уменьшаем счетчик на 1
        Thread.sleep(1000);
        System.out.println("Марш!");
        LATCH.countDown(); // Уменьшаем счетчик на 1
        
        // Счетчик обнулен, и все ожидающие этого события 
        // потоки разблокированы
    }
}

При «выходе на старт» поток вызывает методы countDown, уменьшая значение счетчика на 1, и await, блокируя самого себя в ожидании обнуления счетчика «защелки». Как только все потоки выстроятся на «старте» с интервалом в 1 сек. подаются команды. Каждая команда сопровождается уменьшением счетчика. После обнуления счетчика «защелки» потоки продолжают выполнение дальнейшего кода.

Результат выполнения примера


Всадник 1 вышел на старт
Всадник 2 вышел на старт
Всадник 3 вышел на старт
Всадник 4 вышел на старт
Всадник 5 вышел на старт
На старт!
Внимание!
Марш!
Всадник 4 финишировал
Всадник 3 финишировал
Всадник 1 финишировал
Всадник 2 финишировал
Всадник 5 финишировал
 

Объект синхронизации CyclicBarrier

Объект синхронизации CyclicBarrier представляет собой барьерную синхронизацию, используемую, как правило, в распределённых вычислениях. Особенно эффективно использование барьеров при циклических расчетах. При барьерной синхронизации алгоритм расчета делят на несколько потоков. С помощью барьера организуют точку сбора частичных результатов вычислений, в которой подводится итог этапа вычислений.

В исходном коде барьер для группы потоков означает, что каждый поток должен остановиться в определенном месте и ожидать прихода остальных потоков группы. Как только все потоки достигнут барьера, их выполнение продолжится.

Класс CyclicBarrier имеет 2 конструктора :

CyclicBarrier(int count);
CyclicBarrier(int count, Runnable class);

В первом конструкторе задается количество потоков, которые должны достигнуть барьера, чтобы после этого одновременно продолжить выполнение кода. Во втором конструкторе дополнительно задается реализующий интерфейс Runnable класс, который должен быть запущен после прихода к барьеру всех потоков. Поток запускать самостоятельно НЕ НУЖНО. CyclicBarrier это делает автоматически.

Для указания потоку о достижении барьера нужно вызвать один из перегруженных методов await :

void await() throws InterruptedException 
boolean await(long wait, TimeUnit unit) throws InterruptedException;

Назначение параметров wait и unit у второго метода описано выше (см. CountDownLatch).

Циклический барьер CyclicBarrier похож на CountDownLatch. Главное различие между ними связано с тем, что «защелку» нельзя использовать повторно после того, как её счётчик обнулится, а барьер можно использовать (в цикле). С точки зрения API циклический барьер CyclicBarrier имеет только метод самоблокировки await и не имеет метода декрементации счетчика, а также позволяет подключить и автоматически запускать дополнительный потоковый класс при достижении барьера всех исполняемых потоков.

Полное англоязычное описание CyclicBarrier

Пример использования CyclicBarrier

В примере организуется переправа. Паром может вместить только 3 автомобиля. Количество автомобилей 9. Роль парома выполняет объект синхронизации FerryBarrier, которому в качестве второго параметра передается реализующий интерфейс Runnable класс FerryBoat. Как только 3 потока достигнут барьера автоматически будет запущен FerryBoat, после завершения работы которого потоки продолжают свою работу.

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample 
{
    private static CyclicBarrier FerryBarrier;
    private static final int FerryBoat_size = 3;

    // Переправляющий автомобили паром
    public static class FerryBoat implements Runnable 
    {
        @Override
        public void run() {
            try {
                // Задержка на переправе 
                System.out.println("\nЗагрузка автомобилей");
                Thread.sleep(500);
                System.out.println("Паром переправил автомобили\n");
            } catch (InterruptedException e) {}
        }
    }

    // Класс автомобиля
    public static class Car implements Runnable
    {
        private int carNumber;

        public Car(int carNumber) {
            this.carNumber = carNumber;
        }

        @Override
        public void run() {
            try {
                System.out.printf(
                   "К переправе подъехал автомобиль %d\n", carNumber);
                // Вызов метода await при подходе к барьеру; поток 
                // блокируется в ожидании прихода остальных потоков
                FerryBarrier.await();
                System.out.printf(
                   "Автомобиль %d продолжил движение\n", carNumber);
            } catch (Exception e) {}
        }
    }

    public static void main(String[] args) 
                            throws InterruptedException {
        FerryBarrier = new CyclicBarrier(FerryBoat_size,
                                         new FerryBoat());
        for (int i = 1; i < 10; i++) {
            new Thread(new Car(i)).start();
            Thread.sleep(400);
        }
    }
}

Обратите внимание, что потоки подходят к барьеру с интервалом в 400 ms. Время задержки у барьера/переправы (после того, как собралось необходимое количество потоков), составляет 500 ms, если не считать время вывода сообщений в консоль. За это время к барьеру успевает подойти еще один поток. Что мы и видим при выводе сообщений в консоль.

Результат выполнения примера

Варьируя временем на переправе и временем прихода автомобилей на переправу, можно либо заставить паром простаивать, либо будут простаивать автомобили на переправе.


К переправе подъехал автомобиль 1
К переправе подъехал автомобиль 2
К переправе подъехал автомобиль 3

Загрузка автомобилей
К переправе подъехал автомобиль 4
Паром переправил автомобили

Автомобиль 3 продолжил движение
Автомобиль 1 продолжил движение
Автомобиль 2 продолжил движение
К переправе подъехал автомобиль 5
К переправе подъехал автомобиль 6

Загрузка автомобилей
К переправе подъехал автомобиль 7
Паром переправил автомобили

Автомобиль 6 продолжил движение
Автомобиль 5 продолжил движение
Автомобиль 4 продолжил движение
К переправе подъехал автомобиль 8
К переправе подъехал автомобиль 9

Загрузка автомобилей
Паром переправил автомобили

Автомобиль 7 продолжил движение
Автомобиль 9 продолжил движение
Автомобиль 8 продолжил движение
 

Объект синхронизации Exchanger

Класс Exchanger (обменник) предназначен для упрощения процесса обмена данными между двумя потоками исполнения. Принцип действия класса Exchanger связан с ожиданием того, что два отдельных потока должны вызвать его метод exchange. Как только это произойдет, Exchanger произведет обмен данными, предоставляемыми обоими потоками.

Обменник является обобщенным классом, он параметризируется типом объекта передачи :

Exchanger<V>();

Необходимо отметить, что обменник поддерживает передачу NULL значения, что дает возможность использовать его для передачи объекта в одну сторону или места синхронизации двух потоков.

Exchanger содержит перегруженный метод exchange, имеющий следующие формы :

V exchange(V buffer) throws InterruptedException;
V exchange(V buffer, long wait, TimeUnit unit)
                     throws InterruptedException;

Параметр buffer является ссылкой на обмениваемые данные. Метод возвращает данные из другого потока исполнения. Вторая форма метода позволяет определить время ожидания. Параметры wait и unit описаны выше. Метод exchange, вызванный в одном потоке, не завершится успешно до тех пор, пока он не будет вызван из второго потока исполнения.

Полное англоязычное описание Exchanger

Пример использования Exchanger

В примере использования объекта синхронизации Exchanger два почтальона из пунктов А и Б отправляются в соседние поселки В и Г доставить письма. Каждый из почтальонов должен доставить по письму в каждый из поселков. Чтобы не делать лишний круг, они встречаются в промежуточном поселке Д и обмениваются одним письмом. В результате этого каждому из почтальонов придется доставить письма только в один поселок. В примере все «шаги» почтальонов фиксируются выводом соответствующих сообщений в консоль.

import java.util.concurrent.Exchanger;

public class ExchangerExample 
{
    // Обменник почтовыми письмами
    private static Exchanger<Letter> EXCHANGER;

    static String msg1 = "Почтальон %s получил письма : %s, %s\n";
    static String msg2 = "Почтальон %s выехал из %s в %s\n";
    static String msg3 = "Почтальон %s приехал в пункт Д\n";
    static String msg4 = "Почтальон %s получил письма для %s\n";
    static String msg5 = "Почтальон %s привез в %s : %s, %s\n";
    
    public static class Postman implements Runnable
    {
        private String   id;
        private String   departure;
        private String   destination;
        private Letter[] letters;

        public Postman(String id, String departure, 
                       String destination, Letter[] letters) {
            this.id          = id;
            this.departure   = departure;
            this.destination = destination;
            this.letters     = letters;
        }

        @Override
        public void run() {
            try {
                System.out.printf(msg1, id, letters[0], letters[1]);
                System.out.printf(msg2, id, departure, destination);
                Thread.sleep((long) Math.random() * 5000 + 5000);
                System.out.printf(msg3, id);
                // Самоблокировка потока для обмена письмами
                letters[1] = EXCHANGER.exchange(letters[1]);
                // Обмен письмами
                System.out.printf(msg4, id, destination);
                Thread.sleep(1000 + (long) Math.random() * 5000);
                System.out.printf(msg5, id, destination, 
                                            letters[0], letters[1]);
            } catch (InterruptedException e) {}
        }
    }
    public static class Letter
    {
        private String address;
        public Letter(final String address) {
            this.address = address;
        }
        public String toString()
        {
            return address;
        }
    }
    public static void main(String[] args) 
                             throws InterruptedException 
    {
        EXCHANGER = new Exchanger<Letter>();
        // Формирование отправлений
        Letter[] posts1 = new Letter[2];
        Letter[] posts2 = new Letter[2]; 
		
        posts1[0] = new Letter("п.В - Петров"           ); 
        posts1[1] = new Letter("п.Г - Киса Воробьянинов");
        posts2[0] = new Letter("п.Г - Остап Бендер"     ); 
        posts2[1] = new Letter("п.В - Иванов"           );
        // Отправление почтальонов
        new Thread(new Postman("a", "А", "В", posts1)).start();
        Thread.sleep(100);
        new Thread(new Postman("б", "Б", "Г", posts2)).start();
    }
}

Результат выполнения примера

В консоль будет выведена следующая информация :


Почтальон a получил письма : п.В - Петров, п.Г - Киса Воробьянинов
Почтальон a выехал из А в В
Почтальон б получил письма : п.Г - Остап Бендер, п.В - Иванов
Почтальон б выехал из Б в Г
Почтальон a приехал в пункт Д
Почтальон б приехал в пункт Д
Почтальон б получил письма для Г
Почтальон a получил письма для В
Почтальон б привез в Г : п.Г - Остап Бендер, п.Г - Киса Воробьянинов
Почтальон a привез в В : п.В - Петров, п.В - Иванов
 

Объект синхронизации Phaser

Phaser (фазировщик), как и CyclicBarrier, является реализацией объекта синхронизации типа «Барьер» (CyclicBarrier). В отличии от CyclicBarrier, Phaser предоставляет больше гибкости. Чтобы лучше понять Phaser, можно привести два наглядно демонстрирующих его использование примера.

В качестве первого примера можно рассмотреть несколько потоков исполнения, реализующих процесс обработки заказов из трех стадий. На первой стадии отдельные потоки исполнения проверяют сведения о клиенте, наличие товара на складе и их стоимость. На второй стадии вычисляется стоимость заказа и стоимость доставки. На заключительной стадии подтверждается оплата и определяется ориентировочное время доставки. Во втором примере несколько потоков реализуют перевозку пассажиров городским транспортом. Пассажиры ожидают транспорт на разных остановках. Транспорт, останавливаясь на остановках, одних пассажиров «сажает», других «высаживает».

В этих примерах общим является то, что один объект синхронизации Phaser, исполняющий роль заказа и транспорта, играет главную роль, а другие потоки вступают в работу при определенном состоянии Phaser. Таким образом, класс Phaser позволяет определить объект синхронизации, ожидающий завершения определенной фазы. После этого он переходит к следующей фазе и снова ожидает ее завершения.

Важные особенности Phaser :

  1. Phaser может иметь несколько фаз (барьеров). Если количество фаз равно 1, то плавно переходим к CyclicBarrier (осталось только все исполнительные потоки остановить у барьера).
  2. Каждая фаза (цикл синхронизации) имеет свой номер.
  3. Количество участников-потоков для каждой фазы жестко не задано и может меняться. Исполнительный поток может регистрироваться в качестве участника и отменять свое участие;
  4. Исполнительный поток не обязан ожидать, пока все остальные участники соберутся у барьера. Достаточно только сообщить о своем прибытии.

Для создания объекта Phaser используется один из конструкторов :

Phaser();
Phaser(int parties);
Phaser(Phaser parent);
Phaser(Phaser parent, int parties);

Параметр parties определяет количество участников, которые должны пройти все фазы. Первый конструктор создает объект Phaser без каких-либо участников. Второй конструктор регистрирует передаваемое в конструктор количество участников. Третий и четвертый конструкторы дополнительно устанавливают родительский объект Phaser.

При создании экземпляр класса Phaser находится в нулевой фазе. В очередном состоянии (фазе) синхронизатор находится в ожидании до тех пор, пока все зарегистрированные потоки не завершат данную фазу. Потоки извещают об этом, вызывая один из методов arrive() или arriveAndAwaitAdvance().

Методы объекта синхронизации Phaser

МетодОписание
int register() Метод регистририрует участника и возвращает номер текущей фазы.
int arrive() Метод указывает на завершения выполнения текущей фазы и возвращает номер фазы. Если же работа Phaser закончена, то метод вернет отрицательное число. При вызове метода arrive поток не приостанавливается, а продолжает выполняться.
int arriveAndAwaitAdvance() Метод вызывается потоком/участником, чтобы указать, что он завершил текущую фазу. Это аналог метода CyclicBarrier.await(), сообщающего о прибытии к барьеру.
int arriveAndDeregister() Метод arriveAndDeregister сообщает о завершении всех фаз участником и снимается с регистрации. Данный метод возвращает номер текущей фазы или отрицательное число, если Phaser завершил свою работу
int getPhase() Получение номера текущей фазы.

Полное англоязычное описание Phaser

Пример использования Phaser

В примере PhaserExample создается несколько потоков, играющих роль пассажиров. Phaser играет роль метро, которое должно проследовать вдоль нескольких станций. Каждая станция (фаза) имеет свой номер. Класс Passenger играет роль пассажира, который на одной из станции должен зайти в вагон, а на другой выйти. Количество пассажиров, а также их места посадки и высадки, формируются случайным образом.

Листинг класса Passenger

Конструктор класса Passenger получает значение идентификатора, номера станций посадки и назначения (высадки). При создании объекта в консоль выводится информация о пассажире (метод toString).

Как только Phaser переходит в определенную фазу, номер которой соответствует станции посадки пассажира, то поток данного Passenger стартует (run) и выводит в консоль сообщение, что пассажир вошел в вагон, т.е. находится в ожидании следующей станции/фазы (arriveAndAwaitAdvance). Если следующая станция/фаза не будет соответствовать станции назначения, то Passenger продолжит свой путь. Как только Phaser перейдет в фазу, номер которой соответствует номеру станции назначания пассажира, то цикл контроля завершится и поток продолжит работу. С задержкой в 500 ms он сообщит, что покинул вагон и отменит регистрацию в Phaser (arriveAndDeregister).

Таким образом, поток/пассажир дожидается свой фазы/станции в цикле, выделенной в коде пунктирными комментариями. Вызов метода arriveAndAwaitAdvance возвращает значение следующего номера фазы, т.е. участник будет вызван при переходе Phaser в новое состояние. Если в этом состоянии значение фазы (getPhase) будет соответствовать номеру destination, то цикл прервется, в противном случае, ожидание следующей фазы и повторное выполнение проверки условия while.

private static String WAIT  = " ждёт на станции ";
private static String ENTER = " вошел в вагон"   ;
private static String EXIT  = " вышел из вагона ";
private static String SPACE = "    ";

public static class Passenger implements Runnable 
{
    int id;
    int departure;
    int destination;

    public Passenger(int id, int departure, int destination) 
    {
        this.id          = id;
        this.departure   = departure;
        this.destination = destination;
        System.out.println(this + WAIT + departure);
    }

    @Override
    public void run()  {
        try {
            System.out.println(SPACE + this + ENTER);
            //-----------------------------------------------
            // Заявляем об участии и ждем станции назначения
            while (PHASER.getPhase() < destination)
                PHASER.arriveAndAwaitAdvance();
            //----------------------------------------------
            Thread.sleep(500);
            System.out.println(SPACE + this + EXIT);
            // Отмена регистрации
            PHASER.arriveAndDeregister();
        } catch (InterruptedException e) {}
    }

    @Override
    public String toString()
    {
        return "Пассажир " + id + 
               " {" + departure + " -> " + destination + '}';
    }
}

Примечание : Passenger является внутренним классом примера/класса PhaserExample, и для описания вынесен из общего кода, чтобы не загромождать листинг.

Листинг примера PhaserExample

В примере сначала создается объект синхронизации PHASER. После этого формируется массив пассажиров. При создании объекта Passenger случайным образом определяются станция посадки и станция назначения. После того, как массив пассажиров подготовлен, PHASER в цикле начинает менять свое состояние. На каждом шаге выполняется проверка «станции посадки пассажира». Если она соответствует значению фазы, то данный пассажир входит в вагон метро, т.е. регистрируется в PHASER и поток стартует. Таким образом, регистрация участников (исполнительных потоков) выполняется при нахождении PHASER в определенном состоянии/фазе. Пассажир покинет вагон при достижении метро станции назначения, т.е. при нахождении PHASER в соответствующей фазе. Но это произойдет уже в коде класса Passenger, рассмотренного выше.

import java.util.ArrayList;
import java.util.concurrent.Phaser;

public class PhaserExample
{
    private static Phaser PHASER;

    private static String OPEN  = "    ... открытие дверей ...";
    private static String CLOSE = "    ... закрытие дверей ...";

    public static void main(String[] args) 
                                     throws InterruptedException
    {
        // Регистрация объекта синхронизации
        PHASER = new Phaser(1);

        ArrayList<Passenger> passengers = new ArrayList<>();
        // Формирование массива пассажиров
        for (int i = 1; i < 5; i++) {
            if ((int) (Math.random() * 2) > 0)
                // Этот пассажир проезжает одну станцию
                passengers.add(new Passenger(10 + i, i, i + 1));

            if ((int) (Math.random() * 2) > 0) {
                // Этот пассажир едет до конечной
                Passenger p = new Passenger(20 + i, i, 5);
               	passengers.add(p);
            }
        }

        // Фазы 0 и 6 - конечные станции 
        // Фазы 1...5 - промежуточные станции
        for (int i = 0; i < 7; i++) {
            switch (i) {
                case 0:
                    System.out.println("Метро вышло из тупика");
                    // Нулевая фаза, участников нет
                    PHASER.arrive();
                    break;
                case 6:
                    // Завершаем синхронизацию
                    System.out.println("Метро ушло в тупик");
                    PHASER.arriveAndDeregister();
                    break;
                default:
                    int currentStation = PHASER.getPhase();
                    System.out.println("Станция " + currentStation);
                    // Проверка наличия пассажиров на станции
                    for (Passenger pass : passengers)
                        if (pass.departure == currentStation) {
                            // Регистрация участника
                            PHASER.register();
                            new Thread (pass).start();
                        }
                    System.out.println(OPEN);

                    // Phaser ожидает завершения фазы всеми участниками
                    PHASER.arriveAndAwaitAdvance();

                    System.out.println(CLOSE);
            }
        }
    }

Результаты выполнения программы


Пассажир 11 {1 -> 2} ждёт на станции 1
Пассажир 21 {1 -> 5} ждёт на станции 1
Пассажир 12 {2 -> 3} ждёт на станции 2
Пассажир 23 {3 -> 5} ждёт на станции 3
Пассажир 14 {4 -> 5} ждёт на станции 4
Метро вышло из тупика
Станция 1
    ... открытие дверей ...
    Пассажир 11 {1 -> 2} вошел в вагон
    Пассажир 21 {1 -> 5} вошел в вагон
    ... закрытие дверей ...
Станция 2
    ... открытие дверей ...
    Пассажир 12 {2 -> 3} вошел в вагон
    Пассажир 11 {1 -> 2} вышел из вагона 
    ... закрытие дверей ...
Станция 3
    ... открытие дверей ...
    Пассажир 23 {3 -> 5} вошел в вагон
    Пассажир 12 {2 -> 3} вышел из вагона 
    ... закрытие дверей ...
Станция 4
    ... открытие дверей ...
    Пассажир 14 {4 -> 5} вошел в вагон
    ... закрытие дверей ...
Станция 5
    ... открытие дверей ...
    Пассажир 21 {1 -> 5} вышел из вагона 
    Пассажир 23 {3 -> 5} вышел из вагона 
    Пассажир 14 {4 -> 5} вышел из вагона 
    ... закрытие дверей ...
Метро ушло в тупик
 

Скачать примеры

Рассмотренные на странице примеры использования синхронизаторов Synchroniser's пакета java.util.concurrent в виде проекта Eclipse можно скачать здесь (21.3 Кб).

  Рейтинг@Mail.ru