410013796724260
• Webmoney
R335386147728
Z369087728698
Неблокирующие concurrent очередиПакет java.util.concurrent для формирования неблокирующих очередей с поддержкой многопоточности включает два класса на связанных узлах :
Классы формирования неблокирующих очередей реализуют интерфейсы Queue и Deque. На странице представлены два примера использования неблокирующих очередей : Интерфейс QueueОбобщенный интерфейс Queue расширяет базовый интерфейс коллекции Collection и определяет поведение класса в качестве однонаправленной очереди. Функциональность интерфейса определяется следующими методами :
В таблице методы интерфейса Queue систематизированы.
Метод offer предназначен для использования, когда отказ в размещения элемента в очереди является нормальным явлением, а не исключением, например, с фиксированной емкостью. Методы remove() и poll() отличаются только по их поведению при пустой очереди : метод remove() выдает исключение, в то время как метод poll() возвращает null. Методы element() и peek() возвращают элемент из головы очереди, но не удаляют его. Реализации интерфейса Queue обычно не позволяют вставку элементов null, хотя некоторые его реализации типа LinkedList не запрещают вставку null. Даже в реализациях, которые допускают это, null не должен быть вставлен в Queue, поскольку null также используется в качестве специального возвращаемого значения методом poll, чтобы указать, что очередь не содержит элементов. Полная документация по интерфейсу Queue представлена здесь. Очередь ConcurrentLinkedQueueКласс ConcurrentLinkedQueue формирует неблокирующую, основанную на связанных узлах и ориентированную на многопоточное исполнение очередь. Размер очереди ConcurrentLinkedQueue не имеет ограничений. Эта очередь реализует принцип FIFO — «первым прибыл, первым убыл». Новые элементы размещаются в хвосте очереди, а операции извлечения получают элементы из головы очереди. Этот класс не разрешает использование null элементов. Очередь ConcurrentLinkedQueue использует эффективный неблокирующий алгоритм "без ожидания" предложенный Мэджедом М. Майклом и Майклом Л. Скоттом (Maged M. Michael, Michael L. Scott). Итераторы класса отражают состояние очереди на определенный момент времени его создания и не вызывают ConcurrentModificationException. Содержавшиеся в очереди элементы с начала создания iterator'a, будут возвращены точно по запросу. Аккуратно используйте метод size, который в отличие от большиства наборов, не является constant-time operation. Из-за асинхронной природы этой очереди, определение текущего количества элементов может быть не точным и требует обхода элементов, если этот набор изменяется во время обхода. Объемные операции addAll, removeAll, retainAll, containsAll, equals и toArray не гарантируют, что будут выполнены атомарно. Например, iterator, работающий с методом addAll, мог бы просмотреть только некоторые из добавленных элементов. Очередь ConcurrentLinkedQueue и iterator реализуют все дополнительные методы интерфейсов Queue и Iterator. Конструкторы ConcurrentLinkedQueueКласс ConcurrentLinkedQueue имеет два конструктора. Первый конструктор без параметров создает пустую очередь. Второй конструктор создает очередь и включает в неё элементы набора, добавленные в порядке обхода iterator'a. ConcurrentLinkedQueue() ConcurrentLinkedQueue(Collection<? extends E> c) Полная документация по неблокирующей очереди ConcurrentLinkedQueue представлена здесь. Пример очереди ConcurrentLinkedQueueВ примере использования неблокирующей очереди ConcurrentLinkedQueueExample создаются два потока. Первый поток Producer с задержками в 200 мс помещает в очередь 10 текстовых сообщений. Второй поток Consumer проверяет наполнение очереди и извлекает из очереди сообщения с задержкой 500 мс. Таким образом, очередь наполняется быстрее, чем опустошается. Пример демонстрирует, каким образом можно использовать неблокирующую очередь ConcurrentLinkedQueue. package example; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class ConcurrentLinkedQueueExample { private Queue<String> queue = null; private volatile boolean cycle = true; ConcurrentLinkedQueueExample() { queue = new ConcurrentLinkedQueue<String>(); Thread producer = new Thread(new Producer()); Thread consumer = new Thread(new Consumer()); producer.start(); consumer.start(); while (consumer.isAlive()) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } System.exit(0); } class Producer implements Runnable { public void run() { System.out.println("Producer started"); try { for (int i = 1; i <= 10; i++) { String str = "String" + i; queue.add(str); System.out.println("Producer added : " + str); Thread.sleep(200); } cycle = false; } catch (Exception ex) { ex.printStackTrace(); } } } class Consumer implements Runnable { public void run() { String str; System.out.println("Consumer started\n"); while (cycle || queue.size() > 0) { if ((str = queue.poll()) != null) System.out.println(" consumer removed : " + str); try { Thread.sleep(500); } catch (Exception ex) { ex.printStackTrace(); } } } } public static void main(String[] args) { new ConcurrentLinkedQueueExample(); } } Выполнение примераПри выполнении примера сообщения, представленные ниже, выводятся в консоль. Последовательность сообщений обусловлена временны́ми задержками в потоках при выполнении операций с очередью. Producer started Consumer started Producer added : String1 consumer removed : String1 Producer added : String2 Producer added : String3 consumer removed : String2 Producer added : String4 Producer added : String5 consumer removed : String3 Producer added : String6 Producer added : String7 Producer added : String8 consumer removed : String4 Producer added : String9 Producer added : String10 consumer removed : String5 consumer removed : String6 consumer removed : String7 consumer removed : String8 consumer removed : String9 consumer removed : String10 Интерфейс DequeИнтерфейс Deque расширяет (extends) свойства интерфейса Queue и обеспечивает двусторонний доступ к элементам очереди. Большинство реализаций Deque не устанавливают ограничение на количество элементов, которое они могут содержать в очереди. Но этот интерфейс поддерживает ограниченные емкостью двухсторонние очереди также, как и очереди без определения фиксированного размера. Интерфейс Deque включает методы доступа к элементам в обоих концах двухсторонней очереди. Эти методы обеспечивают вставку, удаление и извлечения элемента. Каждый из этих методов существует в двух формах: один вызывает исключение, если выполнение невозможно, другой возвращает специальное значение (null или false). Методы интерфейса Deque представлены в таблице.
Наследованные от интерфейса Queue следующие методы эквивалентны методам Deque :
Двухсторонние очереди Deque могут быть использованы как LIFO (last-in-first-out) стек (Stack). Данный интерфейс должен иметь предпочтение по отношению к классу Stack. Когда двухсторонняя очередь используется в качестве стека, элементы вставляются и выталкиваются из начала двухсторонней очереди. Методы Stack'a эквивалентны следующим методам Deque :
Метод peek одинаково хорошо работает при использовании Deque в качестве двухсторонней очереди или стека; в любом случае элементы извлекаются из начала двухсторонней очереди. Для удаления внутренних элементов Deque имеет два метода : removeFirstOccurrence и removeLastOccurrence. В отличие от List этот интерфейс не оказывает поддержку индексному доступу к элементам. Полная документация по интерфейсу Deque представлена здесь. Очередь ConcurrentLinkedDequeНеограниченная по емкости двухсторонняя очередь ConcurrentLinkedDeque, основанная на связанных узлах, реализует интерфейс Deque. Очередь обеспечивает безопасные операции размещения, удаления и доступа параллельно выполняющимся потокам. То есть, ConcurrentLinkedDeque можно использовать нескольким потокам одновременно для совместного доступа к общему набору. Данный класс не разрешает размещение null элементов в очереди. Итераторы класса отражают состояние двухсторонней очереди на момент их создания, не вызывают ConcurrentModificationException и могут использоваться одновременно с другими операциями. Также, как и с интерфейсом Queue, аккуратно используйте метод size, который не является constant-time operation. Из-за асинхронной природы этой очереди, определение текущего количества элементов может быть не точным и требует обхода элементов, если этот набор изменяется во время обхода. Объемные операции addAll, removeAll, retainAll, containsAll, equals и toArray не гарантируют, что будут выполнены атомарно. Например, iterator, работающий с методом addAll, мог бы просмотреть только некоторые из добавленных элементов. Конструкторы ConcurrentLinkedDequeКласс ConcurrentLinkedDeque имеет два конструктора. Первый конструктор без параметров создает пустую очередь. Второй конструктор создает очередь и включает в неё элементы набора, добавленные в порядке обхода iterator'a. ConcurrentLinkedDeque() ConcurrentLinkedDeque(Collection<? extends E> c) Полная документация по неблокирующей очереди ConcurrentLinkedDeque представлена здесь. Пример очереди ConcurrentLinkedDequeВ примере использования неблокирующей очереди ConcurrentLinkedDeque создаются два потока. Первый поток Producer с задержками в 200 мс помещает в очередь 10 текстовых сообщений. В зависимости от размера очереди (четное/нечетное) элемент помещается либо в хвост очереди, либо в начало. Второй поток Consumer проверяет наполнение очереди и извлекает из очереди сообщения с задержкой 500 мс. Consumer также проверяет четность размера очереди и извлекает элемент либо из головы, либо из хвоста очереди. Так как задержки в Producer меньше, чем в Consumer, то очередь наполняется быстрее, чем опустошается. Пример демонстрирует, каким образом можно использовать неблокирующую двухстороннюю очередь ConcurrentLinkedDeque. package example; import java.util.concurrent.ConcurrentLinkedDeque; public class ConcurrentLinkedDequeExample { private ConcurrentLinkedDeque<String> queue; public ConcurrentLinkedDequeExample() { queue = new ConcurrentLinkedDeque<String>(); Thread producer = new Thread(new Producer()); producer.start(); Thread consumer = new Thread(new Consumer()); consumer.start(); while (consumer.isAlive()) { try { Thread.sleep(1000); } catch (InterruptedException e) {} } System.exit(0); } class Producer implements Runnable { @Override public void run() { System.out.println("Producer started"); String name = "producer "; String ins = ""; for (int i = 0; i < 10; i++) { String element = "'" + name + i + "'"; if (queue.size() % 2 == 1) { queue.addFirst(element); ins = "addFirst (" + element + ")"; } else { queue.addLast(element); ins = "addLast (" + element + ")"; } System.out.println(name + ins + ": queue.size()=" + queue.size()); try { Thread.sleep(200); } catch (InterruptedException e) {} } } } class Consumer implements Runnable { @Override public void run() { System.out.println("Consumer started"); for (int i = 0; i < 10; i++) { String text = "\n consumer : queue.size()=" + queue.size(); String element; if (queue.size() % 2 == 1) element = "pollFirst : " + queue.pollFirst(); else element = "pollLast : " + queue.pollLast(); text += ", " + element; System.out.println(text); try { Thread.sleep(500); } catch (InterruptedException e) {} } } } public static void main(String[] args) { new ConcurrentLinkedDequeExample(); } } Выполнение примераПри выполнении примера в консоль выводятся представленные ниже сообщения. Последовательность сообщений обусловлена временны́ми задержками в потоках при выполнении операций с очередью. Producer started producer addLast 'producer 0': queue.size()=1 Consumer started consumer : queue.size()=1, pollFirst : 'producer 0' producer addLast 'producer 1': queue.size()=1 producer addFirst 'producer 2': queue.size()=2 consumer : queue.size()=2, pollLast : 'producer 1' producer addFirst 'producer 3': queue.size()=2 producer addLast 'producer 4': queue.size()=3 producer addFirst 'producer 5': queue.size()=4 consumer : queue.size()=4, pollLast : 'producer 4' producer addFirst 'producer 6': queue.size()=4 producer addLast 'producer 7': queue.size()=5 consumer : queue.size()=5, pollFirst : 'producer 6' producer addLast 'producer 8': queue.size()=5 producer addFirst 'producer 9': queue.size()=6 consumer : queue.size()=6, pollLast : 'producer 8' consumer : queue.size()=5, pollFirst : 'producer 9' consumer : queue.size()=4, pollLast : 'producer 7' consumer : queue.size()=3, pollFirst : 'producer 5' consumer : queue.size()=2, pollLast : 'producer 2' consumer : queue.size()=1, pollFirst : 'producer 3' Скачать примерыРассмотренные на странице примеры использования неблокирующих очередей пакета java.util.concurrent в виде проекта Eclipse можно скачать здесь (10.6 Кб). |