Clebert Suconic 2018-02-26 17:38:36 -05:00
commit bfb6500c69
2 changed files with 28 additions and 12 deletions

ConcurrentLongHashMap.java

@@ -20,17 +20,17 @@
  */
 package org.apache.activemq.artemis.utils.collections;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.StampedLock;
 import java.util.function.LongFunction;
 
 import com.google.common.collect.Lists;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Map from long to an Object.
  *
@@ -81,6 +81,9 @@ public class ConcurrentLongHashMap<V> {
    public int size() {
       int size = 0;
       for (Section<V> s : sections) {
+         //read-acquire s.size that was write-released by s.unlockWrite
+         s.tryOptimisticRead();
+         //a stale value won't hurt: anyway it's subject to concurrent modifications
          size += s.size;
       }
       return size;
@@ -104,6 +107,9 @@ public class ConcurrentLongHashMap<V> {
    public boolean isEmpty() {
       for (Section<V> s : sections) {
+         //read-acquire s.size that was write-released by s.unlockWrite
+         s.tryOptimisticRead();
+         //a stale value won't hurt: anyway it's subject to concurrent modifications
          if (s.size != 0) {
            return false;
         }
@@ -196,11 +202,13 @@ public class ConcurrentLongHashMap<V> {
    // A section is a portion of the hash map that is covered by a single
    @SuppressWarnings("serial")
    private static final class Section<V> extends StampedLock {
+      private static final AtomicIntegerFieldUpdater<Section> CAPACITY_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Section.class, "capacity");
+
       private long[] keys;
       private V[] values;
 
       private volatile int capacity;
-      private volatile int size;
+      private int size;
       private int usedBuckets;
       private int resizeThreshold;
@@ -460,8 +468,8 @@ public class ConcurrentLongHashMap<V> {
          keys = newKeys;
          values = newValues;
          usedBuckets = size;
-         capacity = newCapacity;
-         resizeThreshold = (int) (capacity * MapFillFactor);
+         CAPACITY_UPDATER.lazySet(this, newCapacity);
+         resizeThreshold = (int) (newCapacity * MapFillFactor);
       }
 
       private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
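The hunks above stop relying on a volatile size field: Section.size becomes a plain int, and size()/isEmpty() call tryOptimisticRead() before reading it. The commit's own comments spell out the assumption: tryOptimisticRead() performs a volatile-style read of the StampedLock state, so when it observes the state published by a writer's unlockWrite() (a write-release), the plain read of size that follows is guaranteed to see that writer's update; when it races with a writer, the value is merely stale, which these methods explicitly tolerate. A minimal sketch of that idiom, not part of the commit (class and method names are made up for illustration):

import java.util.concurrent.locks.StampedLock;

// Illustrative only: a plain (non-volatile) counter published via StampedLock,
// mirroring how Section.size is read after this change.
final class OptimisticCounter extends StampedLock {

   private int value; // deliberately not volatile, like Section.size

   void increment() {
      long stamp = writeLock();
      try {
         value++;             // plain write under the write lock
      } finally {
         unlockWrite(stamp);  // write-release: publishes the plain write above
      }
   }

   int read() {
      // Read-acquire of the lock state (the assumption the commit's comments make):
      // if this volatile read sees the state stored by unlockWrite(), the plain read
      // below sees the new value too; a racing reader may get a stale value.
      tryOptimisticRead();
      return value;
   }
}

Presumably the gain is mostly on the write side: increments of size inside the locked sections are now plain stores, published once by unlockWrite() instead of each being a volatile store.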

ConcurrentLongHashSet.java

@@ -20,13 +20,14 @@
  */
 package org.apache.activemq.artemis.utils.collections;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.StampedLock;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 /**
  * Concurrent hash set for primitive longs
  *
@@ -77,6 +78,9 @@ public class ConcurrentLongHashSet {
    public int size() {
       int size = 0;
       for (Section s : sections) {
+         //read-acquire s.size that was write-released by s.unlockWrite
+         s.tryOptimisticRead();
+         //a stale value won't hurt: anyway it's subject to concurrent modifications
          size += s.size;
       }
       return size;
@@ -92,6 +96,9 @@ public class ConcurrentLongHashSet {
    public boolean isEmpty() {
       for (Section s : sections) {
+         //read-acquire s.size that was write-released by s.unlockWrite
+         s.tryOptimisticRead();
+         //a stale value won't hurt: anyway it's subject to concurrent modifications
          if (s.size != 0) {
            return false;
         }
@@ -162,11 +169,12 @@ public class ConcurrentLongHashSet {
    // A section is a portion of the hash map that is covered by a single
    @SuppressWarnings("serial")
    private static final class Section extends StampedLock {
+      private static final AtomicIntegerFieldUpdater<Section> CAPACITY_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Section.class, "capacity");
       // Keys and values are stored interleaved in the table array
       private long[] table;
 
       private volatile int capacity;
-      private volatile int size;
+      private int size;
       private int usedBuckets;
       private int resizeThreshold;
@@ -376,8 +384,8 @@ public class ConcurrentLongHashSet {
          table = newTable;
          usedBuckets = size;
-         capacity = newCapacity;
-         resizeThreshold = (int) (capacity * SetFillFactor);
+         CAPACITY_UPDATER.lazySet(this, newCapacity);
+         resizeThreshold = (int) (newCapacity * SetFillFactor);
       }
 
       private static void insertKeyValueNoLock(long[] table, int capacity, long item) {
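In both rehash hunks the plain volatile store to capacity is replaced by an ordered store through an AtomicIntegerFieldUpdater. lazySet keeps the store ordered with respect to earlier writes but omits the full fence a normal volatile write pays, which is acceptable here because the store happens while the write lock is held and is published to readers by the subsequent unlockWrite(); resizeThreshold is also computed from the local newCapacity instead of re-reading the volatile field. A rough sketch of the idiom, with made-up names (not the Artemis Section class):

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Illustrative only: ordered store to a volatile field during a resize step.
final class Resizable {

   private static final AtomicIntegerFieldUpdater<Resizable> CAPACITY_UPDATER =
         AtomicIntegerFieldUpdater.newUpdater(Resizable.class, "capacity");

   // Must stay volatile: the field updater requires it, and unguarded readers
   // still observe a reasonably fresh value.
   private volatile int capacity;
   private int resizeThreshold;

   void resize(int newCapacity, float fillFactor) {
      // Ordered ("lazy") store: ordered against earlier writes, but without the
      // trailing full fence that a plain "capacity = newCapacity" would emit.
      CAPACITY_UPDATER.lazySet(this, newCapacity);
      // Reuse the local value rather than re-reading the volatile field.
      resizeThreshold = (int) (newCapacity * fillFactor);
   }
}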