Потокобезопасные concurrent коллекции

Пакет java.util.concurrent включает несколько потокобезопасных коллекций :

ConcurrentHashMap коллекция типа HashMap, реализующая интерфейс ConcurrentMap;
CopyOnWriteArrayList коллекция типа ArrayList с алгоритмом CopyOnWrite;
CopyOnWriteArraySet реализация интерфейса Set, использующая за основу CopyOnWriteArrayList;
• ConcurrentNavigableMap расширяет интерфейс NavigableMap;
• ConcurrentSkipListMap аналог коллекции TreeMap с сортировкой данных по ключу и с поддержкой многопоточности;
• ConcurrentSkipListSet реализация интерфейса Set, выполненная на основе класса ConcurrentSkipListMap.

На странице представлены только первые три потокобезопасные коллекции. Что касается ConcurrentNavigableMap, то итераторы этого класса декларируются как потокобезопасные и не вызывают ConcurrentModificationException. Класс ConcurrentSkipListMap гарантирует усредненную производительность выполнения операций для методов containsKey, get, put, remove и других подобных.

Представленные на странице примеры использования классов пакета java.util.concurrent связаны с перебором данных и одновременным внесением изменений в наборы. Примеры подтверждают неизменность значений при использовании итераторов классов и отсутствие исключений ConcurrentModificationException.

Прежде чем говорить о коллекции пакета java.util.concurrent посмотрим на следующие методы Collections framework, появившиеся в JDK 1.2 :

МетодОписание
Collections.synchronizedList  (List )
Collections.synchronizedSet   (Set  )
Collections.synchronizedMap (Map)
Методы обрамления для получения синхронизированной (потокобезопасной) коллекции

В этой таблице представлены синхронизируемые методы обрамления для получения потокобезопасной коллекции на основе несинхронизированной базовый коллекции. Для коллекции Set, возвращаемую методом обрамления, потокобезопасную коллекцию можно представить следующим кодом :

public static <T> Set<T> synchronizedSet(Set<T> s) 
{
    return new SynchronizedSet<T>(s);
}

Упрощённый подход к синхронизации с использованием методов обрамлений имеет существенный недостаток, связанный с препятствованием масштабируемости, поскольку с коллекцией одновременно может работать только один поток. Кроме этого, недостаточно обеспечить настоящую потокобезопасность коллекции, если множество распространённых составных операций всё ещё требуют дополнительной синхронизации. Так простые операции типа get (интерфейс List) и put (интерфейс Map) могут выполняться безопасно без дополнительной синхронизации. Но существует несколько распространённых операций, связанных с итератором Iterator<E> и методом add (put-if-absent), которые всё же нуждаются во внешней синхронизации, чтобы избежать конкуренции при обращении к данным.

Рассмотрим метод addValue (см. листинг ниже), реализующий принцип добавить-если-отсутствует (put-if-absent) с коллекцией типа Map. Если элемент отсутствует в коллекции map, то его необходимо добавить. Несмотря на то, что map синхронизирован, у параллельного потока существует возможность вставить значение с повторяющимся ключом между моментами возврата текущего потока из метода containsKey() и вызовом им метода put(). Чтобы гарантировать невозможность «одновременной» вставки в массив из параллельного потока, необходимо заключить пару строк метода в синхронизированный блок или синхронизировать весь метод.

HashMap<String, String> hm = new HashMap<String, String>();
Map<String, String> map = Collections.synchronizedMap(hm);

. . .

private void addValue(String key, String value) {
    if (!map.containsKey(key))
        map.put(key, value);
}

В следующем примере в цикле выводятся значения коллекции List в консоль. Такой код не потокобезопасен, поскольку List.size() мог стать недействительным во время выполнения цикла, если параллельный поток удалил элемент из списка. В этом случае на последнем цикле метод get(int) вернет null, а вызов метода toString() объекта null вызовет Exception. Чтобы избежать этого следует заблокировать весь код на время перебора, заключив его в блок synchronized, синхронизирующийся с List. Это решит проблему с конкуренцией за данные, но может сказаться на параллелизме, поскольку блокировка всей коллекции во время перебора может надолго закрыть доступ к списку другим потокам.

List list = Collections.synchronizedList(new ArrayList<String>());

. . .

// печать значений коллекции
for (int i = 0; i < list.size(); i++) {
    System.out.println(list.get(i).toString());
}

