Блокирующие очереди пакета concurrent

Пакет java.util.concurrent включает классы для формирования блокирующих очередей с поддержкой многопоточности. Блокирующие очереди используются в тех случаях, когда нужно выполнить (проверить выполненение) какие-либо условия для продолжения потоками своей работы. Блокирующие очереди могут реализовывать интерфейсы BlockingQueue, BlockingDeque, TransferQueue. В пакете java.util.concurrent имеются следующие реализации блокирующих очередей :

  • ArrayBlockingQueue — очередь, реализующая классический кольцевой буфер;
  • LinkedBlockingQueue — односторонняя очередь на связанных узлах;
  • LinkedBlockingDeque — двунаправленная очередь на связанных узлах;
  • SynchronousQueue — блокирующую очередь без емкости (операция добавления одного потока находится в ожидании соответствующей операции удаления в другом потоке);
  • LinkedTransferQueue — реализация очереди на основе интерфейса TransferQueue;
  • DelayQueue — неограниченная блокирующая очередь, реализующая интерфейс Delayed;
  • PriorityBlockingQueue — реализация очереди на основе интерфейса PriorityQueue.

Использование очередей пакета java.util.concurrent может стать решением проблем взаимных блокировок и «голодания».

На странице представлены четыре примера использования блокирующих очередей :

  • пример с блокирующей очередью ArrayBlockingQueue;
  • пример с двунаправленной очередью LinkedBlockingDeque;
  • пример использования SynchronousQueue.
  • пример использования LinkedTransferQueue.

Интерфейс BlockingQueue

Интерфейс BlockingQueue определяет блокирующую очередь, наследующую свойства интерфейса Queue, в которой элементы хранятся в порядке «первый пришел, первый вышел» (FIFO – first in, first out). Реализация данного интерфейса обеспечивает блокировку потока в двух случаях :

  • при попытке получения элемента из пустой очереди;
  • при попытке размещения элемента в полной очереди.

Когда поток пытается получить элемент из пустой очереди, то он переводится в состояние ожидания до тех пор, пока какой-либо другой поток не разместит элемент в очереди. Аналогично при попытке положить элемент в полную очередь; поток ставится в ожидание до тех пор, пока другой поток не заберет элемент из очереди и, таким образом, не освободит место в ней. Естественно, понятие "полная очередь" подразумевает ограничение размера очереди.

BlockingQueue изящно решает проблему передачи собранных одним потоком элементов для обработки в другой поток без явных хлопот о проблемах синхронизации.

Основные методы интерфейса BlockingQueue

МетодОписание
boolean add(E e) Немедленное добавление элемента в очередь, если это возможно; метод возвращает true при благополучном завершении операции, либо вызывает IllegalStateException, если очередь полная.
boolean contains(Object o) Проверка наличия объекта в очереди; если объект найден в очереди метод вернет true.
boolean offer(E e) Немедленное размещение элемента в очереди при наличие свободного места; метод вернет true при успешном завершении операции, в противном случае вернет false.
boolean offer(E e, long timeout, TimeUnit unit) Размещение элемента в очереди при наличии свободного места; при необходимости определенное ожидание времени, пока не освободиться место.
E poll(long timeout, TimeUnit unit) Чтение и удаление элемента из очереди в течение определенного времени (таймаута).
void put(E e) Размещение элемента в очереди, ожидание при необходимости освобождения свободного места.
int remainingCapacity() Получения количества элементов, которое можно разместить в очереди без блокировки, либо Integer.MAX_VALUE при отсутствии внутреннего предела.
boolean remove(Object o) Удаление объекта из очереди, если он в ней присутствует.
E take() Получение с удалением элемента из очереди, при необходимости ожидание пока элемент не станет доступным.

BlockingQueue не признает нулевых элементов (null) и вызывает NullPointerException при попытке добавить или получить такой элемент. Нулевой элемент возвращает метод poll, если в течение таймаута не был размещен в очереди очередной элемент.

