ARTEMIS-1156: FIX: Long Autoboxing occurring on Hot Path

Building on ARTEMIS-905 JCtools ConcurrentMap replacement  first proposed but currently parked by @franz1981, replace the collections with primitive key concurrent collections to avoid auto boxing.

The goal of this is to reduce/remove autoboxing on the hot path.
We are just adding jctools to the broker (should not be in client dependencies)
Like wise targeting specific use case with specific implementation rather than a blanket replace all.

Using collections from Bookkeeper, reduces outside tlab allocation, on resizing compared to JCTools, which occurs frequently on testing.
This commit is contained in:
Michael André Pearce 2017-05-12 07:14:47 +01:00 committed by Clebert Suconic
parent b7b79e5dfd
commit c1d55aa84f
11 changed files with 1628 additions and 51 deletions

View File

@ -0,0 +1,504 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.StampedLock;
import java.util.function.LongFunction;
import com.google.common.collect.Lists;
/**
* Map from long to an Object.
*
* Provides similar methods as a ConcurrentMap<long,Object> with 2 differences:
* <ol>
* <li>No boxing/unboxing from long -> Long
* <li>Open hash map with linear probing, no node allocations to store the values
* </ol>
*
* @param <V>
*/
@SuppressWarnings("unchecked")
public class ConcurrentLongHashMap<V> {
private static final Object EmptyValue = null;
private static final Object DeletedValue = new Object();
private static final float MapFillFactor = 0.66f;
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
private final Section<V>[] sections;
public ConcurrentLongHashMap() {
this(DefaultExpectedItems);
}
public ConcurrentLongHashMap(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
public ConcurrentLongHashMap(int expectedItems, int numSections) {
checkArgument(numSections > 0);
if (expectedItems < numSections) {
expectedItems = numSections;
}
int perSectionExpectedItems = expectedItems / numSections;
int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
this.sections = (Section<V>[]) new Section[numSections];
for (int i = 0; i < numSections; i++) {
sections[i] = new Section<>(perSectionCapacity);
}
}
public int size() {
int size = 0;
for (Section<V> s : sections) {
size += s.size;
}
return size;
}
long getUsedBucketCount() {
long usedBucketCount = 0;
for (Section<V> s : sections) {
usedBucketCount += s.usedBuckets;
}
return usedBucketCount;
}
public long capacity() {
long capacity = 0;
for (Section<V> s : sections) {
capacity += s.capacity;
}
return capacity;
}
public boolean isEmpty() {
for (Section<V> s : sections) {
if (s.size != 0) {
return false;
}
}
return true;
}
public V get(long key) {
long h = hash(key);
return getSection(h).get(key, (int) h);
}
public boolean containsKey(long key) {
return get(key) != null;
}
public V put(long key, V value) {
checkNotNull(value);
long h = hash(key);
return getSection(h).put(key, value, (int) h, false, null);
}
public V putIfAbsent(long key, V value) {
checkNotNull(value);
long h = hash(key);
return getSection(h).put(key, value, (int) h, true, null);
}
public V computeIfAbsent(long key, LongFunction<V> provider) {
checkNotNull(provider);
long h = hash(key);
return getSection(h).put(key, null, (int) h, true, provider);
}
public V remove(long key) {
long h = hash(key);
return getSection(h).remove(key, null, (int) h);
}
public boolean remove(long key, Object value) {
checkNotNull(value);
long h = hash(key);
return getSection(h).remove(key, value, (int) h) != null;
}
private Section<V> getSection(long hash) {
// Use 32 msb out of long to get the section
final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
return sections[sectionIdx];
}
public void clear() {
for (Section<V> s : sections) {
s.clear();
}
}
public void forEach(EntryProcessor<V> processor) {
for (Section<V> s : sections) {
s.forEach(processor);
}
}
/**
* @return a new list of all keys (makes a copy)
*/
public List<Long> keys() {
List<Long> keys = Lists.newArrayListWithExpectedSize((int) size());
forEach((key, value) -> keys.add(key));
return keys;
}
public ConcurrentLongHashSet keysLongHashSet() {
ConcurrentLongHashSet concurrentLongHashSet = new ConcurrentLongHashSet(size());
forEach((key, value) -> concurrentLongHashSet.add(key));
return concurrentLongHashSet;
}
public List<V> values() {
List<V> values = Lists.newArrayListWithExpectedSize((int) size());
forEach((key, value) -> values.add(value));
return values;
}
public interface EntryProcessor<V> {
void accept(long key, V value);
}
// A section is a portion of the hash map that is covered by a single
@SuppressWarnings("serial")
private static final class Section<V> extends StampedLock {
private long[] keys;
private V[] values;
private int capacity;
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
Section(int capacity) {
this.capacity = alignToPowerOfTwo(capacity);
this.keys = new long[this.capacity];
this.values = (V[]) new Object[this.capacity];
this.size = 0;
this.usedBuckets = 0;
this.resizeThreshold = (int) (this.capacity * MapFillFactor);
}
@SuppressWarnings("NonAtomicVolatileUpdate")
V get(long key, int keyHash) {
int bucket = keyHash;
long stamp = tryOptimisticRead();
boolean acquiredLock = false;
try {
while (true) {
int capacity = this.capacity;
bucket = signSafeMod(bucket, capacity);
// First try optimistic locking
long storedKey = keys[bucket];
V storedValue = values[bucket];
if (!acquiredLock && validate(stamp)) {
// The values we have read are consistent
if (storedKey == key) {
return storedValue != DeletedValue ? storedValue : null;
} else if (storedValue == EmptyValue) {
// Not found
return null;
}
} else {
// Fallback to acquiring read lock
if (!acquiredLock) {
stamp = readLock();
acquiredLock = true;
storedKey = keys[bucket];
storedValue = values[bucket];
}
if (capacity != this.capacity) {
// There has been a rehashing. We need to restart the search
bucket = keyHash;
continue;
}
if (storedKey == key) {
return storedValue != DeletedValue ? storedValue : null;
} else if (storedValue == EmptyValue) {
// Not found
return null;
}
}
++bucket;
}
} finally {
if (acquiredLock) {
unlockRead(stamp);
}
}
}
@SuppressWarnings("NonAtomicVolatileUpdate")
V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction<V> valueProvider) {
int bucket = keyHash;
long stamp = writeLock();
int capacity = this.capacity;
// Remember where we find the first available spot
int firstDeletedKey = -1;
try {
while (true) {
bucket = signSafeMod(bucket, capacity);
long storedKey = keys[bucket];
V storedValue = values[bucket];
if (storedKey == key) {
if (storedValue == EmptyValue) {
values[bucket] = value != null ? value : valueProvider.apply(key);
++size;
++usedBuckets;
return valueProvider != null ? values[bucket] : null;
} else if (storedValue == DeletedValue) {
values[bucket] = value != null ? value : valueProvider.apply(key);
++size;
return valueProvider != null ? values[bucket] : null;
} else if (!onlyIfAbsent) {
// Over written an old value for same key
values[bucket] = value;
return storedValue;
} else {
return storedValue;
}
} else if (storedValue == EmptyValue) {
// Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
// key, we should write at that position
if (firstDeletedKey != -1) {
bucket = firstDeletedKey;
} else {
++usedBuckets;
}
keys[bucket] = key;
values[bucket] = value != null ? value : valueProvider.apply(key);
++size;
return valueProvider != null ? values[bucket] : null;
} else if (storedValue == DeletedValue) {
// The bucket contained a different deleted key
if (firstDeletedKey == -1) {
firstDeletedKey = bucket;
}
}
++bucket;
}
} finally {
if (usedBuckets > resizeThreshold) {
try {
rehash();
} finally {
unlockWrite(stamp);
}
} else {
unlockWrite(stamp);
}
}
}
@SuppressWarnings("NonAtomicVolatileUpdate")
private V remove(long key, Object value, int keyHash) {
int bucket = keyHash;
long stamp = writeLock();
try {
while (true) {
int capacity = this.capacity;
bucket = signSafeMod(bucket, capacity);
long storedKey = keys[bucket];
V storedValue = values[bucket];
if (storedKey == key) {
if (value == null || value.equals(storedValue)) {
if (storedValue == EmptyValue || storedValue == DeletedValue) {
return null;
}
--size;
V nextValueInArray = values[signSafeMod(bucket + 1, capacity)];
if (nextValueInArray == EmptyValue) {
values[bucket] = (V) EmptyValue;
--usedBuckets;
} else {
values[bucket] = (V) DeletedValue;
}
return storedValue;
} else {
return null;
}
} else if (storedValue == EmptyValue) {
// Key wasn't found
return null;
}
++bucket;
}
} finally {
unlockWrite(stamp);
}
}
void clear() {
long stamp = writeLock();
try {
Arrays.fill(keys, 0);
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
} finally {
unlockWrite(stamp);
}
}
public void forEach(EntryProcessor<V> processor) {
long stamp = tryOptimisticRead();
int capacity = this.capacity;
long[] keys = this.keys;
V[] values = this.values;
boolean acquiredReadLock = false;
try {
// Validate no rehashing
if (!validate(stamp)) {
// Fallback to read lock
stamp = readLock();
acquiredReadLock = true;
capacity = this.capacity;
keys = this.keys;
values = this.values;
}
// Go through all the buckets for this section
for (int bucket = 0; bucket < capacity; bucket++) {
long storedKey = keys[bucket];
V storedValue = values[bucket];
if (!acquiredReadLock && !validate(stamp)) {
// Fallback to acquiring read lock
stamp = readLock();
acquiredReadLock = true;
storedKey = keys[bucket];
storedValue = values[bucket];
}
if (storedValue != DeletedValue && storedValue != EmptyValue) {
processor.accept(storedKey, storedValue);
}
}
} finally {
if (acquiredReadLock) {
unlockRead(stamp);
}
}
}
private void rehash() {
// Expand the hashmap
int newCapacity = capacity * 2;
long[] newKeys = new long[newCapacity];
V[] newValues = (V[]) new Object[newCapacity];
// Re-hash table
for (int i = 0; i < keys.length; i++) {
long storedKey = keys[i];
V storedValue = values[i];
if (storedValue != EmptyValue && storedValue != DeletedValue) {
insertKeyValueNoLock(newKeys, newValues, storedKey, storedValue);
}
}
capacity = newCapacity;
keys = newKeys;
values = newValues;
usedBuckets = size;
resizeThreshold = (int) (capacity * MapFillFactor);
}
private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
int bucket = (int) hash(key);
while (true) {
bucket = signSafeMod(bucket, keys.length);
V storedValue = values[bucket];
if (storedValue == EmptyValue) {
// The bucket is empty, so we can use it
keys[bucket] = key;
values[bucket] = value;
return;
}
++bucket;
}
}
}
private static final long HashMixer = 0xc6a4a7935bd1e995L;
private static final int R = 47;
static long hash(long key) {
long hash = key * HashMixer;
hash ^= hash >>> R;
hash *= HashMixer;
return hash;
}
static int signSafeMod(long n, int Max) {
return (int) n & (Max - 1);
}
static int alignToPowerOfTwo(int n) {
return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
}
}