Для перебора элементов коллекции в Collections framework можно использовать итераторы. Однако итераторы также подвержены сбоям при работе в многопоточном приложении. Так, если один поток изменяет содержимое коллекции, а второй поток обрабатывает ее итератором Iterator, то при вызове метода Iterator.hasNext() или Iterator.next() будет вызвано исключение ConcurrentModificationException (см. код ниже). Чтобы обезопасить приложение от вызова исключения также, как и в предыдущем примере, необходимо целиком блокировать List на время перебора.

List list = Collections.synchronizedList(new ArrayList<String>());

. . .

// печать значений коллекции
try {
    for (Iterator i = list.iterator(); i.hasNext(); ) {
        System.out.println(i.next().toString());
    }
} catch (ConcurrentModificationException e) {
}

Для двух последних примеров в качестве альтернативы можно преобразовать список в массив, вызовом метода list.toArray(), и выполнить обработку элементов отдельно от коллекции. Но это может «дорого стоить», если список достаточно большой.

Таким образом, использование методов обрамления для получения синхронизированных коллекций представляет скрытую угрозу, поскольку разработчики полагают, что, раз коллекции синхронизированы, то они полностью потокобезопасны, и пренебрегают должной синхронизацией составных операций. Такие программы могут нормально функционировть при лёгкой нагрузке, но при серьёзной нагрузке они могут вызывать NullPointerException или ConcurrentModificationException.

Один из подходов к улучшению масштабируемости коллекции при сохранении потокобезопасности состоит в том, чтобы обходиться без общей блокировки всей таблицы, а использовать блокировки для каждого hash backet (или, в более общем случае, пула блокировок, где каждая блокировка защищает несколько бакетов). Это позволяет нескольким потокам обращаться к различным частям коллекции одновременно, без соперничества за единственную на всю коллекцию блокировку. Данный подход улучшает масштабируемость операций вставки, извлечения и удаления.

Класс ConcurrentHashMap

Работа обычного HashMap построена на принципах хэширования. Ключ объекта в виде хэш-кода используются в сочетании с методом equals(), для добавления и поиска элемента в коллекции. Если хэш-код объекта изменить, то в коллекции объект найти будет практически невозможно. Данный случай ведет к утечке памяти. Чтобы избежать этого ключ и значение должны быть неизмены. Это является главной причиной того, что неизменяемые классы типа String, Integer и остальные классы-оболочки подобного типа являются хорошим выбором для создания ключа. Но неизменность для HashMap рекомендована, но не является обязательной. Если необходимо использовать изменяемый объект, то нужно убедиться в том, что ключевой хэш-код объекта не меняется. Это может быть сделано путем переопределения метода hashCode. Кроме того ключевые классы должны работать корректно с методом equals(). Чтобы добавить объект в HashMap необходимо определить хэш-код и найти правильный сегмент массива hashTable, называемый hash bucket.

Класс ConcurrentHashMap появился в пакете java.util.concurrent в JDK 1.5, является потокобезопасной реализацией Map и предоставляет намного большую степень масштабирования (параллелизма), чем synchronizedMap. Отличие ConcurrentHashMap связано с внутренней структурой хранения пар key-value. СoncurrentHashMap использует несколько сегментов, и данный класс нужно рассматривать как группу HashMap’ов. Количество сегментов по умолчанию равно 16. Если пара key-value хранится в 10-ом сегменте, то ConcurrentHashMap заблокирует, при необходимости, только 10-й сегмент, и не будет блокировать остальные 15.

ConcurrentHashMap реализует (implements) интерфейс java.util.concurrent.ConcurrentMap :

public interface ConcurrentMap<K,V> extends Map<K,V> 
{
    // добавить, если нет объекта value по ключу K
    V putIfAbsent(K key, V value);

    // удалить, если имеется объект value с ключом K
    boolean remove(K key, V value);

    // заменить oldValue новым newValue объекта с ключом K
    boolean replace(K key, V oldValue, V newValue);

    // заменить новым значением newValue объект с ключом K
    V replace(K key, V newValue);
}

Метод putIfAbsent (K, V) добавляет новую пару key-value только в том случае, если в коллекции нет значения с данным ключом, и возвращает предыдущее значение для заданного ключа.

