Многопоточный пакет util.concurrent

Разработка многопоточного кода, обеспечивающего не только хорошую производительность, но и защиту данных приложения, не является тривиальной задачей. Именно для таких целей предназначен пакет java.util.concurrent. Разработчики, как правило, ограничиваются обычной синхронизацией, и поэтому с пакетом многопоточности java.util.concurrent они сталкиваются не так уж часто.

В данной статье приводится обзор пакета java.util.concurrent, разбитого на несколько функциональных групп. В описании каждой группы приводится ссылка на более подробное описание объектов с примерами на страницах сайта.

Дополнительную информацию о многопоточности можно найти в документации Oracle Java Concurrency Tutorials и в книге Doug Lea «Concurrent Programming in Java by Doug Lea», являющегося автором пакета java.util.concurrent и профессором университета штата Нью Йорк. К наиболее известным java разработкам Doug Lea следует отнести collections и util.concurrent, которые в том или ином виде отразились в существующих JDK. На домашней страничке Doug Lea размещена второе издание книги по многопоточности Concurrent Programming in Java™: Design Principles and Pattern, 2nd Edition, с которой можно познакомиться здесь.

Структура пакета java.util.concurrent

Классы и интерфейсы пакета java.util.concurrent объедининены в несколько групп по функциональному признаку, как это представлено в следующей таблице :

НаименованиеПримечание
collections Набор более эффективно работающих в многопоточной среде коллекций нежели стандартные универсальные коллекции из java.util пакета
synchronizers Объекты синхронизации, позволяющие разработчику управлять и/или ограничивать работу нескольких потоков.
atomic Набор атомарных классов, позволяющих использовать принцип действия механизма оптимистической блокировки для выполнения атомарных операций.
Queues Объекты создания блокирующих и неблокирующих очередей с поддержкой многопоточности.
Locks Механизмы синхронизации потоков, альтернативы базовым synchronized, wait, notify, notifyAll
Executors Механизмы создания пулов потоков и планирования работы асинхронных задач

Concurrent Collections

Обычные наборы данных, реализующих интерфейсы List, Set и Map, нельзя использовать в многопоточных приложениях, если требуется синхронизация, т.е. такие коллекции недопустимы для одновременного чтения и изменения данных разными потоками. Методы обрамления Collections framework (synchronizedList, synchronizedSet, synchronizedMap), появившиеся в JDK 1.2, имеют существенный недостаток, связанный с препятствованием масштабируемости, поскольку с коллекцией одновременно может работать только один поток.

Пакет java.util.concurrent предлагает свой набор потокобезопасных классов, допускающих разными потоками одновременное чтение и внесение изменений. Итераторы классов данного пакета представляют данные на определенный момент времени и не вызывают исключение ConcurrentModificationException. Все операции по изменению коллекции (add, set, remove) приводят к созданию новой копии внутреннего массива. Этим гарантируется, что при проходе итератором по коллекции не будет ConcurrentModificationException. Следует помнить, что при копировании массива копируются только ссылки на объекты.

CopyOnWriteArrayList реализует алгоритм CopyOnWrite и является потокобезопасным аналогом ArrayList. Класс CopyOnWriteArrayList содержит изменяемую ссылку на неизменяемый массив, обеспечивая преимущества потокобезопасности без необходимости использования блокировок. Т.е. при выполнении модифицирующей операции CopyOnWriteArrayList создаёт новую копию списка и гарантирует, что её итераторы вернут состояние списка на момент создания итератора и не вызовут ConcurrentModificationException. Описание CopyOnWriteArrayList с примером представлено здесь.

ConcurrentHashMap<K, V> реализует (implements) интерфейс java.util.concurrent.ConcurrentMap и отличается от HashMap и Hashtable внутренней структурой хранения пар key-value. СoncurrentHashMap использует несколько сегментов, и данный класс можно рассматривать как группу HashMap’ов. По умолчанию количество сегментов равно 16. Доступ к данным определяется по сегментам, а не по объекту. Итераторы данного класса фиксируют структуру данных на момент начала его использования. Описание ConcurrentHashMap<K, V> с примером представлено здесь.