View File

@ -0,0 +1,423 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.StampedLock;
/**
* Concurrent hash set for primitive longs
*
* Provides similar methods as a ConcurrentSet&lt;Long&gt; but since it's an open hash map with linear probing, no node
* allocations are required to store the values.
* <p>
* Items <strong>MUST</strong> be >= 0.
*/
public class ConcurrentLongHashSet {
private static final long EmptyItem = -1L;
private static final long DeletedItem = -2L;
private static final float SetFillFactor = 0.66f;
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
private final Section[] sections;
public interface ConsumerLong {
void accept(long item);
}
public ConcurrentLongHashSet() {
this(DefaultExpectedItems);
}
public ConcurrentLongHashSet(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
public ConcurrentLongHashSet(int expectedItems, final int numSections) {
checkArgument(numSections > 0);
if (expectedItems < numSections) {
expectedItems = numSections;
}
int perSectionExpectedItems = expectedItems / numSections;
int perSectionCapacity = (int) (perSectionExpectedItems / SetFillFactor);
this.sections = new Section[numSections];
for (int i = 0; i < numSections; i++) {
sections[i] = new Section(perSectionCapacity);
}
}
public int size() {
int size = 0;
for (Section s : sections) {
size += s.size;
}
return size;
}
public long capacity() {
long capacity = 0;
for (Section s : sections) {
capacity += s.capacity;
}
return capacity;
}
public boolean isEmpty() {
for (Section s : sections) {
if (s.size != 0) {
return false;
}
}
return true;
}
long getUsedBucketCount() {
long usedBucketCount = 0;
for (Section s : sections) {
usedBucketCount += s.usedBuckets;
}
return usedBucketCount;
}
public boolean contains(long item) {
checkBiggerEqualZero(item);
long h = hash(item);
return getSection(h).contains(item, (int) h);
}
public boolean add(long item) {
checkBiggerEqualZero(item);
long h = hash(item);
return getSection(h).add(item, (int) h);
}
/**
* Remove an existing entry if found
*
* @param item
* @return true if removed or false if item was not present
*/
public boolean remove(long item) {
checkBiggerEqualZero(item);
long h = hash(item);
return getSection(h).remove(item, (int) h);
}
private Section getSection(long hash) {
// Use 32 msb out of long to get the section
final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
return sections[sectionIdx];
}
public void clear() {
for (Section s : sections) {
s.clear();
}
}
public void forEach(ConsumerLong processor) {
for (Section s : sections) {
s.forEach(processor);
}
}
/**
* @return a new list of all keys (makes a copy)
*/
public Set<Long> items() {
Set<Long> items = new HashSet<>();
forEach(items::add);
return items;
}
// A section is a portion of the hash map that is covered by a single
@SuppressWarnings("serial")
private static final class Section extends StampedLock {
// Keys and values are stored interleaved in the table array
private long[] table;
private int capacity;
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
Section(int capacity) {
this.capacity = alignToPowerOfTwo(capacity);
this.table = new long[this.capacity];
this.size = 0;
this.usedBuckets = 0;
this.resizeThreshold = (int) (this.capacity * SetFillFactor);
Arrays.fill(table, EmptyItem);
}
boolean contains(long item, int hash) {
long stamp = tryOptimisticRead();
boolean acquiredLock = false;
int bucket = signSafeMod(hash, capacity);
try {
while (true) {
// First try optimistic locking
long storedItem = table[bucket];
if (!acquiredLock && validate(stamp)) {
// The values we have read are consistent
if (item == storedItem) {
return true;
} else if (storedItem == EmptyItem) {
// Not found
return false;
}
} else {
// Fallback to acquiring read lock
if (!acquiredLock) {
stamp = readLock();
acquiredLock = true;
bucket = signSafeMod(hash, capacity);
storedItem = table[bucket];
}
if (item == storedItem) {
return true;
} else if (storedItem == EmptyItem) {
// Not found
return false;
}
}
bucket = (bucket + 1) & (table.length - 1);
}
} finally {
if (acquiredLock) {
unlockRead(stamp);
}
}
}
@SuppressWarnings("NonAtomicVolatileUpdate")
boolean add(long item, long hash) {
long stamp = writeLock();
int bucket = signSafeMod(hash, capacity);
// Remember where we find the first available spot
int firstDeletedItem = -1;
try {
while (true) {
long storedItem = table[bucket];
if (item == storedItem) {
// Item was already in set
return false;
} else if (storedItem == EmptyItem) {
// Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
// key, we should write at that position
if (firstDeletedItem != -1) {
bucket = firstDeletedItem;
} else {
++usedBuckets;
}
table[bucket] = item;
++size;
return true;
} else if (storedItem == DeletedItem) {
// The bucket contained a different deleted key
if (firstDeletedItem == -1) {
firstDeletedItem = bucket;
}
}
bucket = (bucket + 1) & (table.length - 1);
}
} finally {
if (usedBuckets > resizeThreshold) {
try {
rehash();
} finally {
unlockWrite(stamp);
}
} else {
unlockWrite(stamp);
}
}
}
@SuppressWarnings("NonAtomicVolatileUpdate")
private boolean remove(long item, int hash) {
long stamp = writeLock();
int bucket = signSafeMod(hash, capacity);
try {
while (true) {
long storedItem = table[bucket];
if (item == storedItem) {
--size;
cleanBucket(bucket);
return true;
} else if (storedItem == EmptyItem) {
// Key wasn't found
return false;
}
bucket = (bucket + 1) & (table.length - 1);
}
} finally {
unlockWrite(stamp);
}
}
private void cleanBucket(int bucket) {
int nextInArray = (bucket + 1) & (table.length - 1);
if (table[nextInArray] == EmptyItem) {
table[bucket] = EmptyItem;
--usedBuckets;
} else {
table[bucket] = DeletedItem;
}
}
void clear() {
long stamp = writeLock();
try {
Arrays.fill(table, EmptyItem);
this.size = 0;
this.usedBuckets = 0;
} finally {
unlockWrite(stamp);
}
}
public void forEach(ConsumerLong processor) {
long stamp = tryOptimisticRead();
long[] table = this.table;
boolean acquiredReadLock = false;
try {
// Validate no rehashing
if (!validate(stamp)) {
// Fallback to read lock
stamp = readLock();
acquiredReadLock = true;
table = this.table;
}
// Go through all the buckets for this section
for (int bucket = 0; bucket < table.length; bucket++) {
long storedItem = table[bucket];
if (!acquiredReadLock && !validate(stamp)) {
// Fallback to acquiring read lock
stamp = readLock();
acquiredReadLock = true;
storedItem = table[bucket];
}
if (storedItem != DeletedItem && storedItem != EmptyItem) {
processor.accept(storedItem);
}
}
} finally {
if (acquiredReadLock) {
unlockRead(stamp);
}
}
}
private void rehash() {
// Expand the hashmap
int newCapacity = capacity * 2;
long[] newTable = new long[newCapacity];
Arrays.fill(newTable, EmptyItem);
// Re-hash table
for (int i = 0; i < table.length; i++) {
long storedItem = table[i];
if (storedItem != EmptyItem && storedItem != DeletedItem) {
insertKeyValueNoLock(newTable, newCapacity, storedItem);
}
}
capacity = newCapacity;
table = newTable;
usedBuckets = size;
resizeThreshold = (int) (capacity * SetFillFactor);
}
private static void insertKeyValueNoLock(long[] table, int capacity, long item) {
int bucket = signSafeMod(hash(item), capacity);
while (true) {
long storedKey = table[bucket];
if (storedKey == EmptyItem) {
// The bucket is empty, so we can use it
table[bucket] = item;
return;
}
bucket = (bucket + 1) & (table.length - 1);
}
}
}
private static final long HashMixer = 0xc6a4a7935bd1e995L;
private static final int R = 47;
static long hash(long key) {
long hash = key * HashMixer;
hash ^= hash >>> R;
hash *= HashMixer;
return hash;
}
static int signSafeMod(long n, int Max) {
return (int) (n & (Max - 1));
}
static int alignToPowerOfTwo(int n) {
return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
}
static void checkBiggerEqualZero(long n) {
if (n < 0L) {
throw new IllegalArgumentException("Keys and values must be >= 0");
}
}
}

View File

@ -0,0 +1,405 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;
import com.google.common.collect.Lists;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
public class ConcurrentLongHashMapTest {
@Test
public void simpleInsertions() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
assertTrue(map.isEmpty());
assertNull(map.put(1, "one"));
assertFalse(map.isEmpty());
assertNull(map.put(2, "two"));
assertNull(map.put(3, "three"));
assertEquals(map.size(), 3);
assertEquals(map.get(1), "one");
assertEquals(map.size(), 3);
assertEquals(map.remove(1), "one");
assertEquals(map.size(), 2);
assertEquals(map.get(1), null);
assertEquals(map.get(5), null);
assertEquals(map.size(), 2);
assertNull(map.put(1, "one"));
assertEquals(map.size(), 3);
assertEquals(map.put(1, "uno"), "one");
assertEquals(map.size(), 3);
}
@Test
public void testRemove() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
assertTrue(map.isEmpty());
assertNull(map.put(1, "one"));
assertFalse(map.isEmpty());
assertFalse(map.remove(0, "zero"));
assertFalse(map.remove(1, "uno"));
assertFalse(map.isEmpty());
assertTrue(map.remove(1, "one"));
assertTrue(map.isEmpty());
}
@Test
public void testNegativeUsedBucketCount() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
map.put(0, "zero");
assertEquals(1, map.getUsedBucketCount());
map.put(0, "zero1");
assertEquals(1, map.getUsedBucketCount());
map.remove(0);
assertEquals(0, map.getUsedBucketCount());
map.remove(0);
assertEquals(0, map.getUsedBucketCount());
}
@Test
public void testRehashing() {
int n = 16;
ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
for (int i = 0; i < n; i++) {
map.put(i, i);
}
assertEquals(map.capacity(), 2 * n);
assertEquals(map.size(), n);
}
@Test
public void testRehashingWithDeletes() {
int n = 16;
ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
for (int i = 0; i < n / 2; i++) {
map.put(i, i);
}
for (int i = 0; i < n / 2; i++) {
map.remove(i);
}
for (int i = n; i < (2 * n); i++) {
map.put(i, i);
}
assertEquals(map.capacity(), 2 * n);
assertEquals(map.size(), n);
}
@Test
public void concurrentInsertions() throws Throwable {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
final int N = 100_000;
String value = "value";
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
final int threadIdx = i;
futures.add(executor.submit(() -> {
Random random = new Random();
for (int j = 0; j < N; j++) {
long key = random.nextLong();
// Ensure keys are uniques
key -= key % (threadIdx + 1);
map.put(key, value);
}
}));
}
for (Future<?> future : futures) {
future.get();
}
assertEquals(map.size(), N * nThreads);
executor.shutdown();
}
@Test
public void concurrentInsertionsAndReads() throws Throwable {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
final int N = 100_000;
String value = "value";
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
final int threadIdx = i;
futures.add(executor.submit(() -> {
Random random = new Random();
for (int j = 0; j < N; j++) {
long key = random.nextLong();
// Ensure keys are uniques
key -= key % (threadIdx + 1);
map.put(key, value);
}
}));
}
for (Future<?> future : futures) {
future.get();
}
assertEquals(map.size(), N * nThreads);
executor.shutdown();
}
@Test
public void testIteration() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
assertEquals(map.keys(), Collections.emptyList());
assertEquals(map.values(), Collections.emptyList());
map.put(0, "zero");
assertEquals(map.keys(), Lists.newArrayList(0L));
assertEquals(map.values(), Lists.newArrayList("zero"));
map.remove(0);
assertEquals(map.keys(), Collections.emptyList());
assertEquals(map.values(), Collections.emptyList());
map.put(0, "zero");
map.put(1, "one");
map.put(2, "two");
List<Long> keys = map.keys();
Collections.sort(keys);
assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
List<String> values = map.values();
Collections.sort(values);
assertEquals(values, Lists.newArrayList("one", "two", "zero"));
map.put(1, "uno");
keys = map.keys();
Collections.sort(keys);
assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
values = map.values();
Collections.sort(values);
assertEquals(values, Lists.newArrayList("two", "uno", "zero"));
map.clear();
assertTrue(map.isEmpty());
}
@Test
public void testHashConflictWithDeletion() {
final int Buckets = 16;
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(Buckets, 1);
// Pick 2 keys that fall into the same bucket
long key1 = 1;
long key2 = 27;
int bucket1 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key1), Buckets);
int bucket2 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key2), Buckets);
assertEquals(bucket1, bucket2);
assertEquals(map.put(key1, "value-1"), null);
assertEquals(map.put(key2, "value-2"), null);
assertEquals(map.size(), 2);
assertEquals(map.remove(key1), "value-1");
assertEquals(map.size(), 1);
assertEquals(map.put(key1, "value-1-overwrite"), null);
assertEquals(map.size(), 2);
assertEquals(map.remove(key1), "value-1-overwrite");
assertEquals(map.size(), 1);
assertEquals(map.put(key2, "value-2-overwrite"), "value-2");
assertEquals(map.get(key2), "value-2-overwrite");
assertEquals(map.size(), 1);
assertEquals(map.remove(key2), "value-2-overwrite");
assertTrue(map.isEmpty());
}
@Test
public void testPutIfAbsent() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
assertEquals(map.putIfAbsent(1, "one"), null);
assertEquals(map.get(1), "one");
assertEquals(map.putIfAbsent(1, "uno"), "one");
assertEquals(map.get(1), "one");
}
@Test
public void testComputeIfAbsent() {
ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1);
AtomicInteger counter = new AtomicInteger();
LongFunction<Integer> provider = key -> counter.getAndIncrement();
assertEquals(map.computeIfAbsent(0, provider).intValue(), 0);
assertEquals(map.get(0).intValue(), 0);
assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
assertEquals(map.get(1).intValue(), 1);
assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
assertEquals(map.get(1).intValue(), 1);
assertEquals(map.computeIfAbsent(2, provider).intValue(), 2);
assertEquals(map.get(2).intValue(), 2);
}
int Iterations = 1;
int ReadIterations = 100;
int N = 1_000_000;
public void benchConcurrentLongHashMap() throws Exception {
// public static void main(String args[]) {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
for (long i = 0; i < Iterations; i++) {
for (int j = 0; j < N; j++) {
map.put(i, "value");
}
for (long h = 0; h < ReadIterations; h++) {
for (int j = 0; j < N; j++) {
map.get(i);
}
}
for (int j = 0; j < N; j++) {
map.remove(i);
}
}
}
public void benchConcurrentHashMap() throws Exception {
ConcurrentHashMap<Long, String> map = new ConcurrentHashMap<>(N, 0.66f, 1);
for (long i = 0; i < Iterations; i++) {
for (int j = 0; j < N; j++) {
map.put(i, "value");
}
for (long h = 0; h < ReadIterations; h++) {
for (int j = 0; j < N; j++) {
map.get(i);
}
}
for (int j = 0; j < N; j++) {
map.remove(i);
}
}
}
void benchHashMap() throws Exception {
HashMap<Long, String> map = new HashMap<>(N, 0.66f);
for (long i = 0; i < Iterations; i++) {
for (int j = 0; j < N; j++) {
map.put(i, "value");
}
for (long h = 0; h < ReadIterations; h++) {
for (int j = 0; j < N; j++) {
map.get(i);
}
}
for (int j = 0; j < N; j++) {
map.remove(i);
}
}
}
public static void main(String[] args) throws Exception {
ConcurrentLongHashMapTest t = new ConcurrentLongHashMapTest();
long start = System.nanoTime();
// t.benchHashMap();
long end = System.nanoTime();
System.out.println("HM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
start = System.nanoTime();
t.benchConcurrentHashMap();
end = System.nanoTime();
System.out.println("CHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
start = System.nanoTime();
// t.benchConcurrentLongHashMap();
end = System.nanoTime();
System.out.println("CLHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
}
}

View File

@ -0,0 +1,249 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ConcurrentLongHashSetTest {
@Test
public void simpleInsertions() {
ConcurrentLongHashSet set = new ConcurrentLongHashSet(16);
assertTrue(set.isEmpty());
assertTrue(set.add(1));
assertFalse(set.isEmpty());
assertTrue(set.add(2));
assertTrue(set.add(3));
assertEquals(set.size(), 3);
assertTrue(set.contains(1));
assertEquals(set.size(), 3);
assertTrue(set.remove(1));
assertEquals(set.size(), 2);
assertFalse(set.contains(1));
assertFalse(set.contains(5));
assertEquals(set.size(), 2);
assertTrue(set.add(1));
assertEquals(set.size(), 3);
assertFalse(set.add(1));
assertEquals(set.size(), 3);
}
@Test
public void testRemove() {
ConcurrentLongHashSet set = new ConcurrentLongHashSet();
assertTrue(set.isEmpty());
assertTrue(set.add(1));
assertFalse(set.isEmpty());
assertFalse(set.remove(0));
assertFalse(set.isEmpty());
assertTrue(set.remove(1));
assertTrue(set.isEmpty());
}
@Test
public void testRehashing() {
int n = 16;
ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
assertEquals(set.capacity(), n);
assertEquals(set.size(), 0);
for (int i = 0; i < n; i++) {
set.add(i);
}
assertEquals(set.capacity(), 2 * n);
assertEquals(set.size(), n);
}
@Test
public void testRehashingWithDeletes() {
int n = 16;
ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
assertEquals(set.capacity(), n);
assertEquals(set.size(), 0);
for (int i = 0; i < n / 2; i++) {
set.add(i);
}
for (int i = 0; i < n / 2; i++) {
set.remove(i);
}
for (int i = n; i < (2 * n); i++) {
set.add(i);
}
assertEquals(set.capacity(), 2 * n);
assertEquals(set.size(), n);
}
@Test
public void concurrentInsertions() throws Throwable {
ConcurrentLongHashSet set = new ConcurrentLongHashSet();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
final int N = 100_000;
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
final int threadIdx = i;
futures.add(executor.submit(() -> {
Random random = new Random();
for (int j = 0; j < N; j++) {
long key = Math.abs(random.nextLong());
// Ensure keys are unique
key -= key % (threadIdx + 1);
set.add(key);
}
}));
}
for (Future<?> future : futures) {
future.get();
}
assertEquals(set.size(), N * nThreads);
executor.shutdown();
}
@Test
public void concurrentInsertionsAndReads() throws Throwable {
ConcurrentLongHashSet map = new ConcurrentLongHashSet();
ExecutorService executor = Executors.newCachedThreadPool();
final int nThreads = 16;
final int N = 100_000;
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
final int threadIdx = i;
futures.add(executor.submit(() -> {
Random random = new Random();
for (int j = 0; j < N; j++) {
long key = Math.abs(random.nextLong());
// Ensure keys are unique
key -= key % (threadIdx + 1);
map.add(key);
}
}));
}
for (Future<?> future : futures) {
future.get();
}
assertEquals(map.size(), N * nThreads);
executor.shutdown();
}
@Test
public void testIteration() {
ConcurrentLongHashSet set = new ConcurrentLongHashSet();
assertEquals(set.items(), Collections.emptySet());
set.add(0L);
assertEquals(set.items(), Sets.newHashSet(0L));
set.remove(0L);
assertEquals(set.items(), Collections.emptySet());
set.add(0L);
set.add(1L);
set.add(2L);
List<Long> values = Lists.newArrayList(set.items());
Collections.sort(values);
assertEquals(values, Lists.newArrayList(0L, 1L, 2L));
set.clear();
assertTrue(set.isEmpty());
}
@Test
public void testHashConflictWithDeletion() {
final int Buckets = 16;
ConcurrentLongHashSet set = new ConcurrentLongHashSet(Buckets, 1);
// Pick 2 keys that fall into the same bucket
long key1 = 1;
long key2 = 27;
int bucket1 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key1), Buckets);
int bucket2 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key2), Buckets);
assertEquals(bucket1, bucket2);
assertTrue(set.add(key1));
assertTrue(set.add(key2));
assertEquals(set.size(), 2);
assertTrue(set.remove(key1));
assertEquals(set.size(), 1);
assertTrue(set.add(key1));
assertEquals(set.size(), 2);
assertTrue(set.remove(key1));
assertEquals(set.size(), 1);
assertFalse(set.add(key2));
assertTrue(set.contains(key2));
assertEquals(set.size(), 1);
assertTrue(set.remove(key2));
assertTrue(set.isEmpty());
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.journal.impl;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@ -30,7 +29,7 @@ import org.apache.activemq.artemis.core.journal.EncoderPersister;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
/** /**
* Super class for Journal maintenances such as clean up and Compactor * Super class for Journal maintenances such as clean up and Compactor
@ -56,7 +55,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
private ActiveMQBuffer writingChannel; private ActiveMQBuffer writingChannel;
private final Set<Long> recordsSnapshot = new ConcurrentHashSet<>(); private final ConcurrentLongHashSet recordsSnapshot;
protected final List<JournalFile> newDataFiles = new ArrayList<>(); protected final List<JournalFile> newDataFiles = new ArrayList<>();
@ -67,14 +66,14 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory, protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
final JournalImpl journal, final JournalImpl journal,
final JournalFilesRepository filesRepository, final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot, final ConcurrentLongHashSet recordsSnapshot,
final long nextOrderingID) { final long nextOrderingID) {
super(); super();
this.journal = journal; this.journal = journal;
this.filesRepository = filesRepository; this.filesRepository = filesRepository;
this.fileFactory = fileFactory; this.fileFactory = fileFactory;
this.nextOrderingID = nextOrderingID; this.nextOrderingID = nextOrderingID;
this.recordsSnapshot.addAll(recordsSnapshot); this.recordsSnapshot = recordsSnapshot;
} }
// Public -------------------------------------------------------- // Public --------------------------------------------------------

View File

@ -18,8 +18,6 @@ package org.apache.activemq.artemis.core.journal.impl;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -43,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRecordTX; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRecordTX;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
/** /**
* Journal used at a replicating backup server during the synchronization of data with the 'live' * Journal used at a replicating backup server during the synchronization of data with the 'live'
@ -54,7 +53,7 @@ public final class FileWrapperJournal extends JournalBase {
private final ReentrantLock lockAppend = new ReentrantLock(); private final ReentrantLock lockAppend = new ReentrantLock();
private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<>(); private final ConcurrentLongHashMap<AtomicInteger> transactions = new ConcurrentLongHashMap<>();
private final JournalImpl journal; private final JournalImpl journal;
protected volatile JournalFile currentFile; protected volatile JournalFile currentFile;
@ -181,7 +180,7 @@ public final class FileWrapperJournal extends JournalBase {
IOCompletion callback, IOCompletion callback,
boolean lineUpContext) throws Exception { boolean lineUpContext) throws Exception {
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null); JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
AtomicInteger value = transactions.remove(Long.valueOf(txID)); AtomicInteger value = transactions.remove(txID);
if (value != null) { if (value != null) {
commitRecord.setNumberOfRecords(value.get()); commitRecord.setNumberOfRecords(value.get());
} }
@ -195,7 +194,7 @@ public final class FileWrapperJournal extends JournalBase {
boolean sync, boolean sync,
IOCompletion callback) throws Exception { IOCompletion callback) throws Exception {
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData); JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
AtomicInteger value = transactions.get(Long.valueOf(txID)); AtomicInteger value = transactions.get(txID);
if (value != null) { if (value != null) {
prepareRecord.setNumberOfRecords(value.get()); prepareRecord.setNumberOfRecords(value.get());
} }
@ -204,7 +203,7 @@ public final class FileWrapperJournal extends JournalBase {
private int count(long txID) throws ActiveMQException { private int count(long txID) throws ActiveMQException {
AtomicInteger defaultValue = new AtomicInteger(1); AtomicInteger defaultValue = new AtomicInteger(1);
AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue); AtomicInteger count = transactions.putIfAbsent(txID, defaultValue);
if (count != null) { if (count != null) {
return count.incrementAndGet(); return count.incrementAndGet();
} }
@ -219,7 +218,7 @@ public final class FileWrapperJournal extends JournalBase {
@Override @Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception { public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID); JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
AtomicInteger value = transactions.remove(Long.valueOf(txID)); AtomicInteger value = transactions.remove(txID);
if (value != null) { if (value != null) {
rollbackRecord.setNumberOfRecords(value.get()); rollbackRecord.setNumberOfRecords(value.get());
} }

View File

@ -18,12 +18,8 @@ package org.apache.activemq.artemis.core.journal.impl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@ -41,6 +37,8 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalDeleteRec
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
public class JournalCompactor extends AbstractJournalUpdateTask implements JournalRecordProvider { public class JournalCompactor extends AbstractJournalUpdateTask implements JournalRecordProvider {
@ -53,11 +51,11 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
private static final short COMPACT_SPLIT_LINE = 2; private static final short COMPACT_SPLIT_LINE = 2;
// Snapshot of transactions that were pending when the compactor started // Snapshot of transactions that were pending when the compactor started
private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<>(); private final ConcurrentLongHashMap<PendingTransaction> pendingTransactions = new ConcurrentLongHashMap<>();
private final Map<Long, JournalRecord> newRecords = new HashMap<>(); private final ConcurrentLongHashMap<JournalRecord> newRecords = new ConcurrentLongHashMap<>();
private final Map<Long, JournalTransaction> newTransactions = new HashMap<>(); private final ConcurrentLongHashMap<JournalTransaction> newTransactions = new ConcurrentLongHashMap<>();
/** /**
* Commands that happened during compacting * Commands that happened during compacting
@ -120,18 +118,18 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
return newDataFiles; return newDataFiles;
} }
public Map<Long, JournalRecord> getNewRecords() { public ConcurrentLongHashMap<JournalRecord> getNewRecords() {
return newRecords; return newRecords;
} }
public Map<Long, JournalTransaction> getNewTransactions() { public ConcurrentLongHashMap<JournalTransaction> getNewTransactions() {
return newTransactions; return newTransactions;
} }
public JournalCompactor(final SequentialFileFactory fileFactory, public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal, final JournalImpl journal,
final JournalFilesRepository filesRepository, final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot, final ConcurrentLongHashSet recordsSnapshot,
final long firstFileID) { final long firstFileID) {
super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID); super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID);
} }
@ -628,7 +626,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
} }
@Override @Override
public Map<Long, JournalRecord> getRecords() { public ConcurrentLongHashMap<JournalRecord> getRecords() {
return newRecords; return newRecords;
} }

View File

@ -31,8 +31,6 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
@ -74,6 +72,8 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
@ -168,12 +168,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private final JournalFilesRepository filesRepository; private final JournalFilesRepository filesRepository;
// Compacting may replace this structure // Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>(); private final ConcurrentLongHashMap<JournalRecord> records = new ConcurrentLongHashMap<>();
private final Set<Long> pendingRecords = new ConcurrentHashSet<>(); private final ConcurrentLongHashSet pendingRecords = new ConcurrentLongHashSet();
// Compacting may replace this structure // Compacting may replace this structure
private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<>(); private final ConcurrentLongHashMap<JournalTransaction> transactions = new ConcurrentLongHashMap<>();
// This will be set only while the JournalCompactor is being executed // This will be set only while the JournalCompactor is being executed
private volatile JournalCompactor compactor; private volatile JournalCompactor compactor;
@ -345,7 +345,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
@Override @Override
public Map<Long, JournalRecord> getRecords() { public ConcurrentLongHashMap<JournalRecord> getRecords() {
return records; return records;
} }
@ -1487,12 +1487,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return; return;
} }
compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keySet(), dataFilesToProcess.get(0).getFileID()); compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID());
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet()) { transactions.forEach((id, pendingTransaction) -> {
compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray()); compactor.addPendingTransaction(id, pendingTransaction.getPositiveArray());
entry.getValue().setCompacting(); pendingTransaction.setCompacting();
} });
// We will calculate the new records during compacting, what will take the position the records will take // We will calculate the new records during compacting, what will take the position the records will take
// after compacting // after compacting
@ -1540,9 +1540,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
newDatafiles = localCompactor.getNewDataFiles(); newDatafiles = localCompactor.getNewDataFiles();
// Restore newRecords created during compacting // Restore newRecords created during compacting
for (Map.Entry<Long, JournalRecord> newRecordEntry : localCompactor.getNewRecords().entrySet()) { localCompactor.getNewRecords().forEach((id, newRecord) -> {
records.put(newRecordEntry.getKey(), newRecordEntry.getValue()); records.put(id, newRecord);
} });
// Restore compacted dataFiles // Restore compacted dataFiles
for (int i = newDatafiles.size() - 1; i >= 0; i--) { for (int i = newDatafiles.size() - 1; i >= 0; i--) {
@ -1559,9 +1559,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Replay pending commands (including updates, deletes and commits) // Replay pending commands (including updates, deletes and commits)
for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) { localCompactor.getNewTransactions().forEach((id, newTransaction) -> newTransaction.replaceRecordProvider(this));
newTransaction.replaceRecordProvider(this);
}
localCompactor.replayPendingCommands(); localCompactor.replayPendingCommands();
@ -1569,7 +1567,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// This has to be done after the replay pending commands, as we need to delete commits // This has to be done after the replay pending commands, as we need to delete commits
// that happened during the compacting // that happened during the compacting
for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values()) { localCompactor.getNewTransactions().forEach((id, newTransaction) -> {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Merging pending transaction " + newTransaction + " after compacting the journal"); logger.trace("Merging pending transaction " + newTransaction + " after compacting the journal");
} }
@ -1579,7 +1577,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} else { } else {
ActiveMQJournalLogger.LOGGER.compactMergeError(newTransaction.getId()); ActiveMQJournalLogger.LOGGER.compactMergeError(newTransaction.getId());
} }
} });
} finally { } finally {
journalLock.writeLock().unlock(); journalLock.writeLock().unlock();
} }

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.journal.impl; package org.apache.activemq.artemis.core.journal.impl;
import java.util.Map; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
/** /**
* This is an interface used only internally. * This is an interface used only internally.
@ -29,5 +29,5 @@ public interface JournalRecordProvider {
JournalCompactor getCompactor(); JournalCompactor getCompactor();
Map<Long, JournalRecord> getRecords(); ConcurrentLongHashMap<JournalRecord> getRecords();
} }

View File

@ -371,7 +371,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
* @throws Exception * @throws Exception
*/ */
private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception { private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception {
Long id = Long.valueOf(msg.getId()); long id = msg.getId();
byte[] data = msg.getData(); byte[] data = msg.getData();
SequentialFile channel1; SequentialFile channel1;
switch (msg.getFileType()) { switch (msg.getFileType()) {

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.stress.journal;
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -238,12 +237,15 @@ public class JournalCleanupCompactStressTest extends ActiveMQTestBase {
reloadJournal(); reloadJournal();
Collection<Long> records = journal.getRecords().keySet();
System.out.println("Deleting everything!"); System.out.println("Deleting everything!");
for (Long delInfo : records) {
journal.appendDeleteRecord(delInfo, false); journal.getRecords().forEach((id, record) -> {
} try {
journal.appendDeleteRecord(id, false);
} catch (Exception e) {
new RuntimeException(e);
}
});
journal.forceMoveNextFile(); journal.forceMoveNextFile();