Метод remove(K, V) удаляет пару key-value только в том случае, если заданному ключу соответствует значение в коллекции Map, и возвращает true, если элемент был успешно удален.

Метод replace(K, V, V) заменяет по ключу старое значение на новое только в том случае, если старое значение соответствует заданному значению, и возвращает true, если значение было заменено на новое.

Метод replace(K, V) заменяет по ключу старое значение на новое, и возвращает предыдущее значение для заданного ключа.

Конструкторы класса ConcurrentHashMap

В следующем листинге приведены конструкторы ConcurrentHashMap с комментариями; дополнительных пояснений, полагаю, не требуется.

/*
 * Создание пустой коллекции с параметрами по умолчанию : 
 * capacity (16), load factor (0.75), concurrencyLevel (16)
 */ 
ConcurrentHashMap();

/*
 * Создание пустой коллекции с заданным initialCapacity,
 * остальные параметры по умолчанию
 */ 
ConcurrentHashMap(int initialCapacity)

/*
 * Создание пустой коллекции с заданными параметрами 
 *   initialCapacity, loadFactor;
 * параметр concurrencyLevel определен по умолчанию
 */ 
ConcurrentHashMap(int initialCapacity, float loadFactor);

/*
 * Создание пустой коллекции с заданными параметрами 
 *   initialCapacity, loadFactor, concurrencyLevel
 */ 
ConcurrentHashMap (int initialCapacity, 
                   float loadFactor, 
                   int concurrencyLevel);

/*
 * Создание потокобезопасной коллекции с заданными значения map
 *   initialCapacity, loadFactor, concurrencyLevel
 */ 
ConcurrentHashMap(Map<? extends K,? extends V> map)

Полное описание методов класса ConcurrentHashMapс представлено здесь.

Пример использования ConcurrentHashMap

Рассмотрим простой пример HashMapExample, в котором используем два класса : ConcurrentHashMap и HashMap.

Метод createMap создает объект типа Map<String, String>. В зависимости от параметра concurrent будет использован либо класс ConcurrentHashMap, либо HashMap. Поскольку ConcurrentHashMap реализует интерфейс ConcurrentMap, который расширяет свойства Map<K,V>, то в методе реализуется один их принципов объектно-ориентированного программирования — полиморфизм (всё по науке). Далее метод наполняет объект некоторыми значениями.

В методе addValue сначала выводится содержимое объекта в консоль. После этого с использованием итератора выполняется перебор всего набора данных объекта, и для ключа со значением "2" формируется новый объект, который добавляется в набор. Во время перебора значения набора выводятся в консоль. В заключение повторно выводится содержимое объекта в консоль. Следует отметить, что в этом методе мы проверяем срабатывание исключения ConcurrentModificationException, связанное с работой итератора и модификацией набора во время перебора. После нахождения ключа и добавления объекта в набор, цикл не прерывается. Результаты выполнения после описания примера.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HashMapExample 
{
    Map<String, String> map;

    public ConcurrentHashMapExample()
    {
        System.out.println("ConcurrentHashMap");
        createMap(true);
        addValue (true);

        System.out.println("\n\nHashMap");
        createMap(false);
        addValue (false);
    }

    private void addValue(boolean concurrent)
    {
        System.out.println("  before iterator : " + map);
        Iterator<String> t = map.keySet().iterator();

        System.out.print("  cycle : ");
        while(it.hasNext()){
            String key = it.next();
            if (key.equals("2")) {
                map.put(key + "new", "222");
            } else
                System.out.print("  " + key + "=" + map.get(key));
        }
        System.out.println();
        System.out.println("  after iterator : " + map);
    }

    private void createMap(boolean concurrent)
    {
        if (concurrent)
            map = new ConcurrentHashMap<String, String> ();
        else
            map = new HashMap<String, String> ();
        map.put("1", "1");
        map.put("2", "1");
        map.put("3", "1");
        map.put("4", "1");
        map.put("5", "1");
        map.put("6", "1");
    }

    public static void main(String[] args) 
    {
        new ConcurrentHashMapExample();
        System.exit(0);
    }
}

Выполнение примера

Информационные сообщения при выполнении примера выводятся в консоль. Что мы видим? При использование класса ConcurrentHashMap цикл перебора с использованием итератора завершился нормально; в консоль попал также новый объект с ключом "2", добавленный в набор во время итерации. А вот при использовании класса HashMap цикл был прерван вызовом исключения ConcurrentModificationException, как и ожидалось. Место ошибки : String key = it.next();. Т.е. итератор вызывает исключение при обращении к следующему объекту, если набор изменился.