CopyOnWriteArraySet выполнен на основе CopyOnWriteArrayList с реализацией интерфейса Set. Описание CopyOnWriteArraySet с примером представлено здесь.

ConcurrentNavigableMap расширяет возможности интерфейса NavigableMap для использования в многопоточных приложениях; итераторы класса декларируются как потокобезопасные и не вызывают ConcurrentModificationException.

ConcurrentSkipListMap является аналогом коллекции TreeMap с сортировкой данных по ключу и с поддержкой многопоточности.

ConcurrentSkipListSet выполнен на основе ConcurrentSkipListMap с реализацией интерфейса Set.

Concurrent Synchronizers, объекты синхронизации

Прежде чем говорить о синхронизаторах пакета java.util.concurrent, вспомним, что такое синхронизация.

Синхронизация — это процесс, позволяющий выполнять в программе синхронно параллельные потоки. Несколько потоков могут мешать друг другу при обращении к одним и тем же объектам приложения. Для решения этой проблемы используется мьютекс, он же монитор, имеющий два состояния — объект занят и объект свободен. Монитор (мьютекс) — это высокоуровневый механизм взаимодействия и синхронизации процессов, обеспечивающий доступ к неразделяемым ресурсам. Мьютекс встроен в класс Object и, следовательно, имеется у каждого объекта.

Синхронизация в Java реализуется использованием зарезервированного слова synchronized. Можно использовать synchronized в классах, определяя синхронизированные методы или блоки. Но нельзя использовать synchronized в переменных или атрибутах в определении класса.

Когда какой-либо поток начинает использовать общий для всех потоков объект, то он проверяет мьютекс этого объекта. Если мьютекс свободен, то поток блокирует его, помечая как занятый, и приступает к использованию данного ресурса. После завершения работы, поток разблокирует мьютекс (помечает свободным). Если же поток обнаруживает, что объект заблокирован, то он «засыпает» в ожидании освобождения мьютекса. При освобождении мьютекса ожидающий поток тут же заблокирует его и приступит к работе. А как быть, если несколько потоков ожидают освобождения мьютекса? Кто первый встал, того и тапки … ? Эту проблему легко разрешит java.util.concurrent.Semaphore (см. ниже).

Таким образом, при обычной синхронизации потоков используют оператор synchronized для ограничения (блокирования) доступа к определенному методу, блоку кода или объекту без каких-либо условий. Пакет java.util.concurrent содержит пять объектов синхронизации, позволяющих накладывать определенные условия для синхронизации потоков.

Semaphore (семафор) — объект синхронизации, ограничивающий одновременный доступ к общему ресурсу нескольким потокам с помощью счетчика. При запросе разрешения и значении счетчика больше нуля доступ предоставляется, а счетчик уменьшается; в противном случае — доступ запрещается. При освобождении ресурса значение счетчика семафора увеличивается. Количество разрешений семафора определяется в конструкторе. Второй конструктор семаформа включает дополнительный параметр «справедливости», определяющий порядок предоставления разрешения ожидающим доступа потокам. Описание с примером представлено здесь.

CountDownLatch («защелка с обратным отсчетом») — объект синхронизации потоков, блокирующий один или несколько потоков до тех пор, пока не будут выполнены определенные условия. Количество условий задается счетчиком. При обнулении счетчика, т.е. при выполнении всех условий, блокировки выполняемых потоков будут сняты и они продолжат выполнение кода. Примером CountDownLatch может служить экскурсовод, собирающий группу из заданного количества туристов. Как только группа собрана она отправляется на экскурсию. Необходимо отметить, что счетчик одноразовый и не может быть инициализирован по-новому. Описание с примером представлено здесь

