From c1d55aa84f2ca9946afc39038c14d729c8bae36a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Fri, 12 May 2017 07:14:47 +0100 Subject: [PATCH] ARTEMIS-1156: FIX: Long Autoboxing occurring on Hot Path Building on ARTEMIS-905 JCtools ConcurrentMap replacement first proposed but currently parked by @franz1981, replace the collections with primitive key concurrent collections to avoid auto boxing. The goal of this is to reduce/remove autoboxing on the hot path. We are just adding jctools to the broker (should not be in client dependencies) Like wise targeting specific use case with specific implementation rather than a blanket replace all. Using collections from Bookkeeper, reduces outside tlab allocation, on resizing compared to JCTools, which occurs frequently on testing. --- .../collections/ConcurrentLongHashMap.java | 504 ++++++++++++++++++ .../collections/ConcurrentLongHashSet.java | 423 +++++++++++++++ .../ConcurrentLongHashMapTest.java | 405 ++++++++++++++ .../ConcurrentLongHashSetTest.java | 249 +++++++++ .../impl/AbstractJournalUpdateTask.java | 9 +- .../core/journal/impl/FileWrapperJournal.java | 13 +- .../core/journal/impl/JournalCompactor.java | 20 +- .../core/journal/impl/JournalImpl.java | 36 +- .../journal/impl/JournalRecordProvider.java | 4 +- .../core/replication/ReplicationEndpoint.java | 2 +- .../JournalCleanupCompactStressTest.java | 14 +- 11 files changed, 1628 insertions(+), 51 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMap.java create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSet.java create mode 100644 artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMapTest.java create mode 100644 artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSetTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMap.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMap.java new file mode 100644 index 0000000000..8af0660be6 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMap.java @@ -0,0 +1,504 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Arrays; +import java.util.List; + +import java.util.concurrent.locks.StampedLock; +import java.util.function.LongFunction; + +import com.google.common.collect.Lists; + +/** + * Map from long to an Object. + * + * Provides similar methods as a ConcurrentMap with 2 differences: + *
    + *
  1. No boxing/unboxing from long -> Long + *
  2. Open hash map with linear probing, no node allocations to store the values + *
+ * + * @param + */ +@SuppressWarnings("unchecked") +public class ConcurrentLongHashMap { + + private static final Object EmptyValue = null; + private static final Object DeletedValue = new Object(); + + private static final float MapFillFactor = 0.66f; + + private static final int DefaultExpectedItems = 256; + private static final int DefaultConcurrencyLevel = 16; + + private final Section[] sections; + + public ConcurrentLongHashMap() { + this(DefaultExpectedItems); + } + + public ConcurrentLongHashMap(int expectedItems) { + this(expectedItems, DefaultConcurrencyLevel); + } + + public ConcurrentLongHashMap(int expectedItems, int numSections) { + checkArgument(numSections > 0); + if (expectedItems < numSections) { + expectedItems = numSections; + } + + int perSectionExpectedItems = expectedItems / numSections; + int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor); + this.sections = (Section[]) new Section[numSections]; + + for (int i = 0; i < numSections; i++) { + sections[i] = new Section<>(perSectionCapacity); + } + } + + public int size() { + int size = 0; + for (Section s : sections) { + size += s.size; + } + return size; + } + + long getUsedBucketCount() { + long usedBucketCount = 0; + for (Section s : sections) { + usedBucketCount += s.usedBuckets; + } + return usedBucketCount; + } + + public long capacity() { + long capacity = 0; + for (Section s : sections) { + capacity += s.capacity; + } + return capacity; + } + + public boolean isEmpty() { + for (Section s : sections) { + if (s.size != 0) { + return false; + } + } + + return true; + } + + public V get(long key) { + long h = hash(key); + return getSection(h).get(key, (int) h); + } + + public boolean containsKey(long key) { + return get(key) != null; + } + + public V put(long key, V value) { + checkNotNull(value); + long h = hash(key); + return getSection(h).put(key, value, (int) h, false, null); + } + + public V putIfAbsent(long key, V value) { + checkNotNull(value); + long h = hash(key); + return getSection(h).put(key, value, (int) h, true, null); + } + + public V computeIfAbsent(long key, LongFunction provider) { + checkNotNull(provider); + long h = hash(key); + return getSection(h).put(key, null, (int) h, true, provider); + } + + public V remove(long key) { + long h = hash(key); + return getSection(h).remove(key, null, (int) h); + } + + public boolean remove(long key, Object value) { + checkNotNull(value); + long h = hash(key); + return getSection(h).remove(key, value, (int) h) != null; + } + + private Section getSection(long hash) { + // Use 32 msb out of long to get the section + final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1); + return sections[sectionIdx]; + } + + public void clear() { + for (Section s : sections) { + s.clear(); + } + } + + public void forEach(EntryProcessor processor) { + for (Section s : sections) { + s.forEach(processor); + } + } + + /** + * @return a new list of all keys (makes a copy) + */ + public List keys() { + List keys = Lists.newArrayListWithExpectedSize((int) size()); + forEach((key, value) -> keys.add(key)); + return keys; + } + + public ConcurrentLongHashSet keysLongHashSet() { + ConcurrentLongHashSet concurrentLongHashSet = new ConcurrentLongHashSet(size()); + forEach((key, value) -> concurrentLongHashSet.add(key)); + return concurrentLongHashSet; + } + + public List values() { + List values = Lists.newArrayListWithExpectedSize((int) size()); + forEach((key, value) -> values.add(value)); + return values; + } + + public interface EntryProcessor { + void accept(long key, V value); + } + + // A section is a portion of the hash map that is covered by a single + @SuppressWarnings("serial") + private static final class Section extends StampedLock { + private long[] keys; + private V[] values; + + private int capacity; + private volatile int size; + private int usedBuckets; + private int resizeThreshold; + + Section(int capacity) { + this.capacity = alignToPowerOfTwo(capacity); + this.keys = new long[this.capacity]; + this.values = (V[]) new Object[this.capacity]; + this.size = 0; + this.usedBuckets = 0; + this.resizeThreshold = (int) (this.capacity * MapFillFactor); + } + + @SuppressWarnings("NonAtomicVolatileUpdate") + V get(long key, int keyHash) { + int bucket = keyHash; + + long stamp = tryOptimisticRead(); + boolean acquiredLock = false; + + try { + while (true) { + int capacity = this.capacity; + bucket = signSafeMod(bucket, capacity); + + // First try optimistic locking + long storedKey = keys[bucket]; + V storedValue = values[bucket]; + + if (!acquiredLock && validate(stamp)) { + // The values we have read are consistent + if (storedKey == key) { + return storedValue != DeletedValue ? storedValue : null; + } else if (storedValue == EmptyValue) { + // Not found + return null; + } + } else { + // Fallback to acquiring read lock + if (!acquiredLock) { + stamp = readLock(); + acquiredLock = true; + storedKey = keys[bucket]; + storedValue = values[bucket]; + } + + if (capacity != this.capacity) { + // There has been a rehashing. We need to restart the search + bucket = keyHash; + continue; + } + + if (storedKey == key) { + return storedValue != DeletedValue ? storedValue : null; + } else if (storedValue == EmptyValue) { + // Not found + return null; + } + } + + ++bucket; + } + } finally { + if (acquiredLock) { + unlockRead(stamp); + } + } + } + + @SuppressWarnings("NonAtomicVolatileUpdate") + V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction valueProvider) { + int bucket = keyHash; + + long stamp = writeLock(); + int capacity = this.capacity; + + // Remember where we find the first available spot + int firstDeletedKey = -1; + + try { + while (true) { + bucket = signSafeMod(bucket, capacity); + + long storedKey = keys[bucket]; + V storedValue = values[bucket]; + + if (storedKey == key) { + if (storedValue == EmptyValue) { + values[bucket] = value != null ? value : valueProvider.apply(key); + ++size; + ++usedBuckets; + return valueProvider != null ? values[bucket] : null; + } else if (storedValue == DeletedValue) { + values[bucket] = value != null ? value : valueProvider.apply(key); + ++size; + return valueProvider != null ? values[bucket] : null; + } else if (!onlyIfAbsent) { + // Over written an old value for same key + values[bucket] = value; + return storedValue; + } else { + return storedValue; + } + } else if (storedValue == EmptyValue) { + // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted + // key, we should write at that position + if (firstDeletedKey != -1) { + bucket = firstDeletedKey; + } else { + ++usedBuckets; + } + + keys[bucket] = key; + values[bucket] = value != null ? value : valueProvider.apply(key); + ++size; + return valueProvider != null ? values[bucket] : null; + } else if (storedValue == DeletedValue) { + // The bucket contained a different deleted key + if (firstDeletedKey == -1) { + firstDeletedKey = bucket; + } + } + + ++bucket; + } + } finally { + if (usedBuckets > resizeThreshold) { + try { + rehash(); + } finally { + unlockWrite(stamp); + } + } else { + unlockWrite(stamp); + } + } + } + + @SuppressWarnings("NonAtomicVolatileUpdate") + private V remove(long key, Object value, int keyHash) { + int bucket = keyHash; + long stamp = writeLock(); + + try { + while (true) { + int capacity = this.capacity; + bucket = signSafeMod(bucket, capacity); + + long storedKey = keys[bucket]; + V storedValue = values[bucket]; + if (storedKey == key) { + if (value == null || value.equals(storedValue)) { + if (storedValue == EmptyValue || storedValue == DeletedValue) { + return null; + } + + --size; + V nextValueInArray = values[signSafeMod(bucket + 1, capacity)]; + if (nextValueInArray == EmptyValue) { + values[bucket] = (V) EmptyValue; + --usedBuckets; + } else { + values[bucket] = (V) DeletedValue; + } + + return storedValue; + } else { + return null; + } + } else if (storedValue == EmptyValue) { + // Key wasn't found + return null; + } + + ++bucket; + } + + } finally { + unlockWrite(stamp); + } + } + + void clear() { + long stamp = writeLock(); + + try { + Arrays.fill(keys, 0); + Arrays.fill(values, EmptyValue); + this.size = 0; + this.usedBuckets = 0; + } finally { + unlockWrite(stamp); + } + } + + public void forEach(EntryProcessor processor) { + long stamp = tryOptimisticRead(); + + int capacity = this.capacity; + long[] keys = this.keys; + V[] values = this.values; + + boolean acquiredReadLock = false; + + try { + + // Validate no rehashing + if (!validate(stamp)) { + // Fallback to read lock + stamp = readLock(); + acquiredReadLock = true; + + capacity = this.capacity; + keys = this.keys; + values = this.values; + } + + // Go through all the buckets for this section + for (int bucket = 0; bucket < capacity; bucket++) { + long storedKey = keys[bucket]; + V storedValue = values[bucket]; + + if (!acquiredReadLock && !validate(stamp)) { + // Fallback to acquiring read lock + stamp = readLock(); + acquiredReadLock = true; + + storedKey = keys[bucket]; + storedValue = values[bucket]; + } + + if (storedValue != DeletedValue && storedValue != EmptyValue) { + processor.accept(storedKey, storedValue); + } + } + } finally { + if (acquiredReadLock) { + unlockRead(stamp); + } + } + } + + private void rehash() { + // Expand the hashmap + int newCapacity = capacity * 2; + long[] newKeys = new long[newCapacity]; + V[] newValues = (V[]) new Object[newCapacity]; + + // Re-hash table + for (int i = 0; i < keys.length; i++) { + long storedKey = keys[i]; + V storedValue = values[i]; + if (storedValue != EmptyValue && storedValue != DeletedValue) { + insertKeyValueNoLock(newKeys, newValues, storedKey, storedValue); + } + } + + capacity = newCapacity; + keys = newKeys; + values = newValues; + usedBuckets = size; + resizeThreshold = (int) (capacity * MapFillFactor); + } + + private static void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) { + int bucket = (int) hash(key); + + while (true) { + bucket = signSafeMod(bucket, keys.length); + + V storedValue = values[bucket]; + + if (storedValue == EmptyValue) { + // The bucket is empty, so we can use it + keys[bucket] = key; + values[bucket] = value; + return; + } + + ++bucket; + } + } + } + + private static final long HashMixer = 0xc6a4a7935bd1e995L; + private static final int R = 47; + + static long hash(long key) { + long hash = key * HashMixer; + hash ^= hash >>> R; + hash *= HashMixer; + return hash; + } + + static int signSafeMod(long n, int Max) { + return (int) n & (Max - 1); + } + + static int alignToPowerOfTwo(int n) { + return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1)); + } +} \ No newline at end of file diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSet.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSet.java new file mode 100644 index 0000000000..17d94b7c51 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSet.java @@ -0,0 +1,423 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.locks.StampedLock; + +/** + * Concurrent hash set for primitive longs + * + * Provides similar methods as a ConcurrentSet<Long> but since it's an open hash map with linear probing, no node + * allocations are required to store the values. + *

+ * Items MUST be >= 0. + */ +public class ConcurrentLongHashSet { + + private static final long EmptyItem = -1L; + private static final long DeletedItem = -2L; + + private static final float SetFillFactor = 0.66f; + + private static final int DefaultExpectedItems = 256; + private static final int DefaultConcurrencyLevel = 16; + + private final Section[] sections; + + public interface ConsumerLong { + void accept(long item); + } + + public ConcurrentLongHashSet() { + this(DefaultExpectedItems); + } + + public ConcurrentLongHashSet(int expectedItems) { + this(expectedItems, DefaultConcurrencyLevel); + } + + public ConcurrentLongHashSet(int expectedItems, final int numSections) { + checkArgument(numSections > 0); + if (expectedItems < numSections) { + expectedItems = numSections; + } + + int perSectionExpectedItems = expectedItems / numSections; + int perSectionCapacity = (int) (perSectionExpectedItems / SetFillFactor); + this.sections = new Section[numSections]; + + for (int i = 0; i < numSections; i++) { + sections[i] = new Section(perSectionCapacity); + } + } + + public int size() { + int size = 0; + for (Section s : sections) { + size += s.size; + } + return size; + } + + public long capacity() { + long capacity = 0; + for (Section s : sections) { + capacity += s.capacity; + } + return capacity; + } + + public boolean isEmpty() { + for (Section s : sections) { + if (s.size != 0) { + return false; + } + } + + return true; + } + + long getUsedBucketCount() { + long usedBucketCount = 0; + for (Section s : sections) { + usedBucketCount += s.usedBuckets; + } + return usedBucketCount; + } + + public boolean contains(long item) { + checkBiggerEqualZero(item); + long h = hash(item); + return getSection(h).contains(item, (int) h); + } + + public boolean add(long item) { + checkBiggerEqualZero(item); + long h = hash(item); + return getSection(h).add(item, (int) h); + } + + /** + * Remove an existing entry if found + * + * @param item + * @return true if removed or false if item was not present + */ + public boolean remove(long item) { + checkBiggerEqualZero(item); + long h = hash(item); + return getSection(h).remove(item, (int) h); + } + + private Section getSection(long hash) { + // Use 32 msb out of long to get the section + final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1); + return sections[sectionIdx]; + } + + public void clear() { + for (Section s : sections) { + s.clear(); + } + } + + public void forEach(ConsumerLong processor) { + for (Section s : sections) { + s.forEach(processor); + } + } + + /** + * @return a new list of all keys (makes a copy) + */ + public Set items() { + Set items = new HashSet<>(); + forEach(items::add); + return items; + } + + // A section is a portion of the hash map that is covered by a single + @SuppressWarnings("serial") + private static final class Section extends StampedLock { + // Keys and values are stored interleaved in the table array + private long[] table; + + private int capacity; + private volatile int size; + private int usedBuckets; + private int resizeThreshold; + + Section(int capacity) { + this.capacity = alignToPowerOfTwo(capacity); + this.table = new long[this.capacity]; + this.size = 0; + this.usedBuckets = 0; + this.resizeThreshold = (int) (this.capacity * SetFillFactor); + Arrays.fill(table, EmptyItem); + } + + boolean contains(long item, int hash) { + long stamp = tryOptimisticRead(); + boolean acquiredLock = false; + int bucket = signSafeMod(hash, capacity); + + try { + while (true) { + // First try optimistic locking + long storedItem = table[bucket]; + + if (!acquiredLock && validate(stamp)) { + // The values we have read are consistent + if (item == storedItem) { + return true; + } else if (storedItem == EmptyItem) { + // Not found + return false; + } + } else { + // Fallback to acquiring read lock + if (!acquiredLock) { + stamp = readLock(); + acquiredLock = true; + + bucket = signSafeMod(hash, capacity); + storedItem = table[bucket]; + } + + if (item == storedItem) { + return true; + } else if (storedItem == EmptyItem) { + // Not found + return false; + } + } + + bucket = (bucket + 1) & (table.length - 1); + } + } finally { + if (acquiredLock) { + unlockRead(stamp); + } + } + } + + @SuppressWarnings("NonAtomicVolatileUpdate") + boolean add(long item, long hash) { + long stamp = writeLock(); + int bucket = signSafeMod(hash, capacity); + + // Remember where we find the first available spot + int firstDeletedItem = -1; + + try { + while (true) { + long storedItem = table[bucket]; + + if (item == storedItem) { + // Item was already in set + return false; + } else if (storedItem == EmptyItem) { + // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted + // key, we should write at that position + if (firstDeletedItem != -1) { + bucket = firstDeletedItem; + } else { + ++usedBuckets; + } + + table[bucket] = item; + ++size; + return true; + } else if (storedItem == DeletedItem) { + // The bucket contained a different deleted key + if (firstDeletedItem == -1) { + firstDeletedItem = bucket; + } + } + + bucket = (bucket + 1) & (table.length - 1); + } + } finally { + if (usedBuckets > resizeThreshold) { + try { + rehash(); + } finally { + unlockWrite(stamp); + } + } else { + unlockWrite(stamp); + } + } + } + + @SuppressWarnings("NonAtomicVolatileUpdate") + private boolean remove(long item, int hash) { + long stamp = writeLock(); + int bucket = signSafeMod(hash, capacity); + + try { + while (true) { + long storedItem = table[bucket]; + if (item == storedItem) { + --size; + + cleanBucket(bucket); + return true; + + } else if (storedItem == EmptyItem) { + // Key wasn't found + return false; + } + + bucket = (bucket + 1) & (table.length - 1); + } + } finally { + unlockWrite(stamp); + } + } + + private void cleanBucket(int bucket) { + int nextInArray = (bucket + 1) & (table.length - 1); + if (table[nextInArray] == EmptyItem) { + table[bucket] = EmptyItem; + --usedBuckets; + } else { + table[bucket] = DeletedItem; + } + } + + void clear() { + long stamp = writeLock(); + + try { + Arrays.fill(table, EmptyItem); + this.size = 0; + this.usedBuckets = 0; + } finally { + unlockWrite(stamp); + } + } + + public void forEach(ConsumerLong processor) { + long stamp = tryOptimisticRead(); + + long[] table = this.table; + boolean acquiredReadLock = false; + + try { + + // Validate no rehashing + if (!validate(stamp)) { + // Fallback to read lock + stamp = readLock(); + acquiredReadLock = true; + table = this.table; + } + + // Go through all the buckets for this section + for (int bucket = 0; bucket < table.length; bucket++) { + long storedItem = table[bucket]; + + if (!acquiredReadLock && !validate(stamp)) { + // Fallback to acquiring read lock + stamp = readLock(); + acquiredReadLock = true; + + storedItem = table[bucket]; + } + + if (storedItem != DeletedItem && storedItem != EmptyItem) { + processor.accept(storedItem); + } + } + } finally { + if (acquiredReadLock) { + unlockRead(stamp); + } + } + } + + private void rehash() { + // Expand the hashmap + int newCapacity = capacity * 2; + long[] newTable = new long[newCapacity]; + Arrays.fill(newTable, EmptyItem); + + // Re-hash table + for (int i = 0; i < table.length; i++) { + long storedItem = table[i]; + if (storedItem != EmptyItem && storedItem != DeletedItem) { + insertKeyValueNoLock(newTable, newCapacity, storedItem); + } + } + + capacity = newCapacity; + table = newTable; + usedBuckets = size; + resizeThreshold = (int) (capacity * SetFillFactor); + } + + private static void insertKeyValueNoLock(long[] table, int capacity, long item) { + int bucket = signSafeMod(hash(item), capacity); + + while (true) { + long storedKey = table[bucket]; + + if (storedKey == EmptyItem) { + // The bucket is empty, so we can use it + table[bucket] = item; + return; + } + + bucket = (bucket + 1) & (table.length - 1); + } + } + } + + private static final long HashMixer = 0xc6a4a7935bd1e995L; + private static final int R = 47; + + static long hash(long key) { + long hash = key * HashMixer; + hash ^= hash >>> R; + hash *= HashMixer; + return hash; + } + + static int signSafeMod(long n, int Max) { + return (int) (n & (Max - 1)); + } + + static int alignToPowerOfTwo(int n) { + return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1)); + } + + static void checkBiggerEqualZero(long n) { + if (n < 0L) { + throw new IllegalArgumentException("Keys and values must be >= 0"); + } + } +} \ No newline at end of file diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMapTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMapTest.java new file mode 100644 index 0000000000..b08251fe44 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashMapTest.java @@ -0,0 +1,405 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongFunction; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; + +public class ConcurrentLongHashMapTest { + + @Test + public void simpleInsertions() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(16); + + assertTrue(map.isEmpty()); + assertNull(map.put(1, "one")); + assertFalse(map.isEmpty()); + + assertNull(map.put(2, "two")); + assertNull(map.put(3, "three")); + + assertEquals(map.size(), 3); + + assertEquals(map.get(1), "one"); + assertEquals(map.size(), 3); + + assertEquals(map.remove(1), "one"); + assertEquals(map.size(), 2); + assertEquals(map.get(1), null); + assertEquals(map.get(5), null); + assertEquals(map.size(), 2); + + assertNull(map.put(1, "one")); + assertEquals(map.size(), 3); + assertEquals(map.put(1, "uno"), "one"); + assertEquals(map.size(), 3); + } + + @Test + public void testRemove() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + + assertTrue(map.isEmpty()); + assertNull(map.put(1, "one")); + assertFalse(map.isEmpty()); + + assertFalse(map.remove(0, "zero")); + assertFalse(map.remove(1, "uno")); + + assertFalse(map.isEmpty()); + assertTrue(map.remove(1, "one")); + assertTrue(map.isEmpty()); + } + + @Test + public void testNegativeUsedBucketCount() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(16, 1); + + map.put(0, "zero"); + assertEquals(1, map.getUsedBucketCount()); + map.put(0, "zero1"); + assertEquals(1, map.getUsedBucketCount()); + map.remove(0); + assertEquals(0, map.getUsedBucketCount()); + map.remove(0); + assertEquals(0, map.getUsedBucketCount()); + } + + @Test + public void testRehashing() { + int n = 16; + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(n / 2, 1); + assertEquals(map.capacity(), n); + assertEquals(map.size(), 0); + + for (int i = 0; i < n; i++) { + map.put(i, i); + } + + assertEquals(map.capacity(), 2 * n); + assertEquals(map.size(), n); + } + + @Test + public void testRehashingWithDeletes() { + int n = 16; + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(n / 2, 1); + assertEquals(map.capacity(), n); + assertEquals(map.size(), 0); + + for (int i = 0; i < n / 2; i++) { + map.put(i, i); + } + + for (int i = 0; i < n / 2; i++) { + map.remove(i); + } + + for (int i = n; i < (2 * n); i++) { + map.put(i, i); + } + + assertEquals(map.capacity(), 2 * n); + assertEquals(map.size(), n); + } + + @Test + public void concurrentInsertions() throws Throwable { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int nThreads = 16; + final int N = 100_000; + String value = "value"; + + List> futures = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(); + + for (int j = 0; j < N; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + map.put(key, value); + } + })); + } + + for (Future future : futures) { + future.get(); + } + + assertEquals(map.size(), N * nThreads); + + executor.shutdown(); + } + + @Test + public void concurrentInsertionsAndReads() throws Throwable { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int nThreads = 16; + final int N = 100_000; + String value = "value"; + + List> futures = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(); + + for (int j = 0; j < N; j++) { + long key = random.nextLong(); + // Ensure keys are uniques + key -= key % (threadIdx + 1); + + map.put(key, value); + } + })); + } + + for (Future future : futures) { + future.get(); + } + + assertEquals(map.size(), N * nThreads); + + executor.shutdown(); + } + + @Test + public void testIteration() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + + assertEquals(map.keys(), Collections.emptyList()); + assertEquals(map.values(), Collections.emptyList()); + + map.put(0, "zero"); + + assertEquals(map.keys(), Lists.newArrayList(0L)); + assertEquals(map.values(), Lists.newArrayList("zero")); + + map.remove(0); + + assertEquals(map.keys(), Collections.emptyList()); + assertEquals(map.values(), Collections.emptyList()); + + map.put(0, "zero"); + map.put(1, "one"); + map.put(2, "two"); + + List keys = map.keys(); + Collections.sort(keys); + assertEquals(keys, Lists.newArrayList(0L, 1L, 2L)); + + List values = map.values(); + Collections.sort(values); + assertEquals(values, Lists.newArrayList("one", "two", "zero")); + + map.put(1, "uno"); + + keys = map.keys(); + Collections.sort(keys); + assertEquals(keys, Lists.newArrayList(0L, 1L, 2L)); + + values = map.values(); + Collections.sort(values); + assertEquals(values, Lists.newArrayList("two", "uno", "zero")); + + map.clear(); + assertTrue(map.isEmpty()); + } + + @Test + public void testHashConflictWithDeletion() { + final int Buckets = 16; + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(Buckets, 1); + + // Pick 2 keys that fall into the same bucket + long key1 = 1; + long key2 = 27; + + int bucket1 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key1), Buckets); + int bucket2 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key2), Buckets); + assertEquals(bucket1, bucket2); + + assertEquals(map.put(key1, "value-1"), null); + assertEquals(map.put(key2, "value-2"), null); + assertEquals(map.size(), 2); + + assertEquals(map.remove(key1), "value-1"); + assertEquals(map.size(), 1); + + assertEquals(map.put(key1, "value-1-overwrite"), null); + assertEquals(map.size(), 2); + + assertEquals(map.remove(key1), "value-1-overwrite"); + assertEquals(map.size(), 1); + + assertEquals(map.put(key2, "value-2-overwrite"), "value-2"); + assertEquals(map.get(key2), "value-2-overwrite"); + + assertEquals(map.size(), 1); + assertEquals(map.remove(key2), "value-2-overwrite"); + assertTrue(map.isEmpty()); + } + + @Test + public void testPutIfAbsent() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + assertEquals(map.putIfAbsent(1, "one"), null); + assertEquals(map.get(1), "one"); + + assertEquals(map.putIfAbsent(1, "uno"), "one"); + assertEquals(map.get(1), "one"); + } + + @Test + public void testComputeIfAbsent() { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(16, 1); + AtomicInteger counter = new AtomicInteger(); + LongFunction provider = key -> counter.getAndIncrement(); + + assertEquals(map.computeIfAbsent(0, provider).intValue(), 0); + assertEquals(map.get(0).intValue(), 0); + + assertEquals(map.computeIfAbsent(1, provider).intValue(), 1); + assertEquals(map.get(1).intValue(), 1); + + assertEquals(map.computeIfAbsent(1, provider).intValue(), 1); + assertEquals(map.get(1).intValue(), 1); + + assertEquals(map.computeIfAbsent(2, provider).intValue(), 2); + assertEquals(map.get(2).intValue(), 2); + } + + int Iterations = 1; + int ReadIterations = 100; + int N = 1_000_000; + + public void benchConcurrentLongHashMap() throws Exception { + // public static void main(String args[]) { + ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(N, 1); + + for (long i = 0; i < Iterations; i++) { + for (int j = 0; j < N; j++) { + map.put(i, "value"); + } + + for (long h = 0; h < ReadIterations; h++) { + for (int j = 0; j < N; j++) { + map.get(i); + } + } + + for (int j = 0; j < N; j++) { + map.remove(i); + } + } + } + + public void benchConcurrentHashMap() throws Exception { + ConcurrentHashMap map = new ConcurrentHashMap<>(N, 0.66f, 1); + + for (long i = 0; i < Iterations; i++) { + for (int j = 0; j < N; j++) { + map.put(i, "value"); + } + + for (long h = 0; h < ReadIterations; h++) { + for (int j = 0; j < N; j++) { + map.get(i); + } + } + + for (int j = 0; j < N; j++) { + map.remove(i); + } + } + } + + void benchHashMap() throws Exception { + HashMap map = new HashMap<>(N, 0.66f); + + for (long i = 0; i < Iterations; i++) { + for (int j = 0; j < N; j++) { + map.put(i, "value"); + } + + for (long h = 0; h < ReadIterations; h++) { + for (int j = 0; j < N; j++) { + map.get(i); + } + } + + for (int j = 0; j < N; j++) { + map.remove(i); + } + } + } + + public static void main(String[] args) throws Exception { + ConcurrentLongHashMapTest t = new ConcurrentLongHashMapTest(); + + long start = System.nanoTime(); + // t.benchHashMap(); + long end = System.nanoTime(); + + System.out.println("HM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms"); + + start = System.nanoTime(); + t.benchConcurrentHashMap(); + end = System.nanoTime(); + + System.out.println("CHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms"); + + start = System.nanoTime(); + // t.benchConcurrentLongHashMap(); + end = System.nanoTime(); + + System.out.println("CLHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms"); + + } +} \ No newline at end of file diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSetTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSetTest.java new file mode 100644 index 0000000000..24337f10bd --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentLongHashSetTest.java @@ -0,0 +1,249 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ConcurrentLongHashSetTest { + + @Test + public void simpleInsertions() { + ConcurrentLongHashSet set = new ConcurrentLongHashSet(16); + + assertTrue(set.isEmpty()); + assertTrue(set.add(1)); + assertFalse(set.isEmpty()); + + assertTrue(set.add(2)); + assertTrue(set.add(3)); + + assertEquals(set.size(), 3); + + assertTrue(set.contains(1)); + assertEquals(set.size(), 3); + + assertTrue(set.remove(1)); + assertEquals(set.size(), 2); + assertFalse(set.contains(1)); + assertFalse(set.contains(5)); + assertEquals(set.size(), 2); + + assertTrue(set.add(1)); + assertEquals(set.size(), 3); + assertFalse(set.add(1)); + assertEquals(set.size(), 3); + } + + @Test + public void testRemove() { + ConcurrentLongHashSet set = new ConcurrentLongHashSet(); + + assertTrue(set.isEmpty()); + assertTrue(set.add(1)); + assertFalse(set.isEmpty()); + + assertFalse(set.remove(0)); + assertFalse(set.isEmpty()); + assertTrue(set.remove(1)); + assertTrue(set.isEmpty()); + } + + @Test + public void testRehashing() { + int n = 16; + ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1); + assertEquals(set.capacity(), n); + assertEquals(set.size(), 0); + + for (int i = 0; i < n; i++) { + set.add(i); + } + + assertEquals(set.capacity(), 2 * n); + assertEquals(set.size(), n); + } + + @Test + public void testRehashingWithDeletes() { + int n = 16; + ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1); + assertEquals(set.capacity(), n); + assertEquals(set.size(), 0); + + for (int i = 0; i < n / 2; i++) { + set.add(i); + } + + for (int i = 0; i < n / 2; i++) { + set.remove(i); + } + + for (int i = n; i < (2 * n); i++) { + set.add(i); + } + + assertEquals(set.capacity(), 2 * n); + assertEquals(set.size(), n); + } + + @Test + public void concurrentInsertions() throws Throwable { + ConcurrentLongHashSet set = new ConcurrentLongHashSet(); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int nThreads = 16; + final int N = 100_000; + + List> futures = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(); + + for (int j = 0; j < N; j++) { + long key = Math.abs(random.nextLong()); + // Ensure keys are unique + key -= key % (threadIdx + 1); + + set.add(key); + } + })); + } + + for (Future future : futures) { + future.get(); + } + + assertEquals(set.size(), N * nThreads); + + executor.shutdown(); + } + + @Test + public void concurrentInsertionsAndReads() throws Throwable { + ConcurrentLongHashSet map = new ConcurrentLongHashSet(); + ExecutorService executor = Executors.newCachedThreadPool(); + + final int nThreads = 16; + final int N = 100_000; + + List> futures = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + final int threadIdx = i; + + futures.add(executor.submit(() -> { + Random random = new Random(); + + for (int j = 0; j < N; j++) { + long key = Math.abs(random.nextLong()); + // Ensure keys are unique + key -= key % (threadIdx + 1); + + map.add(key); + } + })); + } + + for (Future future : futures) { + future.get(); + } + + assertEquals(map.size(), N * nThreads); + + executor.shutdown(); + } + + @Test + public void testIteration() { + ConcurrentLongHashSet set = new ConcurrentLongHashSet(); + + assertEquals(set.items(), Collections.emptySet()); + + set.add(0L); + + assertEquals(set.items(), Sets.newHashSet(0L)); + + set.remove(0L); + + assertEquals(set.items(), Collections.emptySet()); + + set.add(0L); + set.add(1L); + set.add(2L); + + List values = Lists.newArrayList(set.items()); + Collections.sort(values); + assertEquals(values, Lists.newArrayList(0L, 1L, 2L)); + + set.clear(); + assertTrue(set.isEmpty()); + } + + @Test + public void testHashConflictWithDeletion() { + final int Buckets = 16; + ConcurrentLongHashSet set = new ConcurrentLongHashSet(Buckets, 1); + + // Pick 2 keys that fall into the same bucket + long key1 = 1; + long key2 = 27; + + int bucket1 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key1), Buckets); + int bucket2 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key2), Buckets); + assertEquals(bucket1, bucket2); + + assertTrue(set.add(key1)); + assertTrue(set.add(key2)); + assertEquals(set.size(), 2); + + assertTrue(set.remove(key1)); + assertEquals(set.size(), 1); + + assertTrue(set.add(key1)); + assertEquals(set.size(), 2); + + assertTrue(set.remove(key1)); + assertEquals(set.size(), 1); + + assertFalse(set.add(key2)); + assertTrue(set.contains(key2)); + + assertEquals(set.size(), 1); + assertTrue(set.remove(key2)); + assertTrue(set.isEmpty()); + } + +} \ No newline at end of file diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java index 943077c59f..8de3da6400 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.journal.impl; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; @@ -30,7 +29,7 @@ import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; /** * Super class for Journal maintenances such as clean up and Compactor @@ -56,7 +55,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback private ActiveMQBuffer writingChannel; - private final Set recordsSnapshot = new ConcurrentHashSet<>(); + private final ConcurrentLongHashSet recordsSnapshot; protected final List newDataFiles = new ArrayList<>(); @@ -67,14 +66,14 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory, final JournalImpl journal, final JournalFilesRepository filesRepository, - final Set recordsSnapshot, + final ConcurrentLongHashSet recordsSnapshot, final long nextOrderingID) { super(); this.journal = journal; this.filesRepository = filesRepository; this.fileFactory = fileFactory; this.nextOrderingID = nextOrderingID; - this.recordsSnapshot.addAll(recordsSnapshot); + this.recordsSnapshot = recordsSnapshot; } // Public -------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 8e5ca2c8f8..5ef240ae2b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -18,8 +18,6 @@ package org.apache.activemq.artemis.core.journal.impl; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -43,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRecordTX; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; /** * Journal used at a replicating backup server during the synchronization of data with the 'live' @@ -54,7 +53,7 @@ public final class FileWrapperJournal extends JournalBase { private final ReentrantLock lockAppend = new ReentrantLock(); - private final ConcurrentMap transactions = new ConcurrentHashMap<>(); + private final ConcurrentLongHashMap transactions = new ConcurrentLongHashMap<>(); private final JournalImpl journal; protected volatile JournalFile currentFile; @@ -181,7 +180,7 @@ public final class FileWrapperJournal extends JournalBase { IOCompletion callback, boolean lineUpContext) throws Exception { JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null); - AtomicInteger value = transactions.remove(Long.valueOf(txID)); + AtomicInteger value = transactions.remove(txID); if (value != null) { commitRecord.setNumberOfRecords(value.get()); } @@ -195,7 +194,7 @@ public final class FileWrapperJournal extends JournalBase { boolean sync, IOCompletion callback) throws Exception { JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData); - AtomicInteger value = transactions.get(Long.valueOf(txID)); + AtomicInteger value = transactions.get(txID); if (value != null) { prepareRecord.setNumberOfRecords(value.get()); } @@ -204,7 +203,7 @@ public final class FileWrapperJournal extends JournalBase { private int count(long txID) throws ActiveMQException { AtomicInteger defaultValue = new AtomicInteger(1); - AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue); + AtomicInteger count = transactions.putIfAbsent(txID, defaultValue); if (count != null) { return count.incrementAndGet(); } @@ -219,7 +218,7 @@ public final class FileWrapperJournal extends JournalBase { @Override public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception { JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID); - AtomicInteger value = transactions.remove(Long.valueOf(txID)); + AtomicInteger value = transactions.remove(txID); if (value != null) { rollbackRecord.setNumberOfRecords(value.get()); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java index 8b89c3ed96..e3e1e7b469 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java @@ -18,12 +18,8 @@ package org.apache.activemq.artemis.core.journal.impl; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; @@ -41,6 +37,8 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; import org.jboss.logging.Logger; public class JournalCompactor extends AbstractJournalUpdateTask implements JournalRecordProvider { @@ -53,11 +51,11 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ private static final short COMPACT_SPLIT_LINE = 2; // Snapshot of transactions that were pending when the compactor started - private final Map pendingTransactions = new ConcurrentHashMap<>(); + private final ConcurrentLongHashMap pendingTransactions = new ConcurrentLongHashMap<>(); - private final Map newRecords = new HashMap<>(); + private final ConcurrentLongHashMap newRecords = new ConcurrentLongHashMap<>(); - private final Map newTransactions = new HashMap<>(); + private final ConcurrentLongHashMap newTransactions = new ConcurrentLongHashMap<>(); /** * Commands that happened during compacting @@ -120,18 +118,18 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ return newDataFiles; } - public Map getNewRecords() { + public ConcurrentLongHashMap getNewRecords() { return newRecords; } - public Map getNewTransactions() { + public ConcurrentLongHashMap getNewTransactions() { return newTransactions; } public JournalCompactor(final SequentialFileFactory fileFactory, final JournalImpl journal, final JournalFilesRepository filesRepository, - final Set recordsSnapshot, + final ConcurrentLongHashSet recordsSnapshot, final long firstFileID) { super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID); } @@ -628,7 +626,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ } @Override - public Map getRecords() { + public ConcurrentLongHashMap getRecords() { return newRecords; } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 81ae9c0fee..1758999b99 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -31,8 +31,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; @@ -74,6 +72,8 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; @@ -168,12 +168,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private final JournalFilesRepository filesRepository; // Compacting may replace this structure - private final ConcurrentMap records = new ConcurrentHashMap<>(); + private final ConcurrentLongHashMap records = new ConcurrentLongHashMap<>(); - private final Set pendingRecords = new ConcurrentHashSet<>(); + private final ConcurrentLongHashSet pendingRecords = new ConcurrentLongHashSet(); // Compacting may replace this structure - private final ConcurrentMap transactions = new ConcurrentHashMap<>(); + private final ConcurrentLongHashMap transactions = new ConcurrentLongHashMap<>(); // This will be set only while the JournalCompactor is being executed private volatile JournalCompactor compactor; @@ -345,7 +345,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } @Override - public Map getRecords() { + public ConcurrentLongHashMap getRecords() { return records; } @@ -1487,12 +1487,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal return; } - compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID()); + compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID()); - for (Map.Entry entry : transactions.entrySet()) { - compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray()); - entry.getValue().setCompacting(); - } + transactions.forEach((id, pendingTransaction) -> { + compactor.addPendingTransaction(id, pendingTransaction.getPositiveArray()); + pendingTransaction.setCompacting(); + }); // We will calculate the new records during compacting, what will take the position the records will take // after compacting @@ -1540,9 +1540,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal newDatafiles = localCompactor.getNewDataFiles(); // Restore newRecords created during compacting - for (Map.Entry newRecordEntry : localCompactor.getNewRecords().entrySet()) { - records.put(newRecordEntry.getKey(), newRecordEntry.getValue()); - } + localCompactor.getNewRecords().forEach((id, newRecord) -> { + records.put(id, newRecord); + }); // Restore compacted dataFiles for (int i = newDatafiles.size() - 1; i >= 0; i--) { @@ -1559,9 +1559,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // Replay pending commands (including updates, deletes and commits) - for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) { - newTransaction.replaceRecordProvider(this); - } + localCompactor.getNewTransactions().forEach((id, newTransaction) -> newTransaction.replaceRecordProvider(this)); localCompactor.replayPendingCommands(); @@ -1569,7 +1567,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // This has to be done after the replay pending commands, as we need to delete commits // that happened during the compacting - for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) { + localCompactor.getNewTransactions().forEach((id, newTransaction) -> { if (logger.isTraceEnabled()) { logger.trace("Merging pending transaction " + newTransaction + " after compacting the journal"); } @@ -1579,7 +1577,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } else { ActiveMQJournalLogger.LOGGER.compactMergeError(newTransaction.getId()); } - } + }); } finally { journalLock.writeLock().unlock(); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java index 6c5107a752..c9c92f4407 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java @@ -16,7 +16,7 @@ */ package org.apache.activemq.artemis.core.journal.impl; -import java.util.Map; +import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; /** * This is an interface used only internally. @@ -29,5 +29,5 @@ public interface JournalRecordProvider { JournalCompactor getCompactor(); - Map getRecords(); + ConcurrentLongHashMap getRecords(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 4bf2726adf..6f899f33cc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -371,7 +371,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon * @throws Exception */ private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception { - Long id = Long.valueOf(msg.getId()); + long id = msg.getId(); byte[] data = msg.getData(); SequentialFile channel1; switch (msg.getFileType()) { diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java index c08f1dddd6..7493949999 100644 --- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java +++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/journal/JournalCleanupCompactStressTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.stress.journal; import java.io.File; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -238,12 +237,15 @@ public class JournalCleanupCompactStressTest extends ActiveMQTestBase { reloadJournal(); - Collection records = journal.getRecords().keySet(); - System.out.println("Deleting everything!"); - for (Long delInfo : records) { - journal.appendDeleteRecord(delInfo, false); - } + + journal.getRecords().forEach((id, record) -> { + try { + journal.appendDeleteRecord(id, false); + } catch (Exception e) { + new RuntimeException(e); + } + }); journal.forceMoveNextFile();