Неблокирующие concurrent очереди

Пакет java.util.concurrent для формирования неблокирующих очередей с поддержкой многопоточности включает два класса на связанных узлах :

  • ConcurrentLinkedQueue — неограниченная по емкости и ориентированная на многопоточное исполнение очередь;
  • ConcurrentLinkedDeque — неограниченная по емкости двухсторонняя очередь, ориентированная на многопоточное исполнение.

Классы формирования неблокирующих очередей реализуют интерфейсы Queue и Deque.

На странице представлены два примера использования неблокирующих очередей :

  • пример с очередью ConcurrentLinkedQueue;
  • пример с двунаправленной очередью ConcurrentLinkedDeque;

Интерфейс Queue

Обобщенный интерфейс Queue расширяет базовый интерфейс коллекции Collection и определяет поведение класса в качестве однонаправленной очереди. Функциональность интерфейса определяется следующими методами :

  • E element () — возвращает, но не удаляет, элемент из начала очереди; если очередь пуста, генерирует исключение NoSuchElementException;
  • boolean offer (E) — добавляет элемент в конец очереди; если элемент удачно добавлен, возвращает true, иначе - false;
  • E peek () — возвращает элемент из начала очереди без удаления; если очередь пуста, возвращает значение null;
  • E poll () — возвращает элемент из начала очереди с удалением; если очередь пуста, возвращает значение null;
  • E remove () — возвращает элемент с удалением из начала очереди; если очередь пуста, генерирует исключение NoSuchElementException.

В таблице методы интерфейса Queue систематизированы.

Действие Вызов исключения Возврат значения
Вставитьadd(e) offer(e)
Удалить remove() poll ()
Получитьelement()peek ()

Метод offer предназначен для использования, когда отказ в размещения элемента в очереди является нормальным явлением, а не исключением, например, с фиксированной емкостью.

Методы remove() и poll() отличаются только по их поведению при пустой очереди : метод remove() выдает исключение, в то время как метод poll() возвращает null.

Методы element() и peek() возвращают элемент из головы очереди, но не удаляют его.

Реализации интерфейса Queue обычно не позволяют вставку элементов null, хотя некоторые его реализации типа LinkedList не запрещают вставку null. Даже в реализациях, которые допускают это, null не должен быть вставлен в Queue, поскольку null также используется в качестве специального возвращаемого значения методом poll, чтобы указать, что очередь не содержит элементов.

Полная документация по интерфейсу Queue представлена здесь.

Очередь ConcurrentLinkedQueue

Класс ConcurrentLinkedQueue формирует неблокирующую, основанную на связанных узлах и ориентированную на многопоточное исполнение очередь. Размер очереди ConcurrentLinkedQueue не имеет ограничений. Эта очередь реализует принцип FIFO — «первым прибыл, первым убыл». Новые элементы размещаются в хвосте очереди, а операции извлечения получают элементы из головы очереди. Этот класс не разрешает использование null элементов.

Очередь ConcurrentLinkedQueue использует эффективный неблокирующий алгоритм "без ожидания" предложенный Мэджедом М. Майклом и Майклом Л. Скоттом (Maged M. Michael, Michael L. Scott).

Итераторы класса отражают состояние очереди на определенный момент времени его создания и не вызывают ConcurrentModificationException. Содержавшиеся в очереди элементы с начала создания iterator'a, будут возвращены точно по запросу.

Аккуратно используйте метод size, который в отличие от большиства наборов, не является constant-time operation. Из-за асинхронной природы этой очереди, определение текущего количества элементов может быть не точным и требует обхода элементов, если этот набор изменяется во время обхода. Объемные операции addAll, removeAll, retainAll, containsAll, equals и toArray не гарантируют, что будут выполнены атомарно. Например, iterator, работающий с методом addAll, мог бы просмотреть только некоторые из добавленных элементов.

Очередь ConcurrentLinkedQueue и iterator реализуют все дополнительные методы интерфейсов Queue и Iterator.

Конструкторы ConcurrentLinkedQueue

Класс ConcurrentLinkedQueue имеет два конструктора. Первый конструктор без параметров создает пустую очередь. Второй конструктор создает очередь и включает в неё элементы набора, добавленные в порядке обхода iterator'a.

ConcurrentLinkedQueue()
ConcurrentLinkedQueue(Collection<? extends E> c)

Полная документация по неблокирующей очереди ConcurrentLinkedQueue представлена здесь.

Пример очереди ConcurrentLinkedQueue

В примере использования неблокирующей очереди ConcurrentLinkedQueueExample создаются два потока. Первый поток Producer с задержками в 200 мс помещает в очередь 10 текстовых сообщений. Второй поток Consumer проверяет наполнение очереди и извлекает из очереди сообщения с задержкой 500 мс. Таким образом, очередь наполняется быстрее, чем опустошается. Пример демонстрирует, каким образом можно использовать неблокирующую очередь ConcurrentLinkedQueue.