CyclicBarrier — объект синхронизации типа «Барьер» используется, как правило, в распределённых вычислениях. Барьерная синхронизация останавливает участника (исполняемый поток) в определенном месте в ожидании прихода остальных потоков группы. Как только все потоки достигнут барьера, барьер снимается и выполнение потоков продолжается. Циклический барьер CyclicBarrier, также, как и CountDownLatch, использует счетчик и похож на него. Отличие связано с тем, что «защелку» нельзя использовать повторно после того, как её счётчик обнулится, а барьер можно использовать (в цикле). Описание с примером представлено здесь

Exchanger — объект синхронизации, используемый для двустороннего обмена данными между двумя потоками. При обмене данными допускается null значения, что позволяет использовать класс для односторонней передачи объекта или же просто, как синхронизатор двух потоков. Обмен данными выполняется вызовом метода exchange, сопровождаемый самоблокировкой потока. Как только второй поток вызовет метод exchange, то синхронизатор Exchanger выполнит обмен данными между потоками. Описание с примером представлено здесь

Phaser — объект синхронизации типа «Барьер», но, в отличие от CyclicBarrier, может иметь несколько барьеров (фаз), и количество участников на каждой фазе может быть разным. Описание с примером представлено здесь

Атомарные классы пакета java.concurrent.atomic

Пакет java.util.concurrent.atomic включает девять атомарных классов для выполнения, так называемых, атомарных операций. Операция является атомарной, если её можно безопасно выполнять при параллельных вычислениях в нескольких потоках, не используя при этом ни блокировок, ни синхронизацию synchronized.

Атомарный класс включает метод compareAndSet, реализующий механизм оптимистичной блокировки и позволяющий изменить значение только в том случае, если оно равно ожидаемому значению. Т.е. если значение атомарного класса было изменено в другом потоке, то оно не будет равно ожидаемому значению, и метод compareAndSet не позволит изменить значение.

Ряд архитектур процессоров имеют инструкцию Compare-And-Swap (CAS), которая реализует операцию compareAndSet. Таким образом, на уровне инструкций процессора имеется поддержка необходимой атомарной операции. В архитектурах процессоров, где инструкция не поддерживается, операции реализованы иными низкоуровневыми средствами.

Подробнее об описании атомарных классов с примерами можно познакомиться здесь.

Queues

Пакет java.util.concurrent содержит классы формирования неблокирующих и блокирующих очередей для многопоточных приложений. Неблокирующие очереди «заточены» на скорость выполнения, блокирующие очереди приостанавливают потоки при работе с очередью.

Неблокирующие очереди
Потокобезопасные и неблокирующие очереди на связанных узлах (linked nodes) реализуют интерфейс Queue и его наследника Deque.

ConcurrentLinkedQueue реализуют интерфейс Queue и формирует неблокирующую и ориентированную на многопоточное исполнение очередь. Размер очереди ConcurrentLinkedQueue не имеет ограничений. Имплементация очереди использует wait-free алгоритм от Michael & Scott, адаптированный для работы с garbage collector'ом. Данный алгоритм довольно эффективен и очень быстр, т.к. построен на CAS (Compare-And-Swap). Описание с примером представлено здесь.

ConcurrentLinkedDeque реализует интерфейс Deque (Double ended queue), читается как «Deck». Данная реализация позволяет добавлять и получать элемента с обеих сторон очереди. Соответственно, класс поддерживает оба режима работы : FIFO (First In First Out) и LIFO (Last In First Out). ConcurrentLinkedDeque следует использовать в том случае, если необходимо реализовывать LIFO, поскольку за счет двунаправленности данный класс проигрывает по производительности очереди ConcurrentLinkedQueue. Описание с примером представлено здесь.

Блокирующие очереди

При обработке большого количества потоков данных использование неблокирующих очередей иногда может оказаться явно недостаточным : разгребающие очереди потоки перестанут справляться с наплывом данных, что может привести к «out of memory» или перегрузить IO/Net настолько, что производительность упадет в разы, пока не наступит отказ системы по таймаутам или из-за отсутствия свободных дескрипторов в системе. Самое неприятное в данном случае то, что возникающая ситуация является нестабильной, сложной для отладки. Для таких случаев нужна блокирующая очередь с возможностью задать её размер и/или условия блокировки.