Методы BlockingQueue можно разделить на 4 группы, по-разному реагирующие на невозможность выполнения операции в текущий момент и откладывающие их выполнение на время : первые вызывают Exception, вторые возвращают определенное значение (null или false), третьи блокируют поток на неопределенное время до момента выполнения операции, четвертые блокируют поток на определенное время. Эти методы представлены в следующей таблице :

Вызывает Exception Чтение значения Блокировка Чтение с задержкой
Insert add(e)offer(e)put(e)offer(e, time, unit)
Remove remove()poll()take()poll(time, unit)
Проверка element()peek()не применимыйне применимый

Полная англоязычная документация интерфейса BlockingQueue с примером представлена здесь.

Интерфейс BlockingDeque

Интерфейс BlockingDeque, также, как и BlockingQueue, определяет блокирующую, но двунаправленную очередь, наследующую свойства интерфейса Deque и ориентированную на многопотоковое исполнение, не разрешающую нулевые элементы и с возможностью ограничения емкости. Реализации интерфейса BlockingDeque блокируют операции получения элементов, если очередь пустая, и добавления элемента в очередь, если она полная.

Методы BlockingDeque объединены в 4 группы, по-разному реагирующие на невозможность выполнения операции в текущий момент и откладывающие их выполнение на небольшое время : первые вызывают Exception, вторые возвращают определенное значение (null или false), третьи блокируют поток на неопределенное время до момента выполнения операции, четвертые блокируют поток на определенное время. Методы представлены в следующей таблице :

Первый Элемент (голова)
Вызывает Exception Чтение значения Блокировка Чтение с задержкой
Insert addFirst(e)offerFirst(e)putFirst(e)offerFirst(e, time, unit)
Remove removeFirst()pollFirst()takeFirst()pollFirst(time, unit)
Проверка getFirst()peekFirst()не применимыйне применимый
Последний Элемент (хвост)
Insert addLast(e)offerLast(e)putLast(e)offerLast(e, time, unit)
Remove removeLast()pollLast()takeLast()pollLast(time, unit)
Проверка addLastpeekLast()не применимыйне применимый

Реализация BlockingDeque может использоваться непосредственно в качестве BlockingQueue с механизмом FIFO. Следующие представленные в таблице методы и наследованные от интерфейса BlockingQueue, точно эквивалентны методам BlockingDeque :

BlockingQueue method Equivalent BlockingDeque method
Insert
add(e)addLast(e)
offer(e)offerLast(e)
put(e)putLast(e)
offer(e, time, unit)offerLast(e, time, unit)
Remove
remove()removeFirst()
poll() pollFirst()
take()takeFirst()
poll(time, unit)pollFirst(time, unit)
Examine (Проверка)
element()getFirst()
peek()peekFirst()

Действия по размещению объекта в BlockingDeque выполняйте перед действиями проверки доступа или удаления элемента из очереди в другом потоке.

Полная англоязычная документация интерфейса BlockingDeque представлена здесь.

Очередь ArrayBlockingQueue

Класс блокирующей очереди ArrayBlockingQueue реализует классический ограниченного размера кольцевой буфер FIFO — «первым прибыл - первым убыл». Новые элементы вставляются в хвост очереди; операции извлечения отдают элемент из головы очереди. Создаваемая емкость очереди не может быть изменена. Попытки вставить (put) элемент в полную очередь приведет к блокированию работы потока; попытка извлечь (take) элемент из пустой очереди также блокирует поток.

Данный класс поддерживает дополнительную политику справедливости параметром fair в конструкторе для упорядочивания работы ожидающих потоков производителей (вставляющих элементы) и потребителей (извлекающих элементы). По умолчанию упорядочивание работы очереди не гарантируется. Но если очередь создана с «fair=true», реализация класса ArrayBlockingQueue предоставляет доступ потоков в порядке FIFO. Справедливость обычно уменьшает пропускную способность, но также снижает изменчивость и предупреждает исчерпание ресурсов.

Класс ArrayBlockingQueue и его iterator реализуют все дополнительные методы Collection и Iterator.

Конструкторы класса ArrayBlockingQueue

Первый конструктор создает очередь фиксированной емкости и с политикой доступа по умолчанию. Второй конструктор — очередь с фиксированной емкостью и указанной политикой доступа. Последний конструктор создает очередь с фиксированной емкостью, указанной политикой доступа и включает в очередь элементы.