package example;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueExample 
{
    private Queue<String> queue = null;

    private volatile boolean cycle = true;

    ConcurrentLinkedQueueExample()
    {
        queue = new ConcurrentLinkedQueue<String>();

        Thread producer = new Thread(new Producer());
        Thread consumer = new Thread(new Consumer());
        producer.start();
        consumer.start();

        while (consumer.isAlive()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.exit(0);
    }

    class Producer implements Runnable {

        public void run() {
            System.out.println("Producer started");
            try {
                for (int i = 1; i <= 10; i++) {
                    String str = "String" + i; 
                    queue.add(str);
                    System.out.println("Producer added : " + str);
                    Thread.sleep(200);
                }
                cycle = false;
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
    class Consumer implements Runnable {
        public void run() {
            String str;
            System.out.println("Consumer started\n");
            while (cycle || queue.size() > 0) {
                if ((str = queue.poll()) != null)
                    System.out.println("  consumer removed : "
                                                         + str);
                try {
                    Thread.sleep(500);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) 
    {
        new ConcurrentLinkedQueueExample();
    }
}

Выполнение примера

При выполнении примера сообщения, представленные ниже, выводятся в консоль. Последовательность сообщений обусловлена временны́ми задержками в потоках при выполнении операций с очередью.


Producer started
Consumer started

Producer added : String1
  consumer removed : String1
Producer added : String2
Producer added : String3
  consumer removed : String2
Producer added : String4
Producer added : String5
  consumer removed : String3
Producer added : String6
Producer added : String7
Producer added : String8
  consumer removed : String4
Producer added : String9
Producer added : String10
  consumer removed : String5
  consumer removed : String6
  consumer removed : String7
  consumer removed : String8
  consumer removed : String9
  consumer removed : String10
 

Интерфейс Deque

Интерфейс Deque расширяет (extends) свойства интерфейса Queue и обеспечивает двусторонний доступ к элементам очереди. Большинство реализаций Deque не устанавливают ограничение на количество элементов, которое они могут содержать в очереди. Но этот интерфейс поддерживает ограниченные емкостью двухсторонние очереди также, как и очереди без определения фиксированного размера.

Интерфейс Deque включает методы доступа к элементам в обоих концах двухсторонней очереди. Эти методы обеспечивают вставку, удаление и извлечения элемента. Каждый из этих методов существует в двух формах: один вызывает исключение, если выполнение невозможно, другой возвращает специальное значение (null или false). Методы интерфейса Deque представлены в таблице.

Первый элемент (голова)
Последний элемент (хвост)
С исключением Получение значения С исключением Получение значения
ВставитьaddFirst(e) offerFirst(e) addLast(e) offerLast(e)
УдалитьremoveFirst(e) pollFirst(e) removeLast(e) pollLast(e)
ИсследоватьgetFirst(e) peekFirst(e) getLast(e) peekLast(e)

Наследованные от интерфейса Queue следующие методы эквивалентны методам Deque :

Метод Queue Эквивалентный метод Deque
add(e)addLast(e)
offer(e)offerLast(e)
remove()removeFirst()
poll() pollFirst()
element()getFirst()
peek()peekFirst()

Двухсторонние очереди Deque могут быть использованы как LIFO (last-in-first-out) стек (Stack). Данный интерфейс должен иметь предпочтение по отношению к классу Stack. Когда двухсторонняя очередь используется в качестве стека, элементы вставляются и выталкиваются из начала двухсторонней очереди. Методы Stack'a эквивалентны следующим методам Deque :

Метод Stack'a Эквивалентный метод Deque
push(e)addFirst(e)
pop()removeFirst()
peek()peekFirst()

Метод peek одинаково хорошо работает при использовании Deque в качестве двухсторонней очереди или стека; в любом случае элементы извлекаются из начала двухсторонней очереди.

Для удаления внутренних элементов Deque имеет два метода : removeFirstOccurrence и removeLastOccurrence.

В отличие от List этот интерфейс не оказывает поддержку индексному доступу к элементам.

Полная документация по интерфейсу Deque представлена здесь.

Очередь ConcurrentLinkedDeque

Неограниченная по емкости двухсторонняя очередь ConcurrentLinkedDeque, основанная на связанных узлах, реализует интерфейс Deque. Очередь обеспечивает безопасные операции размещения, удаления и доступа параллельно выполняющимся потокам. То есть, ConcurrentLinkedDeque можно использовать нескольким потокам одновременно для совместного доступа к общему набору. Данный класс не разрешает размещение null элементов в очереди.

Итераторы класса отражают состояние двухсторонней очереди на момент их создания, не вызывают ConcurrentModificationException и могут использоваться одновременно с другими операциями.

Также, как и с интерфейсом Queue, аккуратно используйте метод size, который не является constant-time operation. Из-за асинхронной природы этой очереди, определение текущего количества элементов может быть не точным и требует обхода элементов, если этот набор изменяется во время обхода. Объемные операции addAll, removeAll, retainAll, containsAll, equals и toArray не гарантируют, что будут выполнены атомарно. Например, iterator, работающий с методом addAll, мог бы просмотреть только некоторые из добавленных элементов.

Конструкторы ConcurrentLinkedDeque

Класс ConcurrentLinkedDeque имеет два конструктора. Первый конструктор без параметров создает пустую очередь. Второй конструктор создает очередь и включает в неё элементы набора, добавленные в порядке обхода iterator'a.

ConcurrentLinkedDeque()
ConcurrentLinkedDeque(Collection<? extends E> c)

Полная документация по неблокирующей очереди ConcurrentLinkedDeque представлена здесь.

Пример очереди ConcurrentLinkedDeque

В примере использования неблокирующей очереди ConcurrentLinkedDeque создаются два потока. Первый поток Producer с задержками в 200 мс помещает в очередь 10 текстовых сообщений. В зависимости от размера очереди (четное/нечетное) элемент помещается либо в хвост очереди, либо в начало. Второй поток Consumer проверяет наполнение очереди и извлекает из очереди сообщения с задержкой 500 мс. Consumer также проверяет четность размера очереди и извлекает элемент либо из головы, либо из хвоста очереди.

Так как задержки в Producer меньше, чем в Consumer, то очередь наполняется быстрее, чем опустошается. Пример демонстрирует, каким образом можно использовать неблокирующую двухстороннюю очередь ConcurrentLinkedDeque.

package example;

import java.util.concurrent.ConcurrentLinkedDeque;

public class ConcurrentLinkedDequeExample 
{
    private ConcurrentLinkedDeque<String> queue;

    public ConcurrentLinkedDequeExample()
    {
        queue = new ConcurrentLinkedDeque<String>();

        Thread producer = new Thread(new Producer());
        producer.start();

        Thread consumer = new Thread(new Consumer());
        consumer.start();

        while (consumer.isAlive()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {}
        }
        System.exit(0);
    }
    class Producer implements Runnable 
    {
        @Override
        public void run() {
            System.out.println("Producer started");
            String name = "producer ";
            String ins  = "";
            for (int i = 0; i < 10; i++) {
                String element = "'" + name + i + "'";
                if (queue.size() % 2 == 1) {
                    queue.addFirst(element);
                    ins = "addFirst (" + element + ")";
                } else {
                    queue.addLast(element);
                    ins = "addLast (" + element + ")";
                }
                System.out.println(name + ins + ": queue.size()=" +
                                                   queue.size());
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {}
            }
        }
    }
    class Consumer implements Runnable {
        @Override
        public void run() {
            System.out.println("Consumer started");
            for (int i = 0; i < 10; i++) {
                String text = "\n   consumer : queue.size()=" +
                                               queue.size();
                String element;
                if (queue.size() % 2 == 1)
                    element = "pollFirst : " + queue.pollFirst();
                else
                    element = "pollLast : " + queue.pollLast();
                text += ", " + element;
                System.out.println(text);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {}
            }
        }
    }
    public static void main(String[] args) 
    {
        new ConcurrentLinkedDequeExample();
    }
}

Выполнение примера

При выполнении примера в консоль выводятся представленные ниже сообщения. Последовательность сообщений обусловлена временны́ми задержками в потоках при выполнении операций с очередью.


Producer started
producer addLast 'producer 0': queue.size()=1

Consumer started

   consumer : queue.size()=1, pollFirst : 'producer 0'
producer addLast 'producer 1': queue.size()=1
producer addFirst 'producer 2': queue.size()=2

   consumer : queue.size()=2, pollLast : 'producer 1'
producer addFirst 'producer 3': queue.size()=2
producer addLast 'producer 4': queue.size()=3
producer addFirst 'producer 5': queue.size()=4

   consumer : queue.size()=4, pollLast : 'producer 4'
producer addFirst 'producer 6': queue.size()=4
producer addLast 'producer 7': queue.size()=5

   consumer : queue.size()=5, pollFirst : 'producer 6'
producer addLast 'producer 8': queue.size()=5
producer addFirst 'producer 9': queue.size()=6

   consumer : queue.size()=6, pollLast : 'producer 8'
   consumer : queue.size()=5, pollFirst : 'producer 9'
   consumer : queue.size()=4, pollLast : 'producer 7'
   consumer : queue.size()=3, pollFirst : 'producer 5'
   consumer : queue.size()=2, pollLast : 'producer 2'
   consumer : queue.size()=1, pollFirst : 'producer 3'
 

Скачать примеры

Рассмотренные на странице примеры использования неблокирующих очередей пакета java.util.concurrent в виде проекта Eclipse можно скачать здесь (10.6 Кб).

  Рейтинг@Mail.ru