Блокирующие очереди реализуют интерфейсы BlockingQueue, BlockingDeque, TransferQueue. Интерфейс BlockingQueue хранит элементы в порядке «первый пришел, первый вышел». Добавленные в очередь элементы в определенном порядке, будут извлечены из неё в том же самом порядке. Реализация BlockingQueue гарантирует, что любая попытка извлечь элемент из пустой очереди заблокирует вызывающий поток до тех пор, пока не появится доступный элемент. Аналогично , любая попытка вставить элемент в заполненную очередь заблокирует вызывающий поток до тех пор, пока не освободится место для нового элемента. Интерфейс BlockingDeque включает дополнительные методы для двунаправленной блокирующей очереди, у которой данные можно добавлять и извлекать с обоих сторон очереди.

Интерфейсы блокирующих очередей наряду с возможностью определения размера очереди включают методы, по-разному реагирующие на незаполнение или переполнение queue. Так, например, при добавлении элемента в переполненную очередь, один из методов вызовет IllegalStateException, другой вернет false, третий заблокирует поток, пока не появится место, четвертый же заблокирует поток на определенное время (таймаут) и вернет false, если место так и не появится.

ArrayBlockingQueue — блокирующая очередь, реализующая классический кольцевой буфер. Параметр fair в конструкторе позволяет управлять справедливостью очереди для упорядочивания работы ожидающих потоков производителей (вставляющих элементы) и потребителей (извлекающих элементы). Описание с примером представлено здесь.

LinkedBlockingQueue — блокирующая очередь на связанных узлах, реализующая «two lock queue» алгоритм : один lock добавляет элемент, второй извлекает. За счет двух lock'ов данная очередь показывает более высокую производительность по сравнению с ArrayBlockingQueue, но и расход памяти повышается. Размер очереди задается через конструктор и по умолчанию равен Integer.MAX_VALUE. Описание с примером представлено здесь.

LinkedBlockingDeque — двунаправленная блокирующая очередь на связанных узлах, реализованная как простой двунаправленный список с одним локом. Размер очереди задается через конструктор и по умолчанию равен Integer.MAX_VALUE. Описание с примером представлено здесь.

SynchronousQueue — блокирующая очередь, в которой каждая операция добавления должна ждать соответствующей операции удаления в другом потоке и наоборот. Т.е. очередь реализует принцип «один вошел, один вышел». SynchronousQueue не имеет никакой внутренней емкости, даже емкости в один элемент. Описание с примером представлено здесь.

LinkedTransferQueue — блокирующая очередь с реализацией интерфейса TransferQueue, который позволяет при добавлении элемента в очередь заблокировать вставляющий поток до тех пор, пока другой поток не заберет элемент из очереди. Блокировка может быть как с таймаутом, так и с проверкой ожидающего потока. Таким образом может быть реализован механизм передачи сообщений с поддержкой как синхронных, так и асинхронных сообщений. Подробное описание представлено здесь.

DelayQueue — специфичный вид очереди, позволяющий извлекать элементы только после некоторой задержки, определенной в каждом элементе через метод getDelay интерфейса Delayed. Подробное описание представлено здесь.

PriorityBlockingQueue — является многопоточной оберткой интерфейса PriorityQueue. При размещении элемента в очереди, его порядок определяется в соответствии с «натуральным» упорядочиванием, либо логикой Comparator'а или имплементации Comparable интерфейса. Подробное описание представлено здесь.

Описание пакета java.concurrent.locks

Пакет java.util.concurrent.locks включает классы, которые существенно отличаются от встроенной синхронизации и мониторов, и которые можно использовать для блокировки ресурсов с определенными условиями. Этот пакет дает намного большую гибкость в использовании блокировок без условий и с условием.