ConcurrentHashMap
  before iterator : {1=1, 2=1, 3=1, 4=1, 5=1, 6=1}
  cycle :   1=1  3=1  4=1  5=1  6=1  2new=222
  after iterator : {1=1, 2=1, 3=1, 4=1, 5=1, 6=1, 2new=222}

HashMap
  before iterator : {1=1, 2=1, 3=1, 4=1, 5=1, 6=1}
  cycle :   1=1 \
Exception in thread "main" java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(Unknown Source)
    at java.util.HashMap$KeyIterator.next(Unknown Source)
    at example.HashMapExample.addValue(HashMapExample.java:30)
    at example.HashMapExample.<init>(HashMapExample.java:20)
    at example.HashMapExample.main(HashMapExample.java:58)
 

Конечно, данную проблему с классом HashMap можно решить, дополнив код прерыванием цикла (см. следующий код), но это не тема данной статьи.

if (key.equals("2")) {
    map.put(key + "new", "222");
    if (!concurrent)
        break;
} else
    System.out.print("  " + key + "=" + map.get(key));

Класс CopyOnWriteArrayList

Класс CopyOnWriteArrayList следует использовать вместо ArrayList в потоконагруженных приложениях, где могут иметь место нечастые операции вставки и удаления в одних потоках и одновременный перебор в других. Это типично для случая, когда коллекция ArrayList используется для хранения списка объектов.

При использовании обычной ArrayList в многопоточном приложении необходимо либо блокировать целый список во время перебора, либо клонировать его перед перебором; оба варианта требуют дополнительных ресурсов. CopyOnWriteArrayList вместо этого создаёт новую копию списка при выполнении модифицирующей операции и гарантирует, что её итераторы вернут состояние списка на момент создания итератора и не выкинут ConcurrentModificationException. Это так называемый алгоритм CopyOnWrite. Нет необходимости клонировать список до перебора или блокировать его во время перебора, т.к. используемая итератором копия списка изменяться не будет. Другими словами, CopyOnWriteArrayList содержит изменяемую ссылку на неизменяемый массив, поэтому до тех пор, пока эта ссылка остаётся фиксированной, вы получаете все преимущества потокобезопасности от неизменности без необходимости блокировок.

Конструкторы класса CopyOnWriteArrayList

Конструкторы CopyOnWriteArrayList с комментариями приведены в следующем листинге.

// создание пустой потокобезопасной коллекции
CopyOnWriteArrayList()

// создание потокобезопасной коллекции с данными list
CopyOnWriteArrayList(Collection<? extends E> list);

// создание потокобезопасной коллекции с копированием данных
CopyOnWriteArrayList(E[] toCopyIn)

Полное англоязычное описание класса CopyOnWriteArrayList представлено здесь.

Пример использования CopyOnWriteArrayList

Рассмотрим простой пример CopyOnWriteArrayListExample, в котором используем класс CopyOnWriteArrayList. В примере формируется набор данных lst, на основании которого создается потокобезопасная коллекция list типа CopyOnWriteArrayList. Данные коллекции list с помощью итератора выводятся в консоль два раза. В первом цикле в коллекцию вносятся изменения, во втором цикле данные выводятся без изменений. Результаты работы примера ниже после листинга примера.

import java.util.List;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteArrayListExample
{
    List<String> list;
    public CopyOnWriteArrayListExample()
    {
        List<String> lst = new ArrayList<String>();
        lst.add("Java");
        lst.add("J2EE");
        lst.add("J2SE");
        lst.add("Collection");
        lst.add("Concurrent");

        list = new CopyOnWriteArrayList<String>(lst);

        System.out.println("ЦИКЛ с изменением");
        printCollection(true);
        System.out.println("\nЦИКЛ без изменением");
        printCollection(false);

    }
    private void printCollection(boolean change)
    {
        Iterator<String> iterator = list.iterator();
        while(iterator.hasNext()){
            String element = iterator.next();
            System.out.printf("  %s %n", element);
            if (change) { 
                if (element.equals("Collection")) {
                    list.add("Новая строка");
                    list.remove(element);
                }
            }
        }
    }

    public static void main(String args[]) 
    {
        new CopyOnWriteArrayListExample();
        System.exit(0);
    }
}