ArrayBlockingQueue(int capacity)
ArrayBlockingQueue(int capacity, boolean fair)
ArrayBlockingQueue(int capacity, boolean fair, 
                   Collection<? extends E> c)

Метод toArray() возвращает массив элементов очереди типа Object[]. Полная англоязычная документация класса ArrayBlockingQueue представлена здесь.

Пример использования ArrayBlockingQueue

В примере BlockingQueueExample создается очередь drop типа ArrayBlockingQueue емкостью в один элемент и с установленом флагом справедливости. После этого запускаются два потока. Первый поток Producer помещает в очередь сообщения из массива messages с использованием метода put, а второй поток Consumer считывает из очереди сообщения методом take и выводит их в консоль.

import java.util.concurrent.*;
 
public class BlockingQueueExample 
{
    private BlockingQueue<String> drop;

    private final String    DONE     = "done";
    private final String[]  messages = {"Мама пошла готовить обед",
                                        "Мама позвала к столу",
                                        "Дети кушают молочную кашу",
                                        "А что ест папа?"};	
    public BlockingQueueExample() 
    {
        drop = new ArrayBlockingQueue<String>(1, true);
        (new Thread(new Producer())).start();
        (new Thread(new Consumer())).start();
    }

    class Producer implements Runnable 
    {
        public void run() {
            try {
                int cnt = 0;
                for (int i=0; i<messages.length; i++) {
                    drop.put(messages[i]);
                    if (++cnt < 3)
                        Thread.sleep(2000);
                }
                drop.put(DONE);
            } catch (InterruptedException e) {
                System.err.println(e.getMessage());
            }
        }
    }