Lock — базовый интерфейс, предоставляющий более гибкий подход при ограничении доступа к ресурсам/блокам по сравнению с использованием synchronized. Так, при использовании нескольких блокировок, порядок их освобождения может быть произвольный. Имеется возможность перехода к альтернативному сценарию, если блокировка уже захвачена. Более подробное описание Lock с примером представлено здесь.

Condition — интерфейсное условие в сочетании с блокировкой Lock позволяет заменить методы монитора/мьютекса (wait, notify и notifyAll) объектом, управляющим ожиданием событий. Объект с условием чаще всего получается из блокировок с использованием метода lock.newCondition(). Таким образом можно получить несколько комплектов wait/notify для одного объекта. Блокировка Lock заменяет использование synchronized, а Condition — объектные методы монитора. Более подробное описание Condition с примером представлено здесь.

ReadWriteLock — интерфейс создания read/write блокировок, который реализует один единственный класс ReentrantReadWriteLock. Блокировку чтение-запись следует использовать при длительных и частых операциях чтения и редких операциях записи. Тогда при доступе к защищенному ресурсу используются разные методы блокировки, как показано ниже :

ReadWriteLock rwl = new ReentrantReadWriteLock();
Lock  readLock    = rwl.readLock();
Lock  writeLock   = rwl.writeLock();

Более подробное описание интерфейса ReadWriteLock здесь.

Описание Executors

Многопоточный пакет concurrent включает средства, называемые сервисами исполнения, позволяющие управлять потоковыми задачами с возможностью получения результатов через интерфейсы Future и Callable.

Callable<V> — расширенный аналог интерфейса Runnable, позволяющий возвращать типизированное значение. В интерфейсе используется метод call, являющийся аналогом метода run интерфейса Runnable. Более подробное описание интерфейса Callable с примером представлено здесь.

Future<V> — интерфейс для получения результатов работы потока. Объект данного типа возвращает сервис исполнения ExecutorService, который в качестве параметра получает объект типа Callable<V>. Метод get объекта Future блокирует текущий поток (с таймаутом или без) до завершения работы потока Callable. Кроме этого, интерфейс Future включает методы для отмены операции и проверки текущего статуса. В качестве имплементации часто используется класс FutureTask. Более подробное описание интерфейса Future с примером представлено здесь.

FutureTask<V> — класс-оболочка, базирующаяся на конкретной реализации интерфейса Future. Чтобы создать реализацию класса FutureTask необходим объект Callable. FutureTask представляет удобный механизм для превращения Callable одновременно в Future и Runnable, реализуя оба интерфейса. Имплементация FutureTask может быть передана на выполнение классу, реализующему интерфейс Executor, либо запущена в отдельном потоке, как класс, реализующий интерфейс Runnable. Более подробное описание интерфейса FutureTask с примером представлено здесь.

ExecutorService — представляет собой интерфейс, имплементация которого используется для запуска потоков. Потоки можно запускать, используя методы execute и submit. Оба метода в качестве параметра принимают объекты Runnable или Callable. Метод submit возвращает значение типа Future, позволяющий получить результат выполнения потока. Метод invokeAll работает со списками задач с блокировкой потока до завершения всех задач в переданном списке или до истечения заданного времени. Метод invokeAny блокирует вызывающий поток до завершения любой из переданных задач. Реализация данного интерфейса включает метод shutdown, позволяющий завершить все принятые на исполнение задачи и блокирует поступление новых.

Интерфейс ScheduledExecutorService расширяет свойства ExecutorService для поддержки планирования потоков исполнения. В пакет concurrent включены три предопределенных класса исполнителей: ThreadPoolExecutor, ScheduledThreadPoolExecutor и ForkJoinPool. Чтобы получить реализацию данных объектов необходимо использовать класс-фабрику Executors. Описание интерфейса ExecutorService с примером представлено здесь

  Рейтинг@Mail.ru