From 985559d0869d5eb7b0c99c253fd8f2287afbded7 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Fri, 4 Dec 2020 05:21:40 +0100 Subject: [PATCH 1/3] ARTEMIS-3016 Adding JMH benchmark on duplicate ids cache --- .../jmh/DuplicateIDCacheBenchmark.java | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java new file mode 100644 index 0000000000..21505bf3d8 --- /dev/null +++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.performance.jmh; + +import java.util.SplittableRandom; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; +import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Benchmark) +@Fork(2) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 8, time = 1) +public class DuplicateIDCacheBenchmark { + + @Param({"20000"}) + private int size; + @Param({"false", "true"}) + private boolean persist; + + private DuplicateIDCache cache; + + private byte[][] ids; + private int idsMask; + private int missingIdsMask; + private long nextId; + private byte[][] randomEvictedIds; + + @Setup + public void init() throws Exception { + cache = new DuplicateIDCacheImpl(SimpleString.toSimpleString("benchmark"), size, new NullStorageManager(), persist); + final int idSize = findNextHigherPowerOf2(size); + idsMask = idSize - 1; + nextId = 0; + ids = new byte[idSize][]; + // fill the cache + for (int i = 0; i < size; i++) { + final byte[] id = RandomUtil.randomBytes(); + ids[i] = id; + cache.addToCache(id, null, true); + } + // evict the first (idSize - size) elements on the ids array. + // Given that being a FIFO cache isn't a stable contract it's going to validate it too. + final int evicted = idSize - size; + for (int i = 0; i < evicted; i++) { + final byte[] id = RandomUtil.randomBytes(); + ids[size + i] = id; + cache.addToCache(id, null, true); + // check correctness of eviction policy + if (cache.contains(ids[i])) { + throw new AssertionError("This cache isn't using anymore a FIFO eviction strategy or its real capacity is > " + size); + } + } + // always use the same seed! + SplittableRandom random = new SplittableRandom(0); + // set it big enough to trick branch predictors + final int evictedIdsLength = findNextPowerOf2(Math.max(1024, evicted)); + missingIdsMask = evictedIdsLength - 1; + randomEvictedIds = new byte[evictedIdsLength][]; + for (int i = 0; i < evictedIdsLength; i++) { + final int id = random.nextInt(0, evicted); + randomEvictedIds[i] = ids[id]; + // check correctness of eviction policy + if (cache.contains(ids[id])) { + throw new AssertionError("This cache isn't using anymore a FIFO eviction strategy"); + } + } + } + + // it isn't checking what's the max power of 2 number nor if size > 0 + private static int findNextHigherPowerOf2(int size) { + final int nextPow2 = findNextPowerOf2(size); + if (nextPow2 > size) { + return nextPow2; + } + return nextPow2 * 2; + } + + private static int findNextPowerOf2(int size) { + return 1 << (32 - Integer.numberOfLeadingZeros(size - 1)); + } + + private byte[] nextId() { + final long seq = nextId; + final int index = (int) (seq & idsMask); + nextId = seq + 1; + return ids[index]; + } + + private byte[] nextMissingId() { + final long seq = nextId; + final int index = (int) (seq & missingIdsMask); + nextId = seq + 1; + return randomEvictedIds[index]; + } + + @Benchmark + public boolean atomicVerify() throws Exception { + return cache.atomicVerify(nextId(), null); + } + + @Benchmark + public boolean containsMissingId() { + return cache.contains(nextMissingId()); + } + + @TearDown + public void clear() throws Exception { + cache.clear(); + } + +} From b3b5d4893cb1f59b6c0093fd72d9daa7c7cc1d91 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Mon, 30 Nov 2020 16:12:54 +0100 Subject: [PATCH 2/3] ARTEMIS-3016 Reduce DuplicateIDCache memory footprint --- .../artemis/api/core/ObjLongPair.java | 99 +++++++++++ .../activemq/artemis/utils/ByteUtil.java | 46 +++++ .../activemq/artemis/utils/ByteUtilTest.java | 48 +++++- .../postoffice/impl/DuplicateIDCacheImpl.java | 161 +++++++++++------- .../jmh/ByteArrayHashCodeBenchamark.java | 97 +++++++++++ 5 files changed, 385 insertions(+), 66 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ObjLongPair.java create mode 100644 tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/ByteArrayHashCodeBenchamark.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ObjLongPair.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ObjLongPair.java new file mode 100644 index 0000000000..448d008ef8 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ObjLongPair.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +import java.io.Serializable; +import java.util.Objects; + +/** + * A Pair is a holder for 1 object and a >= 0 primitive long value. + *

+ * This is a utility class. + */ +public class ObjLongPair implements Serializable { + + private static final long serialVersionUID = 7749478219139339853L; + + public static final long NIL = -1; + + public ObjLongPair(final A a, final long b) { + this.a = a; + this.b = b; + if (b < 0 && b != NIL) { + throw new IllegalStateException("b must be >= 0 or == NIL = " + NIL); + } + } + + public ObjLongPair(final A a) { + this.a = a; + this.b = NIL; + } + + private A a; + private long b; + + @Override + public int hashCode() { + if (a == null && b == NIL) { + return super.hashCode(); + } + // it's ok to use b to compute hashCode although NIL + return (a == null ? 0 : a.hashCode()) + 37 * Long.hashCode(b); + } + + @Override + public boolean equals(final Object other) { + if (other == this) { + return true; + } + + if (other instanceof ObjLongPair == false) { + return false; + } + + ObjLongPair pother = (ObjLongPair) other; + + return (Objects.equals(pother.a, a)) && (pother.b == b); + } + + @Override + public String toString() { + return "ObjLongPair[a=" + a + ", b=" + (b == NIL ? "NIL" : b) + "]"; + } + + public void setA(A a) { + if (this.a == a) { + return; + } + this.a = a; + } + + public A getA() { + return a; + } + + public void setB(long b) { + if (b < 0 && b != NIL) { + throw new IllegalStateException("b must be >= 0 or == NIL = " + NIL); + } + this.b = b; + } + + public long getB() { + return b; + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index 8bd07b07c4..94078153e0 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -275,6 +275,52 @@ public class ByteUtil { } } + public static int hashCode(byte[] bytes) { + if (PlatformDependent.hasUnsafe() && PlatformDependent.isUnaligned()) { + return unsafeHashCode(bytes); + } + return Arrays.hashCode(bytes); + } + + /** + * This hash code computation is borrowed by {@link io.netty.buffer.ByteBufUtil#hashCode(ByteBuf)}. + */ + private static int unsafeHashCode(byte[] bytes) { + if (bytes == null) { + return 0; + } + final int len = bytes.length; + int hashCode = 1; + final int intCount = len >>> 2; + int arrayIndex = 0; + // reading in batch both help hash code computation data dependencies and save memory bandwidth + for (int i = 0; i < intCount; i++) { + hashCode = 31 * hashCode + PlatformDependent.getInt(bytes, arrayIndex); + arrayIndex += Integer.BYTES; + } + final byte remaining = (byte) (len & 3); + if (remaining > 0) { + hashCode = unsafeUnrolledHashCode(bytes, arrayIndex, remaining, hashCode); + } + return hashCode == 0 ? 1 : hashCode; + } + + private static int unsafeUnrolledHashCode(byte[] bytes, int index, int bytesCount, int h) { + // there is still the hash data dependency but is more friendly + // then a plain loop, given that we know no loop is needed here + assert bytesCount > 0 && bytesCount < 4; + h = 31 * h + PlatformDependent.getByte(bytes, index); + if (bytesCount == 1) { + return h; + } + h = 31 * h + PlatformDependent.getByte(bytes, index + 1); + if (bytesCount == 2) { + return h; + } + h = 31 * h + PlatformDependent.getByte(bytes, index + 2); + return h; + } + public static boolean equals(final byte[] left, final byte[] right) { return equals(left, right, 0, right.length); } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java index f95d22733c..ced2062784 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java @@ -17,17 +17,23 @@ package org.apache.activemq.artemis.utils; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.ReadOnlyBufferException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; import io.netty.util.internal.PlatformDependent; import org.jboss.logging.Logger; import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.junit.Test; public class ByteUtilTest { @@ -83,6 +89,46 @@ public class ByteUtilTest { } } + @Test + public void testUnsafeUnalignedByteArrayHashCode() { + Assume.assumeTrue(PlatformDependent.hasUnsafe()); + Assume.assumeTrue(PlatformDependent.isUnaligned()); + Map map = new LinkedHashMap<>(); + map.put(new byte[0], 1); + map.put(new byte[]{1}, 32); + map.put(new byte[]{2}, 33); + map.put(new byte[]{0, 1}, 962); + map.put(new byte[]{1, 2}, 994); + if (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) { + map.put(new byte[]{0, 1, 2, 3, 4, 5}, 63504931); + map.put(new byte[]{6, 7, 8, 9, 0, 1}, -1603953111); + map.put(new byte[]{-1, -1, -1, (byte) 0xE1}, 1); + } else { + map.put(new byte[]{0, 1, 2, 3, 4, 5}, 1250309600); + map.put(new byte[]{6, 7, 8, 9, 0, 1}, -417148442); + map.put(new byte[]{-1, -1, -1, (byte) 0xE1}, -503316450); + } + for (Map.Entry e : map.entrySet()) { + assertEquals("input = " + Arrays.toString(e.getKey()), e.getValue().intValue(), ByteUtil.hashCode(e.getKey())); + } + } + + @Test + public void testNoUnsafeAlignedByteArrayHashCode() { + Assume.assumeFalse(PlatformDependent.hasUnsafe()); + Assume.assumeFalse(PlatformDependent.isUnaligned()); + ArrayList inputs = new ArrayList<>(); + inputs.add(new byte[0]); + inputs.add(new byte[]{1}); + inputs.add(new byte[]{2}); + inputs.add(new byte[]{0, 1}); + inputs.add(new byte[]{1, 2}); + inputs.add(new byte[]{0, 1, 2, 3, 4, 5}); + inputs.add(new byte[]{6, 7, 8, 9, 0, 1}); + inputs.add(new byte[]{-1, -1, -1, (byte) 0xE1}); + inputs.forEach(input -> assertEquals("input = " + Arrays.toString(input), Arrays.hashCode(input), ByteUtil.hashCode(input))); + } + @Test public void testTextBytesToLongBytesNegative() { try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java index 7f1604530d..4dfc765026 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java @@ -16,12 +16,15 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; +import org.apache.activemq.artemis.api.core.ObjLongPair; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -33,6 +36,8 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract import org.apache.activemq.artemis.utils.ByteUtil; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL; + /** * A DuplicateIDCacheImpl * @@ -41,6 +46,23 @@ import org.jboss.logging.Logger; public class DuplicateIDCacheImpl implements DuplicateIDCache { private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class); + // we're not interested into safe publication here: we need to scale, be fast and save "some" GC to happen + private static WeakReference INDEXES = null; + + private static Integer[] boxedInts(int size) { + final Reference indexesRef = INDEXES; + final Integer[] indexes = indexesRef == null ? null : indexesRef.get(); + if (indexes != null && size <= indexes.length) { + return indexes; + } + final int newSize = size + (indexes == null ? 0 : size / 2); + final Integer[] newIndexes = new Integer[newSize]; + if (indexes != null) { + System.arraycopy(indexes, 0, newIndexes, 0, indexes.length); + } + INDEXES = new WeakReference<>(newIndexes); + return newIndexes; + } // ByteHolder, position private final Map cache = new ConcurrentHashMap<>(); @@ -49,7 +71,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { // Note - deliberately typed as ArrayList since we want to ensure fast indexed // based array access - private final ArrayList> ids; + private final ArrayList> ids; + + private final Integer[] cachedBoxedInts; private int pos; @@ -69,11 +93,24 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { ids = new ArrayList<>(size); + cachedBoxedInts = boxedInts(size); + this.storageManager = storageManager; this.persist = persist; } + // best effort caching mechanism + private Integer boxed(int index) { + Integer boxedInt = this.cachedBoxedInts[index]; + if (boxedInt == null) { + boxedInt = index; + cachedBoxedInts[index] = boxedInt; + } + assert boxedInt != null; + return boxedInt; + } + @Override public void load(final List> theIds) throws Exception { long txID = -1; @@ -98,9 +135,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } else { ByteArrayHolder bah = new ByteArrayHolder(id.getA()); - Pair pair = new Pair<>(bah, id.getB()); + ObjLongPair pair = new ObjLongPair<>(bah, id.getB()); - cache.put(bah, ids.size()); + cache.put(bah, boxed(ids.size())); ids.add(pair); if (logger.isTraceEnabled()) { @@ -133,7 +170,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { Integer posUsed = cache.remove(bah); if (posUsed != null) { - Pair id; + ObjLongPair id; synchronized (this) { id = ids.get(posUsed.intValue()); @@ -144,7 +181,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { if (logger.isTraceEnabled()) { logger.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB())); } - id.setB(null); + id.setB(NIL); } } } @@ -161,10 +198,16 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { @Override public boolean contains(final byte[] duplID) { - boolean contains = cache.get(new ByteArrayHolder(duplID)) != null; + return contains(new ByteArrayHolder(duplID)); + } - if (contains) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::constains found a duplicate " + describeID(duplID, 0)); + private boolean contains(final ByteArrayHolder duplID) { + boolean contains = cache.containsKey(duplID); + + if (logger.isTraceEnabled()) { + if (contains) { + logger.trace("DuplicateIDCacheImpl(" + this.address + ")::contains found a duplicate " + describeID(duplID.bytes, 0)); + } } return contains; } @@ -181,68 +224,68 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { @Override public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception { - - if (contains(duplID)) { + final ByteArrayHolder holder = new ByteArrayHolder(duplID); + if (contains(holder)) { if (tx != null) { tx.markAsRollbackOnly(new ActiveMQDuplicateIdException()); } return false; - } else { - addToCache(duplID, tx, true); - return true; } - + addToCache(holder, tx, true); + return true; } @Override public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception { - long recordID = -1; + addToCache(new ByteArrayHolder(duplID), tx, instantAdd); + } + private synchronized void addToCache(final ByteArrayHolder holder, + final Transaction tx, + boolean instantAdd) throws Exception { + long recordID = -1; if (tx == null) { if (persist) { recordID = storageManager.generateID(); - storageManager.storeDuplicateID(address, duplID, recordID); + storageManager.storeDuplicateID(address, holder.bytes, recordID); } - addToCacheInMemory(duplID, recordID); + addToCacheInMemory(holder, recordID); } else { if (persist) { recordID = storageManager.generateID(); - storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID); + storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID); tx.setContainsPersistent(); } if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx); + logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(holder.bytes, recordID) + ", tx=" + tx); } - if (instantAdd) { - tx.addOperation(new AddDuplicateIDOperation(duplID, recordID, false)); + tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false)); } else { // For a tx, it's important that the entry is not added to the cache until commit // since if the client fails then resends them tx we don't want it to get rejected - tx.afterStore(new AddDuplicateIDOperation(duplID, recordID, true)); + tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true)); } } } @Override public void load(final Transaction tx, final byte[] duplID) { - tx.addOperation(new AddDuplicateIDOperation(duplID, tx.getID(), true)); + tx.addOperation(new AddDuplicateIDOperation(new ByteArrayHolder(duplID), tx.getID(), true)); } - private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) { + private synchronized void addToCacheInMemory(final ByteArrayHolder holder, final long recordID) { if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(duplID, recordID)); + logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(holder.bytes, recordID)); } - ByteArrayHolder holder = new ByteArrayHolder(duplID); + cache.put(holder, boxed(pos)); - cache.put(holder, pos); - - Pair id; + ObjLongPair id; if (pos < ids.size()) { // Need fast array style access here -hence ArrayList typing @@ -260,7 +303,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { // Note we can't use update since journal update doesn't let older records get // reclaimed - if (id.getB() != null) { + if (id.getB() != NIL) { try { storageManager.deleteDuplicateID(id.getB()); } catch (Exception e) { @@ -273,15 +316,14 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { // The recordID could be negative if the duplicateCache is configured to not persist, // -1 would mean null on this case - id.setB(recordID >= 0 ? recordID : null); + id.setB(recordID >= 0 ? recordID : NIL); if (logger.isTraceEnabled()) { logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB())); } - holder.pos = pos; } else { - id = new Pair<>(holder, recordID >= 0 ? recordID : null); + id = new ObjLongPair<>(holder, recordID >= 0 ? recordID : NIL); if (logger.isTraceEnabled()) { logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB())); @@ -289,7 +331,6 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { ids.add(id); - holder.pos = pos; } if (pos++ == cacheSize - 1) { @@ -301,10 +342,12 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { public void clear() throws Exception { logger.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data"); synchronized (this) { - if (ids.size() > 0 && persist) { + final int idsSize = ids.size(); + if (idsSize > 0 && persist) { long tx = storageManager.generateID(); - for (Pair id : ids) { - if (id != null) { + for (int i = 0; i < idsSize; i++) { + final ObjLongPair id = ids.get(i); + if (id != null && id.getB() != NIL) { storageManager.deleteDuplicateIDTransactional(tx, id.getB()); } } @@ -319,16 +362,18 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { @Override public List> getMap() { - List> list = new ArrayList<>(); - for (Pair id : ids) { - list.add(new Pair<>(id.getA().bytes, id.getB())); + final int idsSize = ids.size(); + List> copy = new ArrayList<>(idsSize); + for (int i = 0; i < idsSize; i++) { + final ObjLongPair id = ids.get(i); + copy.add(new Pair<>(id.getA().bytes, id.getB() == NIL ? null : id.getB())); } - return list; + return copy; } private final class AddDuplicateIDOperation extends TransactionOperationAbstract { - final byte[] duplID; + final ByteArrayHolder holder; final long recordID; @@ -336,15 +381,15 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { private final boolean afterCommit; - AddDuplicateIDOperation(final byte[] duplID, final long recordID, boolean afterCommit) { - this.duplID = duplID; + AddDuplicateIDOperation(final ByteArrayHolder holder, final long recordID, boolean afterCommit) { + this.holder = holder; this.recordID = recordID; this.afterCommit = afterCommit; } private void process() { if (!done) { - addToCacheInMemory(duplID, recordID); + addToCacheInMemory(holder, recordID); done = true; } @@ -372,32 +417,20 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { private static final class ByteArrayHolder { + private final byte[] bytes; + + private int hash; + ByteArrayHolder(final byte[] bytes) { this.bytes = bytes; } - final byte[] bytes; - - int hash; - - int pos; - @Override public boolean equals(final Object other) { if (other instanceof ByteArrayHolder) { ByteArrayHolder s = (ByteArrayHolder) other; - if (bytes.length != s.bytes.length) { - return false; - } - - for (int i = 0; i < bytes.length; i++) { - if (bytes[i] != s.bytes[i]) { - return false; - } - } - - return true; + return ByteUtil.equals(bytes, s.bytes); } else { return false; } @@ -406,9 +439,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { @Override public int hashCode() { if (hash == 0) { - for (byte b : bytes) { - hash = 31 * hash + b; - } + hash = ByteUtil.hashCode(bytes); } return hash; diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/ByteArrayHashCodeBenchamark.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/ByteArrayHashCodeBenchamark.java new file mode 100644 index 0000000000..80b2d5308c --- /dev/null +++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/ByteArrayHashCodeBenchamark.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.performance.jmh; + +import java.util.Arrays; +import java.util.SplittableRandom; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.utils.ByteUtil; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(2) +@Warmup(iterations = 5, time = 200, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 8, time = 200, timeUnit = TimeUnit.MILLISECONDS) +public class ByteArrayHashCodeBenchamark { + + @Param({"7", "8", "36"}) + private int length; + + @Param("0") + private int seed; + + /** + * The higher the less predictable for the CPU branch predictor. + */ + @Param({"4", "10"}) + private int logPermutations; + + @Param({"true"}) + private boolean unsafe; + + private long seq; + private int inputMask; + private byte[][] inputs; + + @Setup + public void init() { + System.setProperty("io.netty.noUnsafe", Boolean.valueOf(!unsafe).toString()); + int inputSize = 1 << logPermutations; + inputs = new byte[inputSize][]; + inputMask = inputs.length - 1; + // splittable random can create repeatable sequence of inputs + SplittableRandom random = new SplittableRandom(seed); + for (int i = 0; i < inputs.length; i++) { + final byte[] bytes = new byte[length]; + for (int b = 0; b < length; b++) { + bytes[b] = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1); + } + inputs[i] = bytes; + } + } + + private byte[] nextInput() { + final long seq = this.seq; + final int index = (int) (seq & inputMask); + this.seq = seq + 1; + return inputs[index]; + } + + @Benchmark + public int artemisHashCode() { + return ByteUtil.hashCode(nextInput()); + } + + @Benchmark + public int arraysHashCode() { + return Arrays.hashCode(nextInput()); + } + +} From 2b5d99bbd1697728da071ecafa585b0b6168979d Mon Sep 17 00:00:00 2001 From: franz1981 Date: Wed, 2 Dec 2020 21:38:11 +0100 Subject: [PATCH 3/3] ARTEMIS-3016 Refactored duplicate ids cache --- .../core/postoffice/DuplicateIDCache.java | 2 +- .../core/postoffice/impl/ByteArray.java | 50 ++ .../postoffice/impl/DuplicateIDCacheImpl.java | 448 ------------------ .../postoffice/impl/DuplicateIDCaches.java | 39 ++ .../impl/InMemoryDuplicateIDCache.java | 276 +++++++++++ .../core/postoffice/impl/IntegerCache.java | 62 +++ .../impl/PersistentDuplicateIDCache.java | 394 +++++++++++++++ .../core/postoffice/impl/PostOfficeImpl.java | 44 +- .../persistence/DuplicateCacheTest.java | 6 +- .../jmh/DuplicateIDCacheBenchmark.java | 6 +- .../impl/DuplicateDetectionUnitTest.java | 9 +- .../server/impl/fakes/FakePostOffice.java | 5 +- 12 files changed, 860 insertions(+), 481 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java index b384896efa..75cc17b7d3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java @@ -40,7 +40,7 @@ public interface DuplicateIDCache { void deleteFromCache(byte[] duplicateID) throws Exception; - void load(List> theIds) throws Exception; + void load(List> ids) throws Exception; void load(Transaction tx, byte[] duplID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java new file mode 100644 index 0000000000..8f2c67e38f --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import org.apache.activemq.artemis.utils.ByteUtil; + +final class ByteArray { + + final byte[] bytes; + + private int hash; + + ByteArray(final byte[] bytes) { + this.bytes = bytes; + } + + @Override + public boolean equals(final Object other) { + if (other instanceof ByteArray) { + ByteArray s = (ByteArray) other; + + return ByteUtil.equals(bytes, s.bytes); + } else { + return false; + } + } + + @Override + public int hashCode() { + if (hash == 0) { + hash = ByteUtil.hashCode(bytes); + } + + return hash; + } +} \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java deleted file mode 100644 index 4dfc765026..0000000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java +++ /dev/null @@ -1,448 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.postoffice.impl; - -import java.lang.ref.Reference; -import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; -import org.apache.activemq.artemis.api.core.ObjLongPair; -import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.transaction.Transaction; -import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; -import org.apache.activemq.artemis.utils.ByteUtil; -import org.jboss.logging.Logger; - -import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL; - -/** - * A DuplicateIDCacheImpl - * - * A fixed size rotating cache of last X duplicate ids. - */ -public class DuplicateIDCacheImpl implements DuplicateIDCache { - - private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class); - // we're not interested into safe publication here: we need to scale, be fast and save "some" GC to happen - private static WeakReference INDEXES = null; - - private static Integer[] boxedInts(int size) { - final Reference indexesRef = INDEXES; - final Integer[] indexes = indexesRef == null ? null : indexesRef.get(); - if (indexes != null && size <= indexes.length) { - return indexes; - } - final int newSize = size + (indexes == null ? 0 : size / 2); - final Integer[] newIndexes = new Integer[newSize]; - if (indexes != null) { - System.arraycopy(indexes, 0, newIndexes, 0, indexes.length); - } - INDEXES = new WeakReference<>(newIndexes); - return newIndexes; - } - - // ByteHolder, position - private final Map cache = new ConcurrentHashMap<>(); - - private final SimpleString address; - - // Note - deliberately typed as ArrayList since we want to ensure fast indexed - // based array access - private final ArrayList> ids; - - private final Integer[] cachedBoxedInts; - - private int pos; - - private final int cacheSize; - - private final StorageManager storageManager; - - private final boolean persist; - - public DuplicateIDCacheImpl(final SimpleString address, - final int size, - final StorageManager storageManager, - final boolean persist) { - this.address = address; - - cacheSize = size; - - ids = new ArrayList<>(size); - - cachedBoxedInts = boxedInts(size); - - this.storageManager = storageManager; - - this.persist = persist; - } - - // best effort caching mechanism - private Integer boxed(int index) { - Integer boxedInt = this.cachedBoxedInts[index]; - if (boxedInt == null) { - boxedInt = index; - cachedBoxedInts[index] = boxedInt; - } - assert boxedInt != null; - return boxedInt; - } - - @Override - public void load(final List> theIds) throws Exception { - long txID = -1; - - // If we have more IDs than cache size, we shrink the first ones - int deleteCount = theIds.size() - cacheSize; - if (deleteCount < 0) { - deleteCount = 0; - } - - for (Pair id : theIds) { - if (deleteCount > 0) { - if (txID == -1) { - txID = storageManager.generateID(); - } - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB())); - } - - storageManager.deleteDuplicateIDTransactional(txID, id.getB()); - deleteCount--; - } else { - ByteArrayHolder bah = new ByteArrayHolder(id.getA()); - - ObjLongPair pair = new ObjLongPair<>(bah, id.getB()); - - cache.put(bah, boxed(ids.size())); - - ids.add(pair); - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB())); - } - } - - } - - if (txID != -1) { - storageManager.commit(txID); - } - - pos = ids.size(); - - if (pos == cacheSize) { - pos = 0; - } - - } - - @Override - public void deleteFromCache(byte[] duplicateID) throws Exception { - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl::deleteFromCache deleting id=" + describeID(duplicateID, 0)); - } - - ByteArrayHolder bah = new ByteArrayHolder(duplicateID); - - Integer posUsed = cache.remove(bah); - - if (posUsed != null) { - ObjLongPair id; - - synchronized (this) { - id = ids.get(posUsed.intValue()); - - if (id.getA().equals(bah)) { - id.setA(null); - storageManager.deleteDuplicateID(id.getB()); - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB())); - } - id.setB(NIL); - } - } - } - - } - - private String describeID(byte[] duplicateID, long id) { - if (id != 0) { - return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID); - } else { - return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id; - } - } - - @Override - public boolean contains(final byte[] duplID) { - return contains(new ByteArrayHolder(duplID)); - } - - private boolean contains(final ByteArrayHolder duplID) { - boolean contains = cache.containsKey(duplID); - - if (logger.isTraceEnabled()) { - if (contains) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::contains found a duplicate " + describeID(duplID.bytes, 0)); - } - } - return contains; - } - - @Override - public void addToCache(final byte[] duplID) throws Exception { - addToCache(duplID, null, false); - } - - @Override - public void addToCache(final byte[] duplID, final Transaction tx) throws Exception { - addToCache(duplID, tx, false); - } - - @Override - public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception { - final ByteArrayHolder holder = new ByteArrayHolder(duplID); - if (contains(holder)) { - if (tx != null) { - tx.markAsRollbackOnly(new ActiveMQDuplicateIdException()); - } - return false; - } - addToCache(holder, tx, true); - return true; - } - - @Override - public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception { - addToCache(new ByteArrayHolder(duplID), tx, instantAdd); - } - - private synchronized void addToCache(final ByteArrayHolder holder, - final Transaction tx, - boolean instantAdd) throws Exception { - long recordID = -1; - if (tx == null) { - if (persist) { - recordID = storageManager.generateID(); - storageManager.storeDuplicateID(address, holder.bytes, recordID); - } - - addToCacheInMemory(holder, recordID); - } else { - if (persist) { - recordID = storageManager.generateID(); - storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID); - - tx.setContainsPersistent(); - } - - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(holder.bytes, recordID) + ", tx=" + tx); - } - - if (instantAdd) { - tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false)); - } else { - // For a tx, it's important that the entry is not added to the cache until commit - // since if the client fails then resends them tx we don't want it to get rejected - tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true)); - } - } - } - - @Override - public void load(final Transaction tx, final byte[] duplID) { - tx.addOperation(new AddDuplicateIDOperation(new ByteArrayHolder(duplID), tx.getID(), true)); - } - - private synchronized void addToCacheInMemory(final ByteArrayHolder holder, final long recordID) { - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(holder.bytes, recordID)); - } - - cache.put(holder, boxed(pos)); - - ObjLongPair id; - - if (pos < ids.size()) { - // Need fast array style access here -hence ArrayList typing - id = ids.get(pos); - - // The id here might be null if it was explicit deleted - if (id.getA() != null) { - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB())); - } - - cache.remove(id.getA()); - - // Record already exists - we delete the old one and add the new one - // Note we can't use update since journal update doesn't let older records get - // reclaimed - - if (id.getB() != NIL) { - try { - storageManager.deleteDuplicateID(id.getB()); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e); - } - } - } - - id.setA(holder); - - // The recordID could be negative if the duplicateCache is configured to not persist, - // -1 would mean null on this case - id.setB(recordID >= 0 ? recordID : NIL); - - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB())); - } - - } else { - id = new ObjLongPair<>(holder, recordID >= 0 ? recordID : NIL); - - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB())); - } - - ids.add(id); - - } - - if (pos++ == cacheSize - 1) { - pos = 0; - } - } - - @Override - public void clear() throws Exception { - logger.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data"); - synchronized (this) { - final int idsSize = ids.size(); - if (idsSize > 0 && persist) { - long tx = storageManager.generateID(); - for (int i = 0; i < idsSize; i++) { - final ObjLongPair id = ids.get(i); - if (id != null && id.getB() != NIL) { - storageManager.deleteDuplicateIDTransactional(tx, id.getB()); - } - } - storageManager.commit(tx); - } - - ids.clear(); - cache.clear(); - pos = 0; - } - } - - @Override - public List> getMap() { - final int idsSize = ids.size(); - List> copy = new ArrayList<>(idsSize); - for (int i = 0; i < idsSize; i++) { - final ObjLongPair id = ids.get(i); - copy.add(new Pair<>(id.getA().bytes, id.getB() == NIL ? null : id.getB())); - } - return copy; - } - - private final class AddDuplicateIDOperation extends TransactionOperationAbstract { - - final ByteArrayHolder holder; - - final long recordID; - - volatile boolean done; - - private final boolean afterCommit; - - AddDuplicateIDOperation(final ByteArrayHolder holder, final long recordID, boolean afterCommit) { - this.holder = holder; - this.recordID = recordID; - this.afterCommit = afterCommit; - } - - private void process() { - if (!done) { - addToCacheInMemory(holder, recordID); - - done = true; - } - } - - @Override - public void afterCommit(final Transaction tx) { - if (afterCommit) { - process(); - } - } - - @Override - public void beforeCommit(Transaction tx) throws Exception { - if (!afterCommit) { - process(); - } - } - - @Override - public List getRelatedMessageReferences() { - return null; - } - } - - private static final class ByteArrayHolder { - - private final byte[] bytes; - - private int hash; - - ByteArrayHolder(final byte[] bytes) { - this.bytes = bytes; - } - - @Override - public boolean equals(final Object other) { - if (other instanceof ByteArrayHolder) { - ByteArrayHolder s = (ByteArrayHolder) other; - - return ByteUtil.equals(bytes, s.bytes); - } else { - return false; - } - } - - @Override - public int hashCode() { - if (hash == 0) { - hash = ByteUtil.hashCode(bytes); - } - - return hash; - } - } -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java new file mode 100644 index 0000000000..79b55e8145 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; + +public final class DuplicateIDCaches { + + private DuplicateIDCaches() { + + } + + public static DuplicateIDCache persistent(final SimpleString address, + final int size, + final StorageManager storageManager) { + return new PersistentDuplicateIDCache(address, size, storageManager); + } + + public static DuplicateIDCache inMemory(final SimpleString address, final int size) { + return new InMemoryDuplicateIDCache(address, size); + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java new file mode 100644 index 0000000000..f1f86142df --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.IntFunction; + +import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; +import org.apache.activemq.artemis.utils.ByteUtil; +import org.jboss.logging.Logger; + +import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts; + +/** + * {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance + * and memory footprint reasons.
+ * Instead of using a single {@link DuplicateIDCache} impl, we've let 2 different impls to contain just the bare + * minimum data in order to have 2 different memory footprint costs at runtime, while making easier to track dependencies + * eg in-memory cache won't need any {@link StorageManager} because no storage operations are expected to happen. + */ +final class InMemoryDuplicateIDCache implements DuplicateIDCache { + + private static final Logger LOGGER = Logger.getLogger(InMemoryDuplicateIDCache.class); + + private final Map cache = new ConcurrentHashMap<>(); + + private final SimpleString address; + + private final ArrayList ids; + + private final IntFunction cachedBoxedInts; + + private int pos; + + private final int cacheSize; + + InMemoryDuplicateIDCache(final SimpleString address, final int size) { + this.address = address; + + cacheSize = size; + + ids = new ArrayList<>(size); + + cachedBoxedInts = boxedInts(size); + } + + @Override + public void load(List> ids) throws Exception { + LOGGER.debugf("address = %s ignore loading ids: in memory cache won't load previously stored ids", address); + } + + @Override + public void deleteFromCache(byte[] duplicateID) { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("deleting id = %s", describeID(duplicateID)); + } + + ByteArray bah = new ByteArray(duplicateID); + + Integer posUsed = cache.remove(bah); + + if (posUsed != null) { + ByteArray id; + + synchronized (this) { + final int index = posUsed.intValue(); + id = ids.get(index); + + if (id.equals(bah)) { + ids.set(index, null); + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID)); + } + } + } + } + + } + + private static String describeID(byte[] duplicateID) { + return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID); + } + + @Override + public boolean contains(final byte[] duplID) { + return contains(new ByteArray(duplID)); + } + + private boolean contains(final ByteArray id) { + boolean contains = cache.containsKey(id); + + if (LOGGER.isTraceEnabled()) { + if (contains) { + LOGGER.tracef("address = %s found a duplicate ", address, describeID(id.bytes)); + } + } + return contains; + } + + @Override + public void addToCache(final byte[] duplID) throws Exception { + addToCache(duplID, null, false); + } + + @Override + public void addToCache(final byte[] duplID, final Transaction tx) throws Exception { + addToCache(duplID, tx, false); + } + + @Override + public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) { + final ByteArray holder = new ByteArray(duplID); + if (contains(holder)) { + if (tx != null) { + tx.markAsRollbackOnly(new ActiveMQDuplicateIdException()); + } + return false; + } + addToCache(holder, tx, true); + return true; + } + + @Override + public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception { + addToCache(new ByteArray(duplID), tx, instantAdd); + } + + private synchronized void addToCache(final ByteArray holder, final Transaction tx, boolean instantAdd) { + if (tx == null) { + addToCacheInMemory(holder); + } else { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address, describeID(holder.bytes), tx); + } + + if (instantAdd) { + tx.addOperation(new AddDuplicateIDOperation(holder, false)); + } else { + // For a tx, it's important that the entry is not added to the cache until commit + // since if the client fails then resends them tx we don't want it to get rejected + tx.afterStore(new AddDuplicateIDOperation(holder, true)); + } + } + } + + @Override + public void load(final Transaction tx, final byte[] duplID) { + tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), true)); + } + + private synchronized void addToCacheInMemory(final ByteArray holder) { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes)); + } + + cache.put(holder, cachedBoxedInts.apply(pos)); + + if (pos < ids.size()) { + // Need fast array style access here -hence ArrayList typing + final ByteArray id = ids.set(pos, holder); + + // The id here might be null if it was explicit deleted + if (id != null) { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.bytes)); + } + + cache.remove(id); + } + + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s replacing old duplicateID by %s", describeID(holder.bytes)); + } + + } else { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding new duplicateID %s", describeID(holder.bytes)); + } + + ids.add(holder); + } + + if (pos++ == cacheSize - 1) { + pos = 0; + } + } + + @Override + public synchronized void clear() throws Exception { + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("address = %s removing duplicate ID data", address); + } + ids.clear(); + cache.clear(); + pos = 0; + } + + @Override + public synchronized List> getMap() { + final int idsSize = ids.size(); + List> copy = new ArrayList<>(idsSize); + for (int i = 0; i < idsSize; i++) { + final ByteArray id = ids.get(i); + // in case the id has been removed + if (id != null) { + copy.add(new Pair<>(id.bytes, null)); + } + } + return copy; + } + + private final class AddDuplicateIDOperation extends TransactionOperationAbstract { + + final ByteArray id; + + volatile boolean done; + + private final boolean afterCommit; + + AddDuplicateIDOperation(final ByteArray id, boolean afterCommit) { + this.id = id; + this.afterCommit = afterCommit; + } + + private void process() { + if (!done) { + addToCacheInMemory(id); + + done = true; + } + } + + @Override + public void afterCommit(final Transaction tx) { + if (afterCommit) { + process(); + } + } + + @Override + public void beforeCommit(Transaction tx) throws Exception { + if (!afterCommit) { + process(); + } + } + + @Override + public List getRelatedMessageReferences() { + return null; + } + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java new file mode 100644 index 0000000000..a726552a54 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; +import java.util.function.IntFunction; + +final class IntegerCache { + + private static final boolean DISABLE_INTEGER_CACHE = Boolean.valueOf(System.getProperty("disable.integer.cache", Boolean.FALSE.toString())); + + // we're not interested into safe publication here: we need to scale, be fast and save "some" GC to happen + private static WeakReference INDEXES = null; + + private static Integer[] ints(int size) { + final Reference indexesRef = INDEXES; + final Integer[] indexes = indexesRef == null ? null : indexesRef.get(); + if (indexes != null && size <= indexes.length) { + return indexes; + } + final int newSize = size + (indexes == null ? 0 : size / 2); + final Integer[] newIndexes = new Integer[newSize]; + if (indexes != null) { + System.arraycopy(indexes, 0, newIndexes, 0, indexes.length); + } + INDEXES = new WeakReference<>(newIndexes); + return newIndexes; + } + + public static IntFunction boxedInts(int size) { + if (DISABLE_INTEGER_CACHE) { + return Integer::valueOf; + } + // use a lambda to have an trusted const field for free + final Integer[] cachedInts = ints(size); + return index -> { + Integer boxedInt = cachedInts[index]; + if (boxedInt == null) { + boxedInt = index; + cachedInts[index] = boxedInt; + } + assert boxedInt != null; + return boxedInt; + }; + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java new file mode 100644 index 0000000000..3e3758d3c9 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.IntFunction; + +import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; +import org.apache.activemq.artemis.api.core.ObjLongPair; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; +import org.apache.activemq.artemis.utils.ByteUtil; +import org.jboss.logging.Logger; + +import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL; +import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts; + +/** + * {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance + * and memory footprint reasons.
+ * Instead of using a single {@link DuplicateIDCache} impl, we've let 2 different impls to contain just the bare + * minimum data in order to have 2 different memory footprint costs at runtime, while making easier to track dependencies + * eg in-memory cache won't need any {@link StorageManager} because no storage operations are expected to happen. + */ +final class PersistentDuplicateIDCache implements DuplicateIDCache { + + private static final Logger LOGGER = Logger.getLogger(PersistentDuplicateIDCache.class); + + private final Map cache = new ConcurrentHashMap<>(); + + private final SimpleString address; + + private final ArrayList> ids; + + private final IntFunction cachedBoxedInts; + + private int pos; + + private final int cacheSize; + + private final StorageManager storageManager; + + PersistentDuplicateIDCache(final SimpleString address, final int size, final StorageManager storageManager) { + this.address = address; + + cacheSize = size; + + ids = new ArrayList<>(size); + + cachedBoxedInts = boxedInts(size); + + this.storageManager = storageManager; + } + + @Override + public synchronized void load(final List> ids) throws Exception { + if (!cache.isEmpty()) { + throw new IllegalStateException("load is valid only on empty cache"); + } + // load only ids that fit this cache: + // - in term of remaining capacity + // - ignoring (and reporting) ids unpaired with record ID + // Then, delete the exceeding ones. + + long txID = -1; + + int toNotBeAdded = ids.size() - cacheSize; + if (toNotBeAdded < 0) { + toNotBeAdded = 0; + } + + for (Pair id : ids) { + if (id.getB() == null) { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("ignoring id = %s because without record ID", describeID(id.getA())); + } + if (toNotBeAdded > 0) { + toNotBeAdded--; + } + continue; + } + assert id.getB() != null && id.getB().longValue() != NIL; + if (toNotBeAdded > 0) { + if (txID == -1) { + txID = storageManager.generateID(); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("deleting id = %s", describeID(id.getA(), id.getB())); + } + + storageManager.deleteDuplicateIDTransactional(txID, id.getB()); + toNotBeAdded--; + } else { + ByteArray bah = new ByteArray(id.getA()); + + ObjLongPair pair = new ObjLongPair<>(bah, id.getB()); + + cache.put(bah, cachedBoxedInts.apply(this.ids.size())); + + this.ids.add(pair); + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("loading id = %s", describeID(id.getA(), id.getB())); + } + } + + } + + if (txID != -1) { + storageManager.commit(txID); + } + + pos = this.ids.size(); + + if (pos == cacheSize) { + pos = 0; + } + + } + + @Override + public void deleteFromCache(byte[] duplicateID) throws Exception { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("deleting id = %s", describeID(duplicateID)); + } + + final ByteArray bah = new ByteArray(duplicateID); + + final Integer posUsed = cache.remove(bah); + + if (posUsed != null) { + synchronized (this) { + final ObjLongPair id = ids.get(posUsed.intValue()); + + if (id.getA().equals(bah)) { + final long recordID = id.getB(); + id.setA(null); + id.setB(NIL); + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID, id.getB())); + } + storageManager.deleteDuplicateID(recordID); + } + } + } + + } + + private static String describeID(byte[] duplicateID) { + return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID); + } + + private static String describeID(byte[] duplicateID, long id) { + return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id; + } + + @Override + public boolean contains(final byte[] duplID) { + return contains(new ByteArray(duplID)); + } + + private boolean contains(final ByteArray duplID) { + boolean contains = cache.containsKey(duplID); + + if (LOGGER.isTraceEnabled()) { + if (contains) { + LOGGER.tracef("address = %s found a duplicate %s", address, describeID(duplID.bytes)); + } + } + return contains; + } + + @Override + public void addToCache(final byte[] duplID) throws Exception { + addToCache(duplID, null, false); + } + + @Override + public void addToCache(final byte[] duplID, final Transaction tx) throws Exception { + addToCache(duplID, tx, false); + } + + @Override + public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception { + final ByteArray holder = new ByteArray(duplID); + if (contains(holder)) { + if (tx != null) { + tx.markAsRollbackOnly(new ActiveMQDuplicateIdException()); + } + return false; + } + addToCache(holder, tx, true); + return true; + } + + @Override + public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception { + addToCache(new ByteArray(duplID), tx, instantAdd); + } + + private synchronized void addToCache(final ByteArray holder, + final Transaction tx, + boolean instantAdd) throws Exception { + final long recordID = storageManager.generateID(); + if (tx == null) { + storageManager.storeDuplicateID(address, holder.bytes, recordID); + + addToCacheInMemory(holder, recordID); + } else { + storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID); + + tx.setContainsPersistent(); + + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address, + describeID(holder.bytes, recordID), tx); + } + + if (instantAdd) { + tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false)); + } else { + // For a tx, it's important that the entry is not added to the cache until commit + // since if the client fails then resends them tx we don't want it to get rejected + tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true)); + } + } + } + + @Override + public void load(final Transaction tx, final byte[] duplID) { + tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), tx.getID(), true)); + } + + private synchronized void addToCacheInMemory(final ByteArray holder, final long recordID) { + Objects.requireNonNull(holder, "holder must be not null"); + if (recordID < 0) { + throw new IllegalArgumentException("recordID must be >= 0"); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes, recordID)); + } + + cache.put(holder, cachedBoxedInts.apply(pos)); + + ObjLongPair id; + + if (pos < ids.size()) { + // Need fast array style access here -hence ArrayList typing + id = ids.get(pos); + + // The id here might be null if it was explicit deleted + if (id.getA() != null) { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.getA().bytes, id.getB())); + } + + cache.remove(id.getA()); + + assert id.getB() != NIL; + try { + storageManager.deleteDuplicateID(id.getB()); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e); + } + } + + id.setA(holder); + + id.setB(recordID); + + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s replacing old duplicateID by %s", address, describeID(id.getA().bytes, id.getB())); + } + + } else { + id = new ObjLongPair<>(holder, recordID); + + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding new duplicateID %s", address, describeID(id.getA().bytes, id.getB())); + } + + ids.add(id); + + } + + if (pos++ == cacheSize - 1) { + pos = 0; + } + } + + @Override + public synchronized void clear() throws Exception { + LOGGER.debugf("address = %s removing duplicate ID data", address); + final int idsSize = ids.size(); + if (idsSize > 0) { + long tx = storageManager.generateID(); + for (int i = 0; i < idsSize; i++) { + final ObjLongPair id = ids.get(i); + if (id.getA() != null) { + assert id.getB() != NIL; + storageManager.deleteDuplicateIDTransactional(tx, id.getB()); + } + } + storageManager.commit(tx); + } + + ids.clear(); + cache.clear(); + pos = 0; + } + + @Override + public synchronized List> getMap() { + final int idsSize = ids.size(); + List> copy = new ArrayList<>(idsSize); + for (int i = 0; i < idsSize; i++) { + final ObjLongPair id = ids.get(i); + // in case the pair has been removed + if (id.getA() != null) { + assert id.getB() != NIL; + copy.add(new Pair<>(id.getA().bytes, id.getB())); + } + } + return copy; + } + + private final class AddDuplicateIDOperation extends TransactionOperationAbstract { + + final ByteArray holder; + + final long recordID; + + volatile boolean done; + + private final boolean afterCommit; + + AddDuplicateIDOperation(final ByteArray holder, final long recordID, boolean afterCommit) { + this.holder = holder; + this.recordID = recordID; + this.afterCommit = afterCommit; + } + + private void process() { + if (!done) { + addToCacheInMemory(holder, recordID); + + done = true; + } + } + + @Override + public void afterCommit(final Transaction tx) { + if (afterCommit) { + process(); + } + } + + @Override + public void beforeCommit(Transaction tx) throws Exception { + if (!afterCommit) { + process(); + } + } + + @Override + public List getRelatedMessageReferences() { + return null; + } + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 187ee584e6..07c0bdf2ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -16,6 +16,25 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; + import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; @@ -82,25 +101,6 @@ import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.jboss.logging.Logger; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Stream; - import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf; /** @@ -1315,7 +1315,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding DuplicateIDCache cache = duplicateIDCaches.get(address); if (cache == null) { - cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache); + if (persistIDCache) { + cache = DuplicateIDCaches.persistent(address, idCacheSize, storageManager); + } else { + cache = DuplicateIDCaches.inMemory(address, idCacheSize); + } DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java index 8300bdb355..39fe106999 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java @@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; -import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; +import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RetryRule; @@ -52,7 +52,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase { public void testDuplicate() throws Exception { createStorage(); - DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, true); + DuplicateIDCache cache = DuplicateIDCaches.persistent(new SimpleString("test"), 2000, journal); TransactionImpl tx = new TransactionImpl(journal); @@ -108,7 +108,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase { public void testDuplicateNonPersistent() throws Exception { createStorage(); - DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, false); + DuplicateIDCache cache = DuplicateIDCaches.inMemory(new SimpleString("test"), 2000); TransactionImpl tx = new TransactionImpl(journal); diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java index 21505bf3d8..0917267a49 100644 --- a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java +++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java @@ -21,7 +21,7 @@ import java.util.SplittableRandom; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; -import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; +import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches; import org.apache.activemq.artemis.utils.RandomUtil; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Fork; @@ -54,7 +54,9 @@ public class DuplicateIDCacheBenchmark { @Setup public void init() throws Exception { - cache = new DuplicateIDCacheImpl(SimpleString.toSimpleString("benchmark"), size, new NullStorageManager(), persist); + cache = persist ? + DuplicateIDCaches.persistent(SimpleString.toSimpleString("benchmark"), size, new NullStorageManager()) : + DuplicateIDCaches.inMemory(SimpleString.toSimpleString("benchmark"), size); final int idSize = findNextHigherPowerOf2(size); idsMask = idSize - 1; nextId = 0; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java index 569f46983a..76c861952f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java @@ -31,8 +31,9 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; -import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; +import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches; import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader; import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl; import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice; @@ -40,8 +41,8 @@ import org.apache.activemq.artemis.tests.unit.util.FakePagingManager; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; import org.junit.After; import org.junit.Assert; @@ -106,7 +107,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { Assert.assertEquals(0, mapDups.size()); - DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true); + DuplicateIDCache cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal); for (int i = 0; i < 100; i++) { cacheID.addToCache(RandomUtil.randomBytes()); @@ -126,7 +127,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { Assert.assertEquals(10, values.size()); - cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true); + cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal); cacheID.load(values); for (int i = 0; i < 100; i++) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 5220a1a911..d7aa3cceae 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -28,14 +28,13 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; -import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; +import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -201,7 +200,7 @@ public class FakePostOffice implements PostOffice { @Override public DuplicateIDCache getDuplicateIDCache(final SimpleString address) { - return new DuplicateIDCacheImpl(address, 2000, new NullStorageManager(), false); + return DuplicateIDCaches.inMemory(address, 2000); } @Override