diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMap.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMap.java new file mode 100644 index 0000000000..8af0660be6 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMap.java @@ -0,0 +1,504 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Arrays; +import java.util.List; + +import java.util.concurrent.locks.StampedLock; +import java.util.function.LongFunction; + +import com.google.common.collect.Lists; + +/** + * Map from long to an Object. + * + * Provides similar methods as a ConcurrentMap with 2 differences: + *
    + *
  1. No boxing/unboxing from long -> Long + *
  2. Open hash map with linear probing, no node allocations to store the values + *
+ * + * @param + */ +@SuppressWarnings("unchecked") +public class ConcurrentLongHashMap { + + private static final Object EmptyValue = null; + private static final Object DeletedValue = new Object(); + + private static final float MapFillFactor = 0.66f; + + private static final int DefaultExpectedItems = 256; + private static final int DefaultConcurrencyLevel = 16; + + private final Section[] sections; + + public ConcurrentLongHashMap() { + this(DefaultExpectedItems); + } + + public ConcurrentLongHashMap(int expectedItems) { + this(expectedItems, DefaultConcurrencyLevel); + } + + public ConcurrentLongHashMap(int expectedItems, int numSections) { + checkArgument(numSections > 0); + if (expectedItems < numSections) { + expectedItems = numSections; + } + + int perSectionExpectedItems = expectedItems / numSections; + int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor); + this.sections = (Section[]) new Section[numSections]; + + for (int i = 0; i < numSections; i++) { + sections[i] = new Section<>(perSectionCapacity); + } + } + + public int size() { + int size = 0; + for (Section s : sections) { + size += s.size; + } + return size; + } + + long getUsedBucketCount() { + long usedBucketCount = 0; + for (Section s : sections) { + usedBucketCount += s.usedBuckets; + } + return usedBucketCount; + } + + public long capacity() { + long capacity = 0; + for (Section s : sections) { + capacity += s.capacity; + } + return capacity; + } + + public boolean isEmpty() { + for (Section s : sections) { + if (s.size != 0) { + return false; + } + } + + return true; + } + + public V get(long key) { + long h = hash(key); + return getSection(h).get(key, (int) h); + } + + public boolean containsKey(long key) { + return get(key) != null; + } + + public V put(long key, V value) { + checkNotNull(value); + long h = hash(key); + return getSection(h).put(key, value, (int) h, false, null); + } + + public V putIfAbsent(long key, V value) { + checkNotNull(value); + long h = hash(key); + return getSection(h).put(key, value, (int) h, true, null); + } + + public V computeIfAbsent(long key, LongFunction provider) { + checkNotNull(provider); + long h = hash(key); + return getSection(h).put(key, null, (int) h, true, provider); + } + + public V remove(long key) { + long h = hash(key); + return getSection(h).remove(key, null, (int) h); + } + + public boolean remove(long key, Object value) { + checkNotNull(value); + long h = hash(key); + return getSection(h).remove(key, value, (int) h) != null; + } + + private Section getSection(long hash) { + // Use 32 msb out of long to get the section + final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1); + return sections[sectionIdx]; + } + + public void clear() { + for (Section s : sections) { + s.clear(); + } + } + + public void forEach(EntryProcessor processor) { + for (Section s : sections) { + s.forEach(processor); + } + } + + /** + * @return a new list of all keys (makes a copy) + */ + public List keys() { + List keys = Lists.newArrayListWithExpectedSize((int) size()); + forEach((key, value) -> keys.add(key)); + return keys; + } + + public ConcurrentLongHashSet keysLongHashSet() { + ConcurrentLongHashSet concurrentLongHashSet = new ConcurrentLongHashSet(size()); + forEach((key, value) -> concurrentLongHashSet.add(key)); + return concurrentLongHashSet; + } + + public List values() { + List values = Lists.newArrayListWithExpectedSize((int) size()); + forEach((key, value) -> values.add(value)); + return values; + } + + public interface EntryProcessor { + void accept(long key, V value); + } + + // A section is a portion of the hash map that is covered by a single + @SuppressWarnings("serial") + private static final class Section extends StampedLock { + private long[] keys; + private V[] values; + + private int capacity; + private volatile int size; + private int usedBuckets; + private int resizeThreshold; + + Section(int capacity) { + this.capacity = alignToPowerOfTwo(capacity); + this.keys = new long[this.capacity]; + this.values = (V[]) new Object[this.capacity]; + this.size = 0; + this.usedBuckets = 0; + this.resizeThreshold = (int) (this.capacity * MapFillFactor); + } + + @SuppressWarnings("NonAtomicVolatileUpdate") + V get(long key, int keyHash) { + int bucket = keyHash; + + long stamp = tryOptimisticRead(); + boolean acquiredLock = false; + + try { + while (true) { + int capacity = this.capacity; + bucket = signSafeMod(bucket, capacity); + + // First try optimistic locking + long storedKey = keys[bucket]; + V storedValue = values[bucket]; + + if (!acquiredLock && validate(stamp)) { + // The values we have read are consistent + if (storedKey == key) { + return storedValue != DeletedValue ? storedValue : null; + } else if (storedValue == EmptyValue) { + // Not found + return null; + } + } else { + // Fallback to acquiring read lock + if (!acquiredLock) { + stamp = readLock(); + acquiredLock = true; + storedKey = keys[bucket]; + storedValue = values[bucket]; + } + + if (capacity != this.capacity) { + // There has been a rehashing. We need to restart the search + bucket = keyHash; + continue; + } + + if (storedKey == key) { + return storedValue != DeletedValue ? storedValue : null; + } else if (storedValue == EmptyValue) { + // Not found + return null; + } + } + + ++bucket; + } + } finally { + if (acquiredLock) { + unlockRead(stamp); + } + } + } + + @SuppressWarnings("NonAtomicVolatileUpdate") + V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction valueProvider) { + int bucket = keyHash; + + long stamp = writeLock(); + int capacity = this.capacity; + + // Remember where we find the first available spot + int firstDeletedKey = -1; + + try { + while (true) { + bucket = signSafeMod(bucket, capacity); + + long storedKey = keys[bucket]; + V storedValue = values[bucket]; + + if (storedKey == key) { + if (storedValue == EmptyValue) { + values[bucket] = value != null ? value : valueProvider.apply(key); + ++size; + ++usedBuckets; + return valueProvider != null ? values[bucket] : null; + } else if (storedValue == DeletedValue) { + values[bucket] = value != null ? value : valueProvider.apply(key); + ++size; + return valueProvider != null ? values[bucket] : null; + } else if (!onlyIfAbsent) { + // Over written an old value for same key + values[bucket] = value; + return storedValue; + } else { + return storedValue; + } + } else if (storedValue == EmptyValue) { + // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted + // key, we should write at that position + if (firstDeletedKey != -1) { + bucket = firstDeletedKey; + } else { + ++usedBuckets; + } + + keys[bucket] = key; + values[bucket] = value != null ? value : valueProvider.apply(key); + ++size; + return valueProvider != null ? values[bucket] : null; + } else if (storedValue == DeletedValue) { + // The bucket contained a different deleted key + if (firstDeletedKey == -1) { + firstDeletedKey = bucket; + } + } + + ++bucket; + } + } finally { + if (usedBuckets > resizeThreshold) { + try { + rehash(); + } finally { + unlockWrite(stamp); + } + } else { + unlockWrite(stamp); + } + } + } + + @SuppressWarnings("NonAtomicVolatileUpdate") + private V remove(long key, Object value, int keyHash) { + int bucket = keyHash; + long stamp = writeLock(); + + try { + while (true) { + int capacity = this.capacity; + bucket = signSafeMod(bucket, capacity); + + long storedKey = keys[bucket]; + V storedValue = values[bucket]; + if (storedKey == key) { + if (value == null || value.equals(storedValue)) { + if (storedValue == EmptyValue || storedValue == DeletedValue) { + return null; + } + + --size; + V nextValueInArray = values[signSafeMod(bucket + 1, capacity)]; + if (nextValueInArray == EmptyValue) { + values[bucket] = (V) EmptyValue; + --usedBuckets; + } else { + values[bucket] = (V) DeletedValue; + } + + return storedValue; + } else { + return null; + } + } else if (storedValue == EmptyValue) { + // Key wasn't found + return null; + } + + ++bucket; + } + + } finally { + unlockWrite(stamp); + } + } + + void clear() { + long stamp = writeLock(); + + try { + Arrays.fill(keys, 0); + Arrays.fill(values, EmptyValue); + this.size = 0; + this.usedBuckets = 0; + } finally { + unlockWrite(stamp); + } + } + + public void forEach(EntryProcessor processor) { + long stamp = tryOptimisticRead(); + + int capacity = this.capacity; + long[] keys = this.keys; + V[] values = this.values; + + boolean acquiredReadLock = false; + + try { + + // Validate no rehashing + if (!validate(stamp)) { + // Fallback to read lock + stamp = readLock(); + acquiredReadLock = true; + + capacity = this.capacity; + keys = this.keys; + values = this.values; + } + + // Go through all the buckets for this section + for (int bucket = 0; bucket < capacity; bucket++) { + long storedKey = keys[bucket]; + V storedValue = values[bucket]; + + if (!acquiredReadLock && !validate(stamp)) { + // Fallback to acquiring read lock + stamp = readLock(); + acquiredReadLock = true; + + storedKey = keys[bucket]; + storedValue = values[bucket]; + } + + if (storedValue != DeletedValue && storedValue != EmptyValue) { + processor.accept(storedKey, storedValue); + } + } + } finally { + if (acquiredReadLock) { + unlockRead(stamp); + } + } + } + + private void rehash() { + // Expand the hashmap + int newCapacity = capacity * 2; + long[] newKeys = new long[newCapacity]; + V[] newValues = (V[]) new Object[newCapacity]; + + // Re-hash table + for (int i = 0; i < keys.length; i++) { + long storedKey = keys[i]; + V storedValue = values[i]; + if (storedValue != EmptyValue && storedValue != DeletedValue) { + insertKeyValueNoLock(newKeys, newValues, storedKey, storedValue); + } + } + + capacity = newCapacity; + keys = newKeys; + values = newValues; + usedBuckets = size; + resizeThreshold = (int) (capacity * MapFillFactor); + } + + private static void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) { + int bucket = (int) hash(key); + + while (true) { + bucket = signSafeMod(bucket, keys.length); + + V storedValue = values[bucket]; + + if (storedValue == EmptyValue) { + // The bucket is empty, so we can use it + keys[bucket] = key; + values[bucket] = value; + return; + } + + ++bucket; + } + } + } + + private static final long HashMixer = 0xc6a4a7935bd1e995L; + private static final int R = 47; + + static long hash(long key) { + long hash = key * HashMixer; + hash ^= hash >>> R; + hash *= HashMixer; + return hash; + } + + static int signSafeMod(long n, int Max) { + return (int) n & (Max - 1); + } + + static int alignToPowerOfTwo(int n) { + return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1)); + } +} \ No newline at end of file diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSet.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSet.java new file mode 100644 index 0000000000..17d94b7c51 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSet.java @@ -0,0 +1,423 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.locks.StampedLock; + +/** + * Concurrent hash set for primitive longs + * + * Provides similar methods as a ConcurrentSet<Long> but since it's an open hash map with linear probing, no node + * allocations are required to store the values. + *

+ * Items MUST be >= 0. + */ +public class ConcurrentLongHashSet { + + private static final long EmptyItem = -1L; + private static final long DeletedItem = -2L; + + private static final float SetFillFactor = 0.66f; + + private static final int DefaultExpectedItems = 256; + private static final int DefaultConcurrencyLevel = 16; + + private final Section[] sections; + + public interface ConsumerLong { + void accept(long item); + } + + public ConcurrentLongHashSet() { + this(DefaultExpectedItems); + } + + public ConcurrentLongHashSet(int expectedItems) { + this(expectedItems, DefaultConcurrencyLevel); + } + + public ConcurrentLongHashSet(int expectedItems, final int numSections) { + checkArgument(numSections > 0); + if (expectedItems < numSections) { + expectedItems = numSections; + } + + int perSectionExpectedItems = expectedItems / numSections; + int perSectionCapacity = (int) (perSectionExpectedItems / SetFillFactor); + this.sections = new Section[numSections]; + + for (int i = 0; i < numSections; i++) { + sections[i] = new Section(perSectionCapacity); + } + } + + public int size() { + int size = 0; + for (Section s : sections) { + size += s.size; + } + return size; + } + + public long capacity() { + long capacity = 0; + for (Section s : sections) { + capacity += s.capacity; + } + return capacity; + } + + public boolean isEmpty() { + for (Section s : sections) { + if (s.size != 0) { + return false; + } + } + + return true; + } + + long getUsedBucketCount() { + long usedBucketCount = 0; + for (Section s : sections) { + usedBucketCount += s.usedBuckets; + } + return usedBucketCount; + } + + public boolean contains(long item) { + checkBiggerEqualZero(item); + long h = hash(item); + return getSection(h).contains(item, (int) h); + } + + public boolean add(long item) { + checkBiggerEqualZero(item); + long h = hash(item); + return getSection(h).add(item, (int) h); + } + + /** + * Remove an existing entry if found + * + * @param item + * @return true if removed or false if item was not present + */ + public boolean remove(long item) { + checkBiggerEqualZero(item); + long h = hash(item); + return getSection(h).remove(item, (int) h); + } + + private Section getSection(long hash) { + // Use 32 msb out of long to get the section + final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1); + return sections[sectionIdx]; + } + + public void clear() { + for (Section s : sections) { + s.clear(); + } + } + + public void forEach(ConsumerLong processor) { + for (Section s : sections) { + s.forEach(processor); + } + } + + /** + * @return a new list of all keys (makes a copy) + */ + public Set items() { + Set items = new HashSet<>(); + forEach(items::add); + return items; + } + + // A section is a portion of the hash map that is covered by a single + @SuppressWarnings("serial") + private static final class Section extends StampedLock { + // Keys and values are stored interleaved in the table array + private long[] table; + + private int capacity; + private volatile int size; + private int usedBuckets; + private int resizeThreshold; + + Section(int capacity) { + this.capacity = alignToPowerOfTwo(capacity); + this.table = new long[this.capacity]; + this.size = 0; + this.usedBuckets = 0; + this.resizeThreshold = (int) (this.capacity * SetFillFactor); + Arrays.fill(table, EmptyItem); + } + + boolean contains(long item, int hash) { + long stamp = tryOptimisticRead(); + boolean acquiredLock = false; + int bucket = signSafeMod(hash, capacity); + + try { + while (true) { + // First try optimistic locking + long storedItem = table[bucket]; + + if (!acquiredLock && validate(stamp)) { + // The values we have read are consistent + if (item == storedItem) { + return true; + } else if (storedItem == EmptyItem) { + // Not found + return false; + } + } else { + // Fallback to acquiring read lock + if (!acquiredLock) { + stamp = readLock(); + acquiredLock = true; + + bucket = signSafeMod(hash, capacity); + storedItem = table[bucket]; + } + + if (item == storedItem) { + return true; + } else if (storedItem == EmptyItem) { + // Not found + return false; + } + } + + bucket = (bucket + 1) & (table.length - 1); + } + } finally { + if (acquiredLock) { + unlockRead(stamp); + } + } + } + + @SuppressWarnings("NonAtomicVolatileUpdate") + boolean add(long item, long hash) { + long stamp = writeLock(); + int bucket = signSafeMod(hash, capacity); + + // Remember where we find the first available spot + int firstDeletedItem = -1; + + try { + while (true) { + long storedItem = table[bucket]; + + if (item == storedItem) { + // Item was already in set + return false; + } else if (storedItem == EmptyItem) { + // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted + // key, we should write at that position + if (firstDeletedItem != -1) { + bucket = firstDeletedItem; + } else { + ++usedBuckets; + } + + table[bucket] = item; + ++size; + return true; + } else if (storedItem == DeletedItem) { + // The bucket contained a different deleted key + if (firstDeletedItem == -1) { + firstDeletedItem = bucket; + } + } + + bucket = (bucket + 1) & (table.length - 1); + } + } finally { + if (usedBuckets > resizeThreshold) { + try { + rehash(); + } finally { + unlockWrite(stamp); + } + } else { + unlockWrite(stamp); + } + } + } + + @SuppressWarnings("NonAtomicVolatileUpdate") + private boolean remove(long item, int hash) { + long stamp = writeLock(); + int bucket = signSafeMod(hash, capacity); + + try { + while (true) { + long storedItem = table[bucket]; + if (item == storedItem) { + --size; + + cleanBucket(bucket); + return true; + + } else if (storedItem == EmptyItem) { + // Key wasn't found + return false; + } + + bucket = (bucket + 1) & (table.length - 1); + } + } finally { + unlockWrite(stamp); + } + } + + private void cleanBucket(int bucket) { + int nextInArray = (bucket + 1) & (table.length - 1); + if (table[nextInArray] == EmptyItem) { + table[bucket] = EmptyItem; + --usedBuckets; + } else { + table[bucket] = DeletedItem; + } + } + + void clear() { + long stamp = writeLock(); + + try { + Arrays.fill(table, EmptyItem); + this.size = 0; + this.usedBuckets = 0; + } finally { + unlockWrite(stamp); + } + } + + public void forEach(ConsumerLong processor) { + long stamp = tryOptimisticRead(); + + long[] table = this.table; + boolean acquiredReadLock = false; + + try { + + // Validate no rehashing + if (!validate(stamp)) { + // Fallback to read lock + stamp = readLock(); + acquiredReadLock = true; + table = this.table; + } + + // Go through all the buckets for this section + for (int bucket = 0; bucket < table.length; bucket++) { + long storedItem = table[bucket]; + + if (!acquiredReadLock && !validate(stamp)) { + // Fallback to acquiring read lock + stamp = readLock(); + acquiredReadLock = true; + + storedItem = table[bucket]; + } + + if (storedItem != DeletedItem && storedItem != EmptyItem) { + processor.accept(storedItem); + } + } + } finally { + if (acquiredReadLock) { + unlockRead(stamp); + } + } + } + + private void rehash() { + // Expand the hashmap + int newCapacity = capacity * 2; + long[] newTable = new long[newCapacity]; + Arrays.fill(newTable, EmptyItem); + + // Re-hash table + for (int i = 0; i < table.length; i++) { + long storedItem = table[i]; + if (storedItem != EmptyItem && storedItem != DeletedItem) { + insertKeyValueNoLock(newTable, newCapacity, storedItem); + } + } + + capacity = newCapacity; + table = newTable; + usedBuckets = size; + resizeThreshold = (int) (capacity * SetFillFactor); + } + + private static void insertKeyValueNoLock(long[] table, int capacity, long item) { + int bucket = signSafeMod(hash(item), capacity); + + while (true) { + long storedKey = table[bucket]; + + if (storedKey == EmptyItem) { + // The bucket is empty, so we can use it + table[bucket] = item; + return; + } + + bucket = (bucket + 1) & (table.length - 1); + } + } + } + + private static final long HashMixer = 0xc6a4a7935bd1e995L; + private static final int R = 47; + + static long hash(long key) { + long hash = key * HashMixer; + hash ^= hash >>> R; + hash *= HashMixer; + return hash; + } + + static int signSafeMod(long n, int Max) { + return (int) (n & (Max - 1)); + } + + static int alignToPowerOfTwo(int n) { + return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1)); + } + + static void checkBiggerEqualZero(long n) { + if (n < 0L) { + throw new IllegalArgumentException("Keys and values must be >= 0"); + } + } +} \ No newline at end of file diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMapTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMapTest.java new file mode 100644 index 0000000000..b08251fe44 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMapTest.java @@ -0,0 +1,405 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongFunction; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; + +public class ConcurrentLongHashMapTest { + + @Test + public void simpleInsertions() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(16); + + assertTrue(map.isEmpty()); + assertNull(map.put(1, "one")); + assertFalse(map.isEmpty()); + + assertNull(map.put(2, "two")); + assertNull(map.put(3, "three")); + + assertEquals(map.size(), 3); + + assertEquals(map.get(1), "one"); + assertEquals(map.size(), 3); + + assertEquals(map.remove(1), "one"); + assertEquals(map.size(), 2); + assertEquals(map.get(1), null); + assertEquals(map.get(5), null); + assertEquals(map.size(), 2); + + assertNull(map.put(1, "one")); + assertEquals(map.size(), 3); + assertEquals(map.put(1, "uno"), "one"); + assertEquals(map.size(), 3); + } + + @Test + public void testRemove() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + + assertTrue(map.isEmpty()); + assertNull(map.put(1, "one")); + assertFalse(map.isEmpty()); + + assertFalse(map.remove(0, "zero")); + assertFalse(map.remove(1, "uno")); + + assertFalse(map.isEmpty()); + assertTrue(map.remove(1, "one")); + assertTrue(map.isEmpty()); + } + + @Test + public void testNegativeUsedBucketCount() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(16, 1); + + map.put(0, "zero"); + assertEquals(1, map.getUsedBucketCount()); + map.put(0, "zero1"); + assertEquals(1, map.getUsedBucketCount()); + map.remove(0); + assertEquals(0, map.getUsedBucketCount()); + map.remove(0); + assertEquals(0, map.getUsedBucketCount()); + } + + @Test + public void testRehashing() { + int n = 16; + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(n / 2, 1); + assertEquals(map.capacity(), n); + assertEquals(map.size(), 0); + + for (int i = 0; i < n; i++) { + map.put(i, i); + } + + assertEquals(map.capacity(), 2 * n); + assertEquals(map.size(), n); + } + + @Test + public void testRehashingWithDeletes() { + int n = 16; + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(n / 2, 1); + assertEquals(map.capacity(), n); + assertEquals(map.size(), 0); + + for (int i = 0; i < n / 2; i++) { + map.put(i, i); + } + + for (int i = 0; i < n / 2; i++) { + map.remove(i); + } + + for (int i = n; i < (2 * n); i++) { + map.put(i, i); + } + + assertEquals(map.capacity(), 2 * n); + assertEquals(map.size(), n); + } + + @Test + public void concurrentInsertions() throws Throwable { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int nThreads = 16; + final int N = 100_000; + String value = "value"; + + List> futures = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(); + + for (int j = 0; j < N; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + map.put(key, value); + } + })); + } + + for (Future future : futures) { + future.get(); + } + + assertEquals(map.size(), N * nThreads); + + executor.shutdown(); + } + + @Test + public void concurrentInsertionsAndReads() throws Throwable { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int nThreads = 16; + final int N = 100_000; + String value = "value"; + + List> futures = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(); + + for (int j = 0; j < N; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + map.put(key, value); + } + })); + } + + for (Future future : futures) { + future.get(); + } + + assertEquals(map.size(), N * nThreads); + + executor.shutdown(); + } + + @Test + public void testIteration() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + + assertEquals(map.keys(), Collections.emptyList()); + assertEquals(map.values(), Collections.emptyList()); + + map.put(0, "zero"); + + assertEquals(map.keys(), Lists.newArrayList(0L)); + assertEquals(map.values(), Lists.newArrayList("zero")); + + map.remove(0); + + assertEquals(map.keys(), Collections.emptyList()); + assertEquals(map.values(), Collections.emptyList()); + + map.put(0, "zero"); + map.put(1, "one"); + map.put(2, "two"); + + List keys = map.keys(); + Collections.sort(keys); + assertEquals(keys, Lists.newArrayList(0L, 1L, 2L)); + + List values = map.values(); + Collections.sort(values); + assertEquals(values, Lists.newArrayList("one", "two", "zero")); + + map.put(1, "uno"); + + keys = map.keys(); + Collections.sort(keys); + assertEquals(keys, Lists.newArrayList(0L, 1L, 2L)); + + values = map.values(); + Collections.sort(values); + assertEquals(values, Lists.newArrayList("two", "uno", "zero")); + + map.clear(); + assertTrue(map.isEmpty()); + } + + @Test + public void testHashConflictWithDeletion() { + final int Buckets = 16; + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(Buckets, 1); + + // Pick 2 keys that fall into the same bucket + long key1 = 1; + long key2 = 27; + + int bucket1 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key1), Buckets); + int bucket2 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key2), Buckets); + assertEquals(bucket1, bucket2); + + assertEquals(map.put(key1, "value-1"), null); + assertEquals(map.put(key2, "value-2"), null); + assertEquals(map.size(), 2); + + assertEquals(map.remove(key1), "value-1"); + assertEquals(map.size(), 1); + + assertEquals(map.put(key1, "value-1-overwrite"), null); + assertEquals(map.size(), 2); + + assertEquals(map.remove(key1), "value-1-overwrite"); + assertEquals(map.size(), 1); + + assertEquals(map.put(key2, "value-2-overwrite"), "value-2"); + assertEquals(map.get(key2), "value-2-overwrite"); + + assertEquals(map.size(), 1); + assertEquals(map.remove(key2), "value-2-overwrite"); + assertTrue(map.isEmpty()); + } + + @Test + public void testPutIfAbsent() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + assertEquals(map.putIfAbsent(1, "one"), null); + assertEquals(map.get(1), "one"); + + assertEquals(map.putIfAbsent(1, "uno"), "one"); + assertEquals(map.get(1), "one"); + } + + @Test + public void testComputeIfAbsent() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(16, 1); + AtomicInteger counter = new AtomicInteger(); + LongFunction provider = key -> counter.getAndIncrement(); + + assertEquals(map.computeIfAbsent(0, provider).intValue(), 0); + assertEquals(map.get(0).intValue(), 0); + + assertEquals(map.computeIfAbsent(1, provider).intValue(), 1); + assertEquals(map.get(1).intValue(), 1); + + assertEquals(map.computeIfAbsent(1, provider).intValue(), 1); + assertEquals(map.get(1).intValue(), 1); + + assertEquals(map.computeIfAbsent(2, provider).intValue(), 2); + assertEquals(map.get(2).intValue(), 2); + } + + int Iterations = 1; + int ReadIterations = 100; + int N = 1_000_000; + + public void benchConcurrentLongHashMap() throws Exception { + // public static void main(String args[]) { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(N, 1); + + for (long i = 0; i < Iterations; i++) { + for (int j = 0; j < N; j++) { + map.put(i, "value"); + } + + for (long h = 0; h < ReadIterations; h++) { + for (int j = 0; j < N; j++) { + map.get(i); + } + } + + for (int j = 0; j < N; j++) { + map.remove(i); + } + } + } + + public void benchConcurrentHashMap() throws Exception { + ConcurrentHashMap map = new ConcurrentHashMap<>(N, 0.66f, 1); + + for (long i = 0; i < Iterations; i++) { + for (int j = 0; j < N; j++) { + map.put(i, "value"); + } + + for (long h = 0; h < ReadIterations; h++) { + for (int j = 0; j < N; j++) { + map.get(i); + } + } + + for (int j = 0; j < N; j++) { + map.remove(i); + } + } + } + + void benchHashMap() throws Exception { + HashMap map = new HashMap<>(N, 0.66f); + + for (long i = 0; i < Iterations; i++) { + for (int j = 0; j < N; j++) { + map.put(i, "value"); + } + + for (long h = 0; h < ReadIterations; h++) { + for (int j = 0; j < N; j++) { + map.get(i); + } + } + + for (int j = 0; j < N; j++) { + map.remove(i); + } + } + } + + public static void main(String[] args) throws Exception { + ConcurrentLongHashMapTest t = new ConcurrentLongHashMapTest(); + + long start = System.nanoTime(); + // t.benchHashMap(); + long end = System.nanoTime(); + + System.out.println("HM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms"); + + start = System.nanoTime(); + t.benchConcurrentHashMap(); + end = System.nanoTime(); + + System.out.println("CHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms"); + + start = System.nanoTime(); + // t.benchConcurrentLongHashMap(); + end = System.nanoTime(); + + System.out.println("CLHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms"); + + } +} \ No newline at end of file diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSetTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSetTest.java new file mode 100644 index 0000000000..24337f10bd --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSetTest.java @@ -0,0 +1,249 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ConcurrentLongHashSetTest { + + @Test + public void simpleInsertions() { + ConcurrentLongHashSet set = new ConcurrentLongHashSet(16); + + assertTrue(set.isEmpty()); + assertTrue(set.add(1)); + assertFalse(set.isEmpty()); + + assertTrue(set.add(2)); + assertTrue(set.add(3)); + + assertEquals(set.size(), 3); + + assertTrue(set.contains(1)); + assertEquals(set.size(), 3); + + assertTrue(set.remove(1)); + assertEquals(set.size(), 2); + assertFalse(set.contains(1)); + assertFalse(set.contains(5)); + assertEquals(set.size(), 2); + + assertTrue(set.add(1)); + assertEquals(set.size(), 3); + assertFalse(set.add(1)); + assertEquals(set.size(), 3); + } + + @Test + public void testRemove() { + ConcurrentLongHashSet set = new ConcurrentLongHashSet(); + + assertTrue(set.isEmpty()); + assertTrue(set.add(1)); + assertFalse(set.isEmpty()); + + assertFalse(set.remove(0)); + assertFalse(set.isEmpty()); + assertTrue(set.remove(1)); + assertTrue(set.isEmpty()); + } + + @Test + public void testRehashing() { + int n = 16; + ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1); + assertEquals(set.capacity(), n); + assertEquals(set.size(), 0); + + for (int i = 0; i < n; i++) { + set.add(i); + } + + assertEquals(set.capacity(), 2 * n); + assertEquals(set.size(), n); + } + + @Test + public void testRehashingWithDeletes() { + int n = 16; + ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1); + assertEquals(set.capacity(), n); + assertEquals(set.size(), 0); + + for (int i = 0; i < n / 2; i++) { + set.add(i); + } + + for (int i = 0; i < n / 2; i++) { + set.remove(i); + } + + for (int i = n; i < (2 * n); i++) { + set.add(i); + } + + assertEquals(set.capacity(), 2 * n); + assertEquals(set.size(), n); + } + + @Test + public void concurrentInsertions() throws Throwable { + ConcurrentLongHashSet set = new ConcurrentLongHashSet(); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int nThreads = 16; + final int N = 100_000; + + List> futures = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(); + + for (int j = 0; j < N; j++) { + long key = Math.abs(random.nextLong()); + // Ensure keys are unique + key -= key % (threadIdx + 1); + + set.add(key); + } + })); + } + + for (Future future : futures) { + future.get(); + } + + assertEquals(set.size(), N * nThreads); + + executor.shutdown(); + } + + @Test + public void concurrentInsertionsAndReads() throws Throwable { + ConcurrentLongHashSet map = new ConcurrentLongHashSet(); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int nThreads = 16; + final int N = 100_000; + + List> futures = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(); + + for (int j = 0; j < N; j++) { + long key = Math.abs(random.nextLong()); + // Ensure keys are unique + key -= key % (threadIdx + 1); + + map.add(key); + } + })); + } + + for (Future future : futures) { + future.get(); + } + + assertEquals(map.size(), N * nThreads); + + executor.shutdown(); + } + + @Test + public void testIteration() { + ConcurrentLongHashSet set = new ConcurrentLongHashSet(); + + assertEquals(set.items(), Collections.emptySet()); + + set.add(0L); + + assertEquals(set.items(), Sets.newHashSet(0L)); + + set.remove(0L); + + assertEquals(set.items(), Collections.emptySet()); + + set.add(0L); + set.add(1L); + set.add(2L); + + List values = Lists.newArrayList(set.items()); + Collections.sort(values); + assertEquals(values, Lists.newArrayList(0L, 1L, 2L)); + + set.clear(); + assertTrue(set.isEmpty()); + } + + @Test + public void testHashConflictWithDeletion() { + final int Buckets = 16; + ConcurrentLongHashSet set = new ConcurrentLongHashSet(Buckets, 1); + + // Pick 2 keys that fall into the same bucket + long key1 = 1; + long key2 = 27; + + int bucket1 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key1), Buckets); + int bucket2 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key2), Buckets); + assertEquals(bucket1, bucket2); + + assertTrue(set.add(key1)); + assertTrue(set.add(key2)); + assertEquals(set.size(), 2); + + assertTrue(set.remove(key1)); + assertEquals(set.size(), 1); + + assertTrue(set.add(key1)); + assertEquals(set.size(), 2); + + assertTrue(set.remove(key1)); + assertEquals(set.size(), 1); + + assertFalse(set.add(key2)); + assertTrue(set.contains(key2)); + + assertEquals(set.size(), 1); + assertTrue(set.remove(key2)); + assertTrue(set.isEmpty()); + } + +} \ No newline at end of file diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java index 943077c59f..8de3da6400 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.journal.impl; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; @@ -30,7 +29,7 @@ import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; /** * Super class for Journal maintenances such as clean up and Compactor @@ -56,7 +55,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback private ActiveMQBuffer writingChannel; - private final Set recordsSnapshot = new ConcurrentHashSet<>(); + private final ConcurrentLongHashSet recordsSnapshot; protected final List newDataFiles = new ArrayList<>(); @@ -67,14 +66,14 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory, final JournalImpl journal, final JournalFilesRepository filesRepository, - final Set recordsSnapshot, + final ConcurrentLongHashSet recordsSnapshot, final long nextOrderingID) { super(); this.journal = journal; this.filesRepository = filesRepository; this.fileFactory = fileFactory; this.nextOrderingID = nextOrderingID; - this.recordsSnapshot.addAll(recordsSnapshot); + this.recordsSnapshot = recordsSnapshot; } // Public -------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 8e5ca2c8f8..5ef240ae2b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -18,8 +18,6 @@ package org.apache.activemq.artemis.core.journal.impl; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -43,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRecordTX; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; /** * Journal used at a replicating backup server during the synchronization of data with the 'live' @@ -54,7 +53,7 @@ public final class FileWrapperJournal extends JournalBase { private final ReentrantLock lockAppend = new ReentrantLock(); - private final ConcurrentMap transactions = new ConcurrentHashMap<>(); + private final ConcurrentLongHashMap transactions = new ConcurrentLongHashMap<>(); private final JournalImpl journal; protected volatile JournalFile currentFile; @@ -181,7 +180,7 @@ public final class FileWrapperJournal extends JournalBase { IOCompletion callback, boolean lineUpContext) throws Exception { JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null); - AtomicInteger value = transactions.remove(Long.valueOf(txID)); + AtomicInteger value = transactions.remove(txID); if (value != null) { commitRecord.setNumberOfRecords(value.get()); } @@ -195,7 +194,7 @@ public final class FileWrapperJournal extends JournalBase { boolean sync, IOCompletion callback) throws Exception { JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData); - AtomicInteger value = transactions.get(Long.valueOf(txID)); + AtomicInteger value = transactions.get(txID); if (value != null) { prepareRecord.setNumberOfRecords(value.get()); } @@ -204,7 +203,7 @@ public final class FileWrapperJournal extends JournalBase { private int count(long txID) throws ActiveMQException { AtomicInteger defaultValue = new AtomicInteger(1); - AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue); + AtomicInteger count = transactions.putIfAbsent(txID, defaultValue); if (count != null) { return count.incrementAndGet(); } @@ -219,7 +218,7 @@ public final class FileWrapperJournal extends JournalBase { @Override public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception { JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID); - AtomicInteger value = transactions.remove(Long.valueOf(txID)); + AtomicInteger value = transactions.remove(txID); if (value != null) { rollbackRecord.setNumberOfRecords(value.get()); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java index 8b89c3ed96..e3e1e7b469 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java @@ -18,12 +18,8 @@ package org.apache.activemq.artemis.core.journal.impl; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; @@ -41,6 +37,8 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; import org.jboss.logging.Logger; public class JournalCompactor extends AbstractJournalUpdateTask implements JournalRecordProvider { @@ -53,11 +51,11 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ private static final short COMPACT_SPLIT_LINE = 2; // Snapshot of transactions that were pending when the compactor started - private final Map pendingTransactions = new ConcurrentHashMap<>(); + private final ConcurrentLongHashMap pendingTransactions = new ConcurrentLongHashMap<>(); - private final Map newRecords = new HashMap<>(); + private final ConcurrentLongHashMap newRecords = new ConcurrentLongHashMap<>(); - private final Map newTransactions = new HashMap<>(); + private final ConcurrentLongHashMap newTransactions = new ConcurrentLongHashMap<>(); /** * Commands that happened during compacting @@ -120,18 +118,18 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ return newDataFiles; } - public Map getNewRecords() { + public ConcurrentLongHashMap getNewRecords() { return newRecords; } - public Map getNewTransactions() { + public ConcurrentLongHashMap getNewTransactions() { return newTransactions; } public JournalCompactor(final SequentialFileFactory fileFactory, final JournalImpl journal, final JournalFilesRepository filesRepository, - final Set recordsSnapshot, + final ConcurrentLongHashSet recordsSnapshot, final long firstFileID) { super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID); } @@ -628,7 +626,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } @Override - public Map getRecords() { + public ConcurrentLongHashMap getRecords() { return newRecords; } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 81ae9c0fee..1758999b99 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -31,8 +31,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; @@ -74,6 +72,8 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; @@ -168,12 +168,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private final JournalFilesRepository filesRepository; // Compacting may replace this structure - private final ConcurrentMap records = new ConcurrentHashMap<>(); + private final ConcurrentLongHashMap records = new ConcurrentLongHashMap<>(); - private final Set pendingRecords = new ConcurrentHashSet<>(); + private final ConcurrentLongHashSet pendingRecords = new ConcurrentLongHashSet(); // Compacting may replace this structure - private final ConcurrentMap transactions = new ConcurrentHashMap<>(); + private final ConcurrentLongHashMap transactions = new ConcurrentLongHashMap<>(); // This will be set only while the JournalCompactor is being executed private volatile JournalCompactor compactor; @@ -345,7 +345,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } @Override - public Map getRecords() { + public ConcurrentLongHashMap getRecords() { return records; } @@ -1487,12 +1487,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal return; } - compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID()); + compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID()); - for (Map.Entry entry : transactions.entrySet()) { - compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray()); - entry.getValue().setCompacting(); - } + transactions.forEach((id, pendingTransaction) -> { + compactor.addPendingTransaction(id, pendingTransaction.getPositiveArray()); + pendingTransaction.setCompacting(); + }); // We will calculate the new records during compacting, what will take the position the records will take // after compacting @@ -1540,9 +1540,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal newDatafiles = localCompactor.getNewDataFiles(); // Restore newRecords created during compacting - for (Map.Entry newRecordEntry : localCompactor.getNewRecords().entrySet()) { - records.put(newRecordEntry.getKey(), newRecordEntry.getValue()); - } + localCompactor.getNewRecords().forEach((id, newRecord) -> { + records.put(id, newRecord); + }); // Restore compacted dataFiles for (int i = newDatafiles.size() - 1; i >= 0; i--) { @@ -1559,9 +1559,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // Replay pending commands (including updates, deletes and commits) - for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) { - newTransaction.replaceRecordProvider(this); - } + localCompactor.getNewTransactions().forEach((id, newTransaction) -> newTransaction.replaceRecordProvider(this)); localCompactor.replayPendingCommands(); @@ -1569,7 +1567,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // This has to be done after the replay pending commands, as we need to delete commits // that happened during the compacting - for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) { + localCompactor.getNewTransactions().forEach((id, newTransaction) -> { if (logger.isTraceEnabled()) { logger.trace("Merging pending transaction " + newTransaction + " after compacting the journal"); } @@ -1579,7 +1577,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } else { ActiveMQJournalLogger.LOGGER.compactMergeError(newTransaction.getId()); } - } + }); } finally { journalLock.writeLock().unlock(); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java index 6c5107a752..c9c92f4407 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java @@ -16,7 +16,7 @@ */ package org.apache.activemq.artemis.core.journal.impl; -import java.util.Map; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; /** * This is an interface used only internally. @@ -29,5 +29,5 @@ public interface JournalRecordProvider { JournalCompactor getCompactor(); - Map getRecords(); + ConcurrentLongHashMap getRecords(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 4bf2726adf..6f899f33cc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -371,7 +371,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon * @throws Exception */ private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception { - Long id = Long.valueOf(msg.getId()); + long id = msg.getId(); byte[] data = msg.getData(); SequentialFile channel1; switch (msg.getFileType()) { diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java index c08f1dddd6..7493949999 100644 --- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java +++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.stress.journal; import java.io.File; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -238,12 +237,15 @@ public class JournalCleanupCompactStressTest extends ActiveMQTestBase { reloadJournal(); - Collection records = journal.getRecords().keySet(); - System.out.println("Deleting everything!"); - for (Long delInfo : records) { - journal.appendDeleteRecord(delInfo, false); - } + + journal.getRecords().forEach((id, record) -> { + try { + journal.appendDeleteRecord(id, false); + } catch (Exception e) { + new RuntimeException(e); + } + }); journal.forceMoveNextFile();