Выполнение примера

В первом цикле, несмотря на внесение изменений в набор, в консоли представлены исходные данные. Во втором цикле — измененный набор данных. Пример демонстрирует, что итератор набора данных CopyOnWriteArrayList не вызвал исключения ConcurrentModificationException при одновременном внесении изменений и переборе значений — это значит, что алгоритм CopyOnWrite действует.


ЦИКЛ с изменением
  Java 
  J2EE 
  J2SE 
  Collection 
  Concurrent 

ЦИКЛ без изменением
  Java 
  J2EE 
  J2SE 
  Concurrent 
  Новая строка 
 

Класс CopyOnWriteArraySet

CopyOnWriteArraySet создан на основе класса CopyOnWriteArrayList, т.е. использует все его возможности. Он добавлен в JDK 1.5 как и остальные коллекции пакета java.util.concurrent. Лучше всего CopyOnWriteArraySet использовать для read-only коллекций небольших размеров. Если в данных коллекции произойдут изменения, накладные расходы, связанные с копированием, не должны быть ресурсоёмкими.

Необходимо помнить, что итераторы класса CopyOnWriteArraySet не поддерживают операцию remove(). Попытка удалить элемент во время итерирации приведет к вызову исключения UnsupportedOperationException. В своей работе итераторы используют «моментальный снимок» массива, который был сделан на момент создания итератора.

Таким образом, если набор данных небольшой и не подвержен изменениям, то лучше использовать CopyOnWriteArraySet.

Конструкторы класса CopyOnWriteArraySet

В следующем листинге приведены конструкторы класса с комментариями; дополнительных пояснений, полагаю, не требуется.

// Создание пустого набора данных
CopyOnWriteArraySet()

// Создание набора с элементами коллекции coll
CopyOnWriteArraySet(Collection<? extends E> coll)

Полное англоязычное описание методов класса CopyOnWriteArraySet с примером представлено здесь.

Пример использования CopyOnWriteArraySet

Рассмотрим пример ArraySetExample с использованием класса CopyOnWriteArraySet. В примере формируется набор данных list, на основании которого создается потокобезопасный набор cowSet типа CopyOnWriteArraySet. В качестве данных используется внутренний класс User. Данные коллекции cowSet с помощью итератора выводятся в консоль два раза. В первом цикле в коллекцию вносятся изменения : изменяется имя одного объекта и добавляется другой. Во втором цикле данные выводятся без изменений. Результаты работы примера ниже после листинга.

import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArraySet;

public class ArraySetExample 
{
    List<User>                list  ;
    CopyOnWriteArraySet<User> cowSet;

    public ArraySetExample()
    {
        list = new ArrayList<User>(); 
        list.add(new User ("Прохор "));
        list.add(new User ("Георгий"));
        list.add(new User ("Михаил" ));

        cowSet = new CopyOnWriteArraySet<User>(list);

        System.out.println("Цикл с измением");

        Iterator<User> itr = cowSet.iterator();
        int cnt = 0;
        while (itr.hasNext()) {
            User user = itr.next();
            System.out.println("  " + user.name);
            if (++cnt == 2) {
                cowSet.add(new User("Павел"));
                user.name += " Иванович";
            }
        }
        
        System.out.println("\nЦикл без измения");
        itr = cowSet.iterator();
        while (itr.hasNext()) {
            User user = itr.next();
            System.out.println("  " + user.name);
        }
    }
    class User {
        private String name;
        public User(String name) {
            this.name = name;
        }
    }

    public static void main(String args[]) 
    {
        new ArraySetExample();
    }
}

Выполнение примера

В первом цикле в консоль выдены исходные данные несмотря на внесение изменений в набор. Во втором цикле — измененный набор данных. Пример подтверждает, что итератор набора данных CopyOnWriteArraySet не вызвал исключения ConcurrentModificationException при одновременном переборе и изменении значений.


Цикл с измением
  Прохор 
  Георгий
  Михаил

Цикл без измения
  Прохор 
  Георгий Иванович
  Михаил
  Павел  
 

Скачать примеры

Рассмотренные на странице примеры использования потокобезопасных коллекций пакета java.util.concurrent в виде проекта Eclipse можно скачать здесь (9.14 Кб).

  Рейтинг@Mail.ru