    class Consumer implements Runnable
    {
        public void run() {
            try {
                String msg = null;
                while (!((msg = drop.take()).equals(DONE)))
                    System.out.println(msg);
            } catch (InterruptedException e) {
                System.err.println(e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        new BlockingQueueExample();
    }
}

Результат выполнения примера

Второй поток последовательно считывает из очереди сообщения и выводит их в консоль. Порядок поступления сообщений в очередь соблюдается; но следует отметить, что нагрузка незначительна.


Мама пошла готовить обед
Мама позвала к столу
Дети кушают молочную кашу
А что ест папа?
 

Очередь LinkedBlockingQueue

Класс блокирующей очереди LinkedBlockingQueue, основанный на соединенных узлах, упорядочивает порядок поступления и выдачи элементов FIFO — «первым прибыл - первым убыл». Новые элементы вставляются в хвост очереди; операции чтения извлекают элемент из головы очереди. У соединенных на узлах очереди обычно более высокая пропускная способность, чем у основанной на массиве очереди, но менее предсказуемая производительность в большинстве многопоточных приложений.

Конструкторы класса LinkedBlockingQueue

Первый конструктор создает пустую очередь фиксированной емкости. Второй конструктор создает очередь с фиксированной емкостью capacity. Третий конструктор создает очередь с набором элементов. Если используется конструктор без указания емкости очереди, то используется значение по умолчанию Integer.MAX_VALUE.

LinkedBlockingQueue()
LinkedBlockingQueue(int capacity)
LinkedBlockingQueue(Collection<? extends E> c)

Класс LinkedBlockingQueue и его iterator реализуют все опциональные методы Collection и Iterator.

Метод toArray() возвращает массив элементов очереди типа Object[]. Полная англоязычная документация класса LinkedBlockingQueue представлена здесь.

Очередь LinkedBlockingDeque

Класс LinkedBlockingDeque создает двунаправленную очередь с реализацией интерфейса BlockingDeque, наследуемого от интерфейса Deque. Данный класс может иметь ограничение на количество элементов в очереди. Если ограничение не задано, то оно равно значению Integer.MAX_VALUE.

// Конструкторы класса

LinkedBlockingDeque()
LinkedBlockingDeque(int capacity)
LinkedBlockingDeque(Collection<? extends E> c)

Класс LinkedBlockingDeque и его iterator реализуют все опциональные методы Collection и Iterator.

Элементы в двустороннюю очередь LinkedBlockingDeque можно добавлять при помощи следующих методов :

  • boolean add(E e)
  • void addFirst(E e)
  • void addLast(E e)

Метод add() аналогичен методу addLast(). В случае нехватки места в двусторонней очереди вызывается исключение IllegalStateException. Элементы можно также добавить при помощи следующих методов :

  • boolean offer(E e)
  • boolean offerFirst(E e)
  • boolean offerLast(E e)

В отличие от добавления элементов при помощи метода addXXX(), при добавлении элементов методом offerXXX() возвращается false, если элемент не может быть добавлен. Для удаления элементов имеются методы :

  • remove(), removeFirst(), removeLast()
  • poll(), pollFirst(), pollLast()

Методы removeXXX() выбрасывают исключение NoSuchElementException при пустой двусторонней очереди. Методы pollXXX() используются для чтения с удалением и возвращают значение null, если очередь пуста.

Несмотря на то, что работа с двусторонними очередями предполагает удаление элементов только с концов очереди, можно удалять определенный объект очереди при помощи следующих методов :

  • boolean remove(Object o)
  • boolean removeFirstOccurrence(Object o)
  • boolean removeLastOccurrence(Object o)

Так как концептуально двусторонняя очередь является привязанной с двух сторон, то можно проводить поиск элементов в любом порядке. Итератор iterator() можно использовать для поиска элементов с начала до конца, а descendingIterator() — для поиска элементов в обратном направлении с конца до начала. Однако нельзя получить доступ к элементу по его местоположению.

Метод toArray() возвращает массив элементов очереди типа Object[]. Полная англоязычная документация класса LinkedBlockingDeque представлена здесь.

В каких случаях нужны двусторонние очереди? Этот тип очереди позволяет сформировать удобные структуры данных при использовании рекурсивных процедур, как, например, работа с лабиринтами и разбор исходных данных. Так, при разборе исходных данных, можно сохранять правильные варианты в очереди, добавляя их с одной стороны. Если вариант при проверке оказывается неверным, то он удаляется, возвращаясь к последнему верному элементу. В этом случае используется только одна сторона очереди, как в стеке. При достижении «дна» необходимо вернуться в начало для получения решения, которое начинается с последнего элемента. Другим типичным примером является планировщик заданий в операционной системе.

Нижеприведенный пример демонстрирует использование интерфейса BlockingDeque, а вернее его реализацию — класса LinkedBlockingDeque с установленными границами. Это далеко не лучший пример использования двусторонней очереди, но он позволяет показать применение API и возникающие при достижении предела очереди события.

Пример использования LinkedBlockingDeque

В примере все сообщения, выводимые в консоль, и объекты (коллекция names, очередь deque) для наглядности вынесены за пределы конструктора класса. Сначала в конструкторе создается массив из элементов (месяцев) календаря и формируется двунаправленная блокирующая очередь типа LinkedBlockingDeque с ограничением в 6 элементов. После этого стартуют два потока : Producer для добавления элементов в очередь и Consumer для извлечения элемента с удалением. Задержки после выполнения операций с очередью у потоков различны, Producer опережает Consumer, т.е. очередь должна быстрее наполняться.

Поток Producer вставляет элементы из коллекции в начало очереди. Поток Consumer извлекает из очереди элементы : если размер очереди нечетный, то элемент извлекается из головы, в противном случае из хвоста очереди. Все сообщения выводятся в консоль. Результат работы примера после листинга.

import java.util.*;
import java.util.concurrent.*;

public class LinkedBlockingDequeExample 
{
    final String EXTRACT     = "Извлечение из map : %s%n"   ;
    final String INSERT      = "Добавление в очередь : %s%n";
    final String WAIT        = "... ожидание : %s%n"        ;
    final String SIZE        = "--- deque.size=%d ---%n"    ;
    final String REMOVE_HEAD = "\tremove head: %s%n"        ;
    final String REMOVE_TAIL = "\tremove tail: %s%n"        ;

    Map<String, Integer>  names = null;
    Deque<String>         deque = null;

    LinkedBlockingDequeExample()
    {
        Calendar now    = Calendar.getInstance();
        Locale   locale = Locale.getDefault();

        names = now.getDisplayNames(Calendar.MONTH, 
                                    Calendar.ALL_STYLES, locale);
        
        System.out.printf("Список коллекции : %s%n", names);
        
        deque = new LinkedBlockingDeque<String>(6);

        Thread producer = new Thread(new Producer());
        producer.start();

        (new Thread(new Consumer())).start();

        while (producer.isAlive()) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {}
        }
        System.exit(0);
    }
    class Producer implements Runnable 
    {
        public Producer() {}
        public void run() {
            Set<String>      keys = names.keySet();
            Iterator<String> iter = keys.iterator();
            String element = null;
            while ((iter.hasNext()) || (element != null)) {
                if (element == null) {
                    element = iter.next();
                    System.out.printf(EXTRACT, element);
                }
                // Добавление элемента в начало	     
                if (deque.offerFirst(element)) {
                    System.out.printf(INSERT, element);
                    iter.remove();
                    element = null;
                } else {
                    System.out.printf(WAIT, element);
                    try {
                        Thread.sleep(250);
                    } catch (InterruptedException ignored) {}
                }
                System.out.printf(SIZE, deque.size());
            }
            try {
                Thread.sleep(3500);
            } catch (InterruptedException ignored) {}
        }    
    }

    class Consumer implements Runnable
    {
        public Consumer() { }
         
        public void run() {
            while (true) {
                if ((deque.size() % 2 == 1))
                    // удаление из начала
                    System.out.printf(REMOVE_HEAD, 
                                      deque.pollFirst());
                else
                    // удаление из конца
                    System.out.printf(REMOVE_TAIL,
                                      deque.pollLast());
                try {
                    // пауза между циклами
                    Thread.sleep(500);
                } catch (InterruptedException ignored) {}
            }
        }
    }

    public static void main(String args[])
    {
        new LinkedBlockingDequeExample();
    }
}

Выполнение примера LinkedBlockingDeque

Коллекция включает различные варианты представления месяцев календаря. Producer извлекает элемент из map и добавляет его в начало очереди. После этого в консоль выводится размер очереди. Если очередь полная, то Producer ждет освобождения места. Consumer извлекает элемент из очереди. Поскольку он всегда «приходит» к полной очереди (6 элементов), то извлекает он из хвоста, до тех пор, пока Producer не закончит добавлять элементы в очередь.


Список коллекции : {                                        \
  Сентябрь=8, сентября=8, Декабрь=11, Октябрь=9, Февраль=1, \ 
  августа=7, декабря=11, октября=9, февраля=1, Август=7,    \
  Апрель=3, Ноябрь=10, Январь=0, апреля=3, ноября=10,       \
  января=0, Нояб.=10, Сент.=8, Февр.=1, марта=2, Авг.=7,    \ 
  Апр.=3, Дек.=11, Июль=6, Июнь=5, Март=2, Окт.=9, Янв.=0,  \
  июля=6, июня=5, Май=4, авг=7, апр=3, дек=11, июл=6,       \
  июн=5, мар=2, мая=4, ноя=10, окт=9, сен=8, фев=1, янв=0}

	remove tail: null
Извлечение из map : Сентябрь
Добавление в очередь : Сентябрь
--- deque.size=1 ---
Извлечение из map : сентября
Добавление в очередь : сентября
--- deque.size=2 ---
Извлечение из map : Декабрь
Добавление в очередь : Декабрь
--- deque.size=3 ---
Извлечение из map : Октябрь
Добавление в очередь : Октябрь
--- deque.size=4 ---
Извлечение из map : Февраль
Добавление в очередь : Февраль
--- deque.size=5 ---
Извлечение из map : августа
Добавление в очередь : августа
--- deque.size=6 ---
Извлечение из map : декабря
... ожидание : декабря
--- deque.size=6 ---
... ожидание : декабря
	remove tail: Сентябрь
--- deque.size=5 ---
Добавление в очередь : декабря
--- deque.size=6 ---
Извлечение из map : октября
... ожидание : октября
--- deque.size=6 ---
... ожидание : октября
	remove tail: сентября
--- deque.size=5 ---

. . .

Добавление в очередь : фев
--- deque.size=6 ---
Извлечение из map : янв
... ожидание : янв
--- deque.size=6 ---
... ожидание : янв
	remove tail: мар
--- deque.size=5 ---
Добавление в очередь : янв
--- deque.size=6 ---
	remove tail: мая
	remove head: янв
	remove tail: ноя
	remove head: фев
	remove tail: окт
	remove head: сен
	remove tail: null
 

Очередь SynchronousQueue

Класс SynchronousQueue формирует блокирующую очередь, в которой каждая операция добавления в одном потоке должна ждать соответствующей операции удаления в другом потоке и наоборот. В сущности, SynchronousQueue является еще одной реализацией представленного выше интерфейса BlockingQueue. Данный тип очереди предоставляет удобный способ обмена одиночными элементами между потоками посредством семантики блокировки, используемой в ArrayBlockingQueue. Синхронная очередь не имеет внутренней емкости, даже в один элемент.

Полная англоязычная документация интерфейса SynchronousQueue с примером представлена здесь.

Лучше один раз, но гарантированно, чем ... . Рассмотрим представленный ниже пример SynchQueuesExample с использованием синхронной очереди.

Листинг примера использования SynchronousQueue

Пример SynchQueuesExample создан на основе примера BlockingQueueExample. Поэтому в листинге представлен не весь код, а только внесенные изменения. В конце страницы можно скачать исходный код всех примеров.

В примере SynchQueuesExample создаются два потока, которые работают с очередью SynchronousQueue. Первый поток Producer вставляет элементы в очередь, а второй поток Consumer с задержкой в 3 сек. извлекает из очереди элементы. Перед каждой и после каждой операции вставки и чтения выводятся соответствующие сообщения. Основная идея примера — проверка ожидания выполнения операций внесения и извлечения в синхронной очереди.

private final String BEFORE_PUT ="*** %s before put message";
private final String AFTER_PUT  ="*** %s after put message\n";

private final String BEFORE_TAKE="--- %s before take message";
private final String AFTER_TAKE ="--- %s after take message\n";

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

public SynchQueuesExample ()
{
    drop = new SynchronousQueue<String>();
    (new Thread(new Producer())).start();
    (new Thread(new Consumer())).start();
}
void printMessage(final String templ)
{
    String text = String.format(templ, sdf.format(new Date()));
    System.out.println(text);
}
class Producer implements Runnable {
    public void run() {
        try {
            for (int i = 0; i < messages.length; i++) {
                printMessage(BEFORE_PUT);
                drop.put(messages[i]);
                printMessage(AFTER_PUT);
            }
            drop.put(DONE);
        } catch (InterruptedException e) {}
    }
}

class Consumer implements Runnable {
    public void run() {
        try {
            String msg = null;
            while (true) {
                printMessage(BEFORE_TAKE);
                Thread.sleep(3000);
                if (!((msg = drop.take()).equals(DONE))) {
                    System.out.println(msg);
                } else
                    break;
                printMessage(AFTER_TAKE);
            }
        } catch (InterruptedException e) {}
    }
}

Результат выполнения

Ниже представлены результаты работы примера. Как и следовало ожидать Producer при выполнении операции вставки блокируется, пока Consumer не начнет забирать элемент из очереди. Завершается операция вставки прежде операции извлечения.


*** 14:26:12 before put message
--- 14:26:12 before take message
Мама пошла готовить обед
*** 14:26:15 after put message

--- 14:26:15 after take message ---

*** 14:26:15 before put message
--- 14:26:15 before take message
Мама позвала к столу
*** 14:26:18 after put message

--- 14:26:18 after take message ---

*** 14:26:18 before put message
--- 14:26:18 before take message
Дети кушают молочную кашу
--- 14:26:21 after take message ---

*** 14:26:21 after put message

--- 14:26:21 before take message
*** 14:26:21 before put message
А что ест папа?
*** 14:26:24 after put message

--- 14:26:24 after take message ---

--- 14:26:24 before take message
 

Очередь LinkedTransferQueue

В отличие от реализации очередей интерфейса BlockingQueue, где потоки могут быть блокированы при чтении, если очередь пустая, либо при записи, если очередь полная, очереди интерфейса TransferQueue блокируют поток записи до тех пор, пока другой поток не извлечет элемент. Для этого следует использовать метод transfer.

Иначе говоря, реализация BlockingQueue гарантирует, что элемент, созданный производителем (Producer), должен находиться в очереди, в то время как реализация TransferQueue гарантирует, что элемент Producer'а «получает» потребитель (Consumer).

Документация класса LinkedTransferQueue представлена здесь.

Пример использования LinkedTransferQueue

Следующий пример демонстрирует применение очереди LinkedTransferQueue. В примере содаются два потока, работающие с очередью TransferQueue<String>. Поток производитель Producer размещает элемент в очереди с использованием метода transfer. Поток потребитель Consumer извлекает элементы из очереди и выводит их в консоль. Перед получением элемента из очереди Consumer делает небольшую задержку, по которой можно оценить, кто кого ожидает, т.е. какой поток блокируется очередью. Перед каждой операции с очередью и после выполнения операции потоки выводят соответствующие сообщения в консоль с указанием времени.

import java.util.concurrent.TransferQueue;
import java.util.concurrent.LinkedTransferQueue;

public class TransferQueueExample 
{
    TransferQueue<String> queue = null;

    String WAIT_producer = "Producer waiting to transfer : ";
    String TRANSFERED    = "Producer transfered :"          ;

    String WAIT_consumer = "Consumer waiting to consumed : ";
    String CONSUMED      = "Consumer consumed : "           ;

    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

    public TransferQueueExample()
    {
        queue = new LinkedTransferQueue<String>();

        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }
    void printMessage(final String msg)
    {
        String text = sdf.format(new Date()) + " " + msg;
        System.out.println(text);
    }
    class Producer implements Runnable{
        @Override
        public void run() {
            for(int i = 0; i < 2; i++){
                try{
                    printMessage(WAIT_producer + i);
                    queue.transfer("'producer-" + i + "'");
                    printMessage(TRANSFERED + i + "\n");
                } catch(Exception e){}
            }
        }

    }
    class Consumer implements Runnable{
        @Override
        public void run() {
            for(int i = 0; i < 2; i++){
                try{
                    Thread.sleep(2000);
                     printMessage(WAIT_consumer + i);
                    String element = queue.take();
                    printMessage(CONSUMED + element);
                }catch(Exception e){}
            }
        }
    }

    public static void main(String args[]) {
        new TransferQueueExample();
    }
}

Результат выполнения

Ниже представлены результаты работы примера TransferQueueExample. Как и следовало ожидать Producer при размещении элемента в очередь блокируется, пока Consumer не извлечет элемент из очереди (см. время) :


14:23:25 Producer waiting to transfer : 0
14:23:27 Consumer waiting to consumed : 0
14:23:27 Consumer consumed : 'producer-0'
14:23:27 Producer transfered :0

14:23:27 Producer waiting to transfer : 1
14:23:29 Consumer waiting to consumed : 1
14:23:29 Consumer consumed : 'producer-1'
14:23:29 Producer transfered :1
 

Очередь DelayQueue

Неограниченная очередь блокирования элементов DelayQueue реализует интерфейс Delayed и позволяет извлекать элемент с некоторой временно́й задержкой. Если задержка не истекла, то метод poll вернет null. Очередь не разрешает запись нулевых элементов. Метод класса getDelay(TimeUnit.NANOSECONDS) вернет значение меньше или равное нулю, если время еще не истекло. Метод size возвращает общее количество элементов с истекшим и неистекшим временем задержки.

Класс DelayQueue и его iterator реализуют все дополнительные методы Collection и Iterator интерфейсы.

Документация класса DelayQueue представлена здесь.

Скачать примеры

Рассмотренные на странице примеры использования блокирующих очередей пакета java.util.concurrent в виде проекта Eclipse можно скачать здесь (20.5 Кб).

  Рейтинг@Mail.ru