diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ObjLongPair.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ObjLongPair.java
new file mode 100644
index 0000000000..448d008ef8
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ObjLongPair.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.core;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A Pair is a holder for 1 object and a >= 0 primitive long value.
+ *
+ * This is a utility class.
+ */
+public class ObjLongPair implements Serializable {
+
+ private static final long serialVersionUID = 7749478219139339853L;
+
+ public static final long NIL = -1;
+
+ public ObjLongPair(final A a, final long b) {
+ this.a = a;
+ this.b = b;
+ if (b < 0 && b != NIL) {
+ throw new IllegalStateException("b must be >= 0 or == NIL = " + NIL);
+ }
+ }
+
+ public ObjLongPair(final A a) {
+ this.a = a;
+ this.b = NIL;
+ }
+
+ private A a;
+ private long b;
+
+ @Override
+ public int hashCode() {
+ if (a == null && b == NIL) {
+ return super.hashCode();
+ }
+ // it's ok to use b to compute hashCode although NIL
+ return (a == null ? 0 : a.hashCode()) + 37 * Long.hashCode(b);
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (other == this) {
+ return true;
+ }
+
+ if (other instanceof ObjLongPair == false) {
+ return false;
+ }
+
+ ObjLongPair pother = (ObjLongPair) other;
+
+ return (Objects.equals(pother.a, a)) && (pother.b == b);
+ }
+
+ @Override
+ public String toString() {
+ return "ObjLongPair[a=" + a + ", b=" + (b == NIL ? "NIL" : b) + "]";
+ }
+
+ public void setA(A a) {
+ if (this.a == a) {
+ return;
+ }
+ this.a = a;
+ }
+
+ public A getA() {
+ return a;
+ }
+
+ public void setB(long b) {
+ if (b < 0 && b != NIL) {
+ throw new IllegalStateException("b must be >= 0 or == NIL = " + NIL);
+ }
+ this.b = b;
+ }
+
+ public long getB() {
+ return b;
+ }
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index 8bd07b07c4..94078153e0 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -275,6 +275,52 @@ public class ByteUtil {
}
}
+ public static int hashCode(byte[] bytes) {
+ if (PlatformDependent.hasUnsafe() && PlatformDependent.isUnaligned()) {
+ return unsafeHashCode(bytes);
+ }
+ return Arrays.hashCode(bytes);
+ }
+
+ /**
+ * This hash code computation is borrowed by {@link io.netty.buffer.ByteBufUtil#hashCode(ByteBuf)}.
+ */
+ private static int unsafeHashCode(byte[] bytes) {
+ if (bytes == null) {
+ return 0;
+ }
+ final int len = bytes.length;
+ int hashCode = 1;
+ final int intCount = len >>> 2;
+ int arrayIndex = 0;
+ // reading in batch both help hash code computation data dependencies and save memory bandwidth
+ for (int i = 0; i < intCount; i++) {
+ hashCode = 31 * hashCode + PlatformDependent.getInt(bytes, arrayIndex);
+ arrayIndex += Integer.BYTES;
+ }
+ final byte remaining = (byte) (len & 3);
+ if (remaining > 0) {
+ hashCode = unsafeUnrolledHashCode(bytes, arrayIndex, remaining, hashCode);
+ }
+ return hashCode == 0 ? 1 : hashCode;
+ }
+
+ private static int unsafeUnrolledHashCode(byte[] bytes, int index, int bytesCount, int h) {
+ // there is still the hash data dependency but is more friendly
+ // then a plain loop, given that we know no loop is needed here
+ assert bytesCount > 0 && bytesCount < 4;
+ h = 31 * h + PlatformDependent.getByte(bytes, index);
+ if (bytesCount == 1) {
+ return h;
+ }
+ h = 31 * h + PlatformDependent.getByte(bytes, index + 1);
+ if (bytesCount == 2) {
+ return h;
+ }
+ h = 31 * h + PlatformDependent.getByte(bytes, index + 2);
+ return h;
+ }
+
public static boolean equals(final byte[] left, final byte[] right) {
return equals(left, right, 0, right.length);
}
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
index f95d22733c..ced2062784 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
@@ -17,17 +17,23 @@
package org.apache.activemq.artemis.utils;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.ReadOnlyBufferException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
import io.netty.util.internal.PlatformDependent;
import org.jboss.logging.Logger;
import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.junit.Test;
public class ByteUtilTest {
@@ -83,6 +89,46 @@ public class ByteUtilTest {
}
}
+ @Test
+ public void testUnsafeUnalignedByteArrayHashCode() {
+ Assume.assumeTrue(PlatformDependent.hasUnsafe());
+ Assume.assumeTrue(PlatformDependent.isUnaligned());
+ Map map = new LinkedHashMap<>();
+ map.put(new byte[0], 1);
+ map.put(new byte[]{1}, 32);
+ map.put(new byte[]{2}, 33);
+ map.put(new byte[]{0, 1}, 962);
+ map.put(new byte[]{1, 2}, 994);
+ if (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) {
+ map.put(new byte[]{0, 1, 2, 3, 4, 5}, 63504931);
+ map.put(new byte[]{6, 7, 8, 9, 0, 1}, -1603953111);
+ map.put(new byte[]{-1, -1, -1, (byte) 0xE1}, 1);
+ } else {
+ map.put(new byte[]{0, 1, 2, 3, 4, 5}, 1250309600);
+ map.put(new byte[]{6, 7, 8, 9, 0, 1}, -417148442);
+ map.put(new byte[]{-1, -1, -1, (byte) 0xE1}, -503316450);
+ }
+ for (Map.Entry e : map.entrySet()) {
+ assertEquals("input = " + Arrays.toString(e.getKey()), e.getValue().intValue(), ByteUtil.hashCode(e.getKey()));
+ }
+ }
+
+ @Test
+ public void testNoUnsafeAlignedByteArrayHashCode() {
+ Assume.assumeFalse(PlatformDependent.hasUnsafe());
+ Assume.assumeFalse(PlatformDependent.isUnaligned());
+ ArrayList inputs = new ArrayList<>();
+ inputs.add(new byte[0]);
+ inputs.add(new byte[]{1});
+ inputs.add(new byte[]{2});
+ inputs.add(new byte[]{0, 1});
+ inputs.add(new byte[]{1, 2});
+ inputs.add(new byte[]{0, 1, 2, 3, 4, 5});
+ inputs.add(new byte[]{6, 7, 8, 9, 0, 1});
+ inputs.add(new byte[]{-1, -1, -1, (byte) 0xE1});
+ inputs.forEach(input -> assertEquals("input = " + Arrays.toString(input), Arrays.hashCode(input), ByteUtil.hashCode(input)));
+ }
+
@Test
public void testTextBytesToLongBytesNegative() {
try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
index b384896efa..75cc17b7d3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
@@ -40,7 +40,7 @@ public interface DuplicateIDCache {
void deleteFromCache(byte[] duplicateID) throws Exception;
- void load(List> theIds) throws Exception;
+ void load(List> ids) throws Exception;
void load(Transaction tx, byte[] duplID);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java
new file mode 100644
index 0000000000..8f2c67e38f
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import org.apache.activemq.artemis.utils.ByteUtil;
+
+final class ByteArray {
+
+ final byte[] bytes;
+
+ private int hash;
+
+ ByteArray(final byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (other instanceof ByteArray) {
+ ByteArray s = (ByteArray) other;
+
+ return ByteUtil.equals(bytes, s.bytes);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ if (hash == 0) {
+ hash = ByteUtil.hashCode(bytes);
+ }
+
+ return hash;
+ }
+}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
deleted file mode 100644
index 7f1604530d..0000000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.postoffice.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
-import org.apache.activemq.artemis.api.core.Pair;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
-import org.apache.activemq.artemis.utils.ByteUtil;
-import org.jboss.logging.Logger;
-
-/**
- * A DuplicateIDCacheImpl
- *
- * A fixed size rotating cache of last X duplicate ids.
- */
-public class DuplicateIDCacheImpl implements DuplicateIDCache {
-
- private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class);
-
- // ByteHolder, position
- private final Map cache = new ConcurrentHashMap<>();
-
- private final SimpleString address;
-
- // Note - deliberately typed as ArrayList since we want to ensure fast indexed
- // based array access
- private final ArrayList> ids;
-
- private int pos;
-
- private final int cacheSize;
-
- private final StorageManager storageManager;
-
- private final boolean persist;
-
- public DuplicateIDCacheImpl(final SimpleString address,
- final int size,
- final StorageManager storageManager,
- final boolean persist) {
- this.address = address;
-
- cacheSize = size;
-
- ids = new ArrayList<>(size);
-
- this.storageManager = storageManager;
-
- this.persist = persist;
- }
-
- @Override
- public void load(final List> theIds) throws Exception {
- long txID = -1;
-
- // If we have more IDs than cache size, we shrink the first ones
- int deleteCount = theIds.size() - cacheSize;
- if (deleteCount < 0) {
- deleteCount = 0;
- }
-
- for (Pair id : theIds) {
- if (deleteCount > 0) {
- if (txID == -1) {
- txID = storageManager.generateID();
- }
- if (logger.isTraceEnabled()) {
- logger.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB()));
- }
-
- storageManager.deleteDuplicateIDTransactional(txID, id.getB());
- deleteCount--;
- } else {
- ByteArrayHolder bah = new ByteArrayHolder(id.getA());
-
- Pair pair = new Pair<>(bah, id.getB());
-
- cache.put(bah, ids.size());
-
- ids.add(pair);
- if (logger.isTraceEnabled()) {
- logger.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB()));
- }
- }
-
- }
-
- if (txID != -1) {
- storageManager.commit(txID);
- }
-
- pos = ids.size();
-
- if (pos == cacheSize) {
- pos = 0;
- }
-
- }
-
- @Override
- public void deleteFromCache(byte[] duplicateID) throws Exception {
- if (logger.isTraceEnabled()) {
- logger.trace("DuplicateIDCacheImpl::deleteFromCache deleting id=" + describeID(duplicateID, 0));
- }
-
- ByteArrayHolder bah = new ByteArrayHolder(duplicateID);
-
- Integer posUsed = cache.remove(bah);
-
- if (posUsed != null) {
- Pair id;
-
- synchronized (this) {
- id = ids.get(posUsed.intValue());
-
- if (id.getA().equals(bah)) {
- id.setA(null);
- storageManager.deleteDuplicateID(id.getB());
- if (logger.isTraceEnabled()) {
- logger.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB()));
- }
- id.setB(null);
- }
- }
- }
-
- }
-
- private String describeID(byte[] duplicateID, long id) {
- if (id != 0) {
- return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
- } else {
- return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id;
- }
- }
-
- @Override
- public boolean contains(final byte[] duplID) {
- boolean contains = cache.get(new ByteArrayHolder(duplID)) != null;
-
- if (contains) {
- logger.trace("DuplicateIDCacheImpl(" + this.address + ")::constains found a duplicate " + describeID(duplID, 0));
- }
- return contains;
- }
-
- @Override
- public void addToCache(final byte[] duplID) throws Exception {
- addToCache(duplID, null, false);
- }
-
- @Override
- public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
- addToCache(duplID, tx, false);
- }
-
- @Override
- public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
-
- if (contains(duplID)) {
- if (tx != null) {
- tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
- }
- return false;
- } else {
- addToCache(duplID, tx, true);
- return true;
- }
-
- }
-
- @Override
- public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
- long recordID = -1;
-
- if (tx == null) {
- if (persist) {
- recordID = storageManager.generateID();
- storageManager.storeDuplicateID(address, duplID, recordID);
- }
-
- addToCacheInMemory(duplID, recordID);
- } else {
- if (persist) {
- recordID = storageManager.generateID();
- storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
-
- tx.setContainsPersistent();
- }
-
- if (logger.isTraceEnabled()) {
- logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx);
- }
-
-
- if (instantAdd) {
- tx.addOperation(new AddDuplicateIDOperation(duplID, recordID, false));
- } else {
- // For a tx, it's important that the entry is not added to the cache until commit
- // since if the client fails then resends them tx we don't want it to get rejected
- tx.afterStore(new AddDuplicateIDOperation(duplID, recordID, true));
- }
- }
- }
-
- @Override
- public void load(final Transaction tx, final byte[] duplID) {
- tx.addOperation(new AddDuplicateIDOperation(duplID, tx.getID(), true));
- }
-
- private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) {
- if (logger.isTraceEnabled()) {
- logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(duplID, recordID));
- }
-
- ByteArrayHolder holder = new ByteArrayHolder(duplID);
-
- cache.put(holder, pos);
-
- Pair id;
-
- if (pos < ids.size()) {
- // Need fast array style access here -hence ArrayList typing
- id = ids.get(pos);
-
- // The id here might be null if it was explicit deleted
- if (id.getA() != null) {
- if (logger.isTraceEnabled()) {
- logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB()));
- }
-
- cache.remove(id.getA());
-
- // Record already exists - we delete the old one and add the new one
- // Note we can't use update since journal update doesn't let older records get
- // reclaimed
-
- if (id.getB() != null) {
- try {
- storageManager.deleteDuplicateID(id.getB());
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e);
- }
- }
- }
-
- id.setA(holder);
-
- // The recordID could be negative if the duplicateCache is configured to not persist,
- // -1 would mean null on this case
- id.setB(recordID >= 0 ? recordID : null);
-
- if (logger.isTraceEnabled()) {
- logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB()));
- }
-
- holder.pos = pos;
- } else {
- id = new Pair<>(holder, recordID >= 0 ? recordID : null);
-
- if (logger.isTraceEnabled()) {
- logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB()));
- }
-
- ids.add(id);
-
- holder.pos = pos;
- }
-
- if (pos++ == cacheSize - 1) {
- pos = 0;
- }
- }
-
- @Override
- public void clear() throws Exception {
- logger.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data");
- synchronized (this) {
- if (ids.size() > 0 && persist) {
- long tx = storageManager.generateID();
- for (Pair id : ids) {
- if (id != null) {
- storageManager.deleteDuplicateIDTransactional(tx, id.getB());
- }
- }
- storageManager.commit(tx);
- }
-
- ids.clear();
- cache.clear();
- pos = 0;
- }
- }
-
- @Override
- public List> getMap() {
- List> list = new ArrayList<>();
- for (Pair id : ids) {
- list.add(new Pair<>(id.getA().bytes, id.getB()));
- }
- return list;
- }
-
- private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
-
- final byte[] duplID;
-
- final long recordID;
-
- volatile boolean done;
-
- private final boolean afterCommit;
-
- AddDuplicateIDOperation(final byte[] duplID, final long recordID, boolean afterCommit) {
- this.duplID = duplID;
- this.recordID = recordID;
- this.afterCommit = afterCommit;
- }
-
- private void process() {
- if (!done) {
- addToCacheInMemory(duplID, recordID);
-
- done = true;
- }
- }
-
- @Override
- public void afterCommit(final Transaction tx) {
- if (afterCommit) {
- process();
- }
- }
-
- @Override
- public void beforeCommit(Transaction tx) throws Exception {
- if (!afterCommit) {
- process();
- }
- }
-
- @Override
- public List getRelatedMessageReferences() {
- return null;
- }
- }
-
- private static final class ByteArrayHolder {
-
- ByteArrayHolder(final byte[] bytes) {
- this.bytes = bytes;
- }
-
- final byte[] bytes;
-
- int hash;
-
- int pos;
-
- @Override
- public boolean equals(final Object other) {
- if (other instanceof ByteArrayHolder) {
- ByteArrayHolder s = (ByteArrayHolder) other;
-
- if (bytes.length != s.bytes.length) {
- return false;
- }
-
- for (int i = 0; i < bytes.length; i++) {
- if (bytes[i] != s.bytes[i]) {
- return false;
- }
- }
-
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- if (hash == 0) {
- for (byte b : bytes) {
- hash = 31 * hash + b;
- }
- }
-
- return hash;
- }
- }
-}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java
new file mode 100644
index 0000000000..79b55e8145
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
+
+public final class DuplicateIDCaches {
+
+ private DuplicateIDCaches() {
+
+ }
+
+ public static DuplicateIDCache persistent(final SimpleString address,
+ final int size,
+ final StorageManager storageManager) {
+ return new PersistentDuplicateIDCache(address, size, storageManager);
+ }
+
+ public static DuplicateIDCache inMemory(final SimpleString address, final int size) {
+ return new InMemoryDuplicateIDCache(address, size);
+ }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
new file mode 100644
index 0000000000..f1f86142df
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.IntFunction;
+
+import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts;
+
+/**
+ * {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance
+ * and memory footprint reasons.
+ * Instead of using a single {@link DuplicateIDCache} impl, we've let 2 different impls to contain just the bare
+ * minimum data in order to have 2 different memory footprint costs at runtime, while making easier to track dependencies
+ * eg in-memory cache won't need any {@link StorageManager} because no storage operations are expected to happen.
+ */
+final class InMemoryDuplicateIDCache implements DuplicateIDCache {
+
+ private static final Logger LOGGER = Logger.getLogger(InMemoryDuplicateIDCache.class);
+
+ private final Map cache = new ConcurrentHashMap<>();
+
+ private final SimpleString address;
+
+ private final ArrayList ids;
+
+ private final IntFunction cachedBoxedInts;
+
+ private int pos;
+
+ private final int cacheSize;
+
+ InMemoryDuplicateIDCache(final SimpleString address, final int size) {
+ this.address = address;
+
+ cacheSize = size;
+
+ ids = new ArrayList<>(size);
+
+ cachedBoxedInts = boxedInts(size);
+ }
+
+ @Override
+ public void load(List> ids) throws Exception {
+ LOGGER.debugf("address = %s ignore loading ids: in memory cache won't load previously stored ids", address);
+ }
+
+ @Override
+ public void deleteFromCache(byte[] duplicateID) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("deleting id = %s", describeID(duplicateID));
+ }
+
+ ByteArray bah = new ByteArray(duplicateID);
+
+ Integer posUsed = cache.remove(bah);
+
+ if (posUsed != null) {
+ ByteArray id;
+
+ synchronized (this) {
+ final int index = posUsed.intValue();
+ id = ids.get(index);
+
+ if (id.equals(bah)) {
+ ids.set(index, null);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID));
+ }
+ }
+ }
+ }
+
+ }
+
+ private static String describeID(byte[] duplicateID) {
+ return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
+ }
+
+ @Override
+ public boolean contains(final byte[] duplID) {
+ return contains(new ByteArray(duplID));
+ }
+
+ private boolean contains(final ByteArray id) {
+ boolean contains = cache.containsKey(id);
+
+ if (LOGGER.isTraceEnabled()) {
+ if (contains) {
+ LOGGER.tracef("address = %s found a duplicate ", address, describeID(id.bytes));
+ }
+ }
+ return contains;
+ }
+
+ @Override
+ public void addToCache(final byte[] duplID) throws Exception {
+ addToCache(duplID, null, false);
+ }
+
+ @Override
+ public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
+ addToCache(duplID, tx, false);
+ }
+
+ @Override
+ public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) {
+ final ByteArray holder = new ByteArray(duplID);
+ if (contains(holder)) {
+ if (tx != null) {
+ tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
+ }
+ return false;
+ }
+ addToCache(holder, tx, true);
+ return true;
+ }
+
+ @Override
+ public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
+ addToCache(new ByteArray(duplID), tx, instantAdd);
+ }
+
+ private synchronized void addToCache(final ByteArray holder, final Transaction tx, boolean instantAdd) {
+ if (tx == null) {
+ addToCacheInMemory(holder);
+ } else {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address, describeID(holder.bytes), tx);
+ }
+
+ if (instantAdd) {
+ tx.addOperation(new AddDuplicateIDOperation(holder, false));
+ } else {
+ // For a tx, it's important that the entry is not added to the cache until commit
+ // since if the client fails then resends them tx we don't want it to get rejected
+ tx.afterStore(new AddDuplicateIDOperation(holder, true));
+ }
+ }
+ }
+
+ @Override
+ public void load(final Transaction tx, final byte[] duplID) {
+ tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), true));
+ }
+
+ private synchronized void addToCacheInMemory(final ByteArray holder) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes));
+ }
+
+ cache.put(holder, cachedBoxedInts.apply(pos));
+
+ if (pos < ids.size()) {
+ // Need fast array style access here -hence ArrayList typing
+ final ByteArray id = ids.set(pos, holder);
+
+ // The id here might be null if it was explicit deleted
+ if (id != null) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.bytes));
+ }
+
+ cache.remove(id);
+ }
+
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s replacing old duplicateID by %s", describeID(holder.bytes));
+ }
+
+ } else {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s adding new duplicateID %s", describeID(holder.bytes));
+ }
+
+ ids.add(holder);
+ }
+
+ if (pos++ == cacheSize - 1) {
+ pos = 0;
+ }
+ }
+
+ @Override
+ public synchronized void clear() throws Exception {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debugf("address = %s removing duplicate ID data", address);
+ }
+ ids.clear();
+ cache.clear();
+ pos = 0;
+ }
+
+ @Override
+ public synchronized List> getMap() {
+ final int idsSize = ids.size();
+ List> copy = new ArrayList<>(idsSize);
+ for (int i = 0; i < idsSize; i++) {
+ final ByteArray id = ids.get(i);
+ // in case the id has been removed
+ if (id != null) {
+ copy.add(new Pair<>(id.bytes, null));
+ }
+ }
+ return copy;
+ }
+
+ private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
+
+ final ByteArray id;
+
+ volatile boolean done;
+
+ private final boolean afterCommit;
+
+ AddDuplicateIDOperation(final ByteArray id, boolean afterCommit) {
+ this.id = id;
+ this.afterCommit = afterCommit;
+ }
+
+ private void process() {
+ if (!done) {
+ addToCacheInMemory(id);
+
+ done = true;
+ }
+ }
+
+ @Override
+ public void afterCommit(final Transaction tx) {
+ if (afterCommit) {
+ process();
+ }
+ }
+
+ @Override
+ public void beforeCommit(Transaction tx) throws Exception {
+ if (!afterCommit) {
+ process();
+ }
+ }
+
+ @Override
+ public List getRelatedMessageReferences() {
+ return null;
+ }
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java
new file mode 100644
index 0000000000..a726552a54
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.function.IntFunction;
+
+final class IntegerCache {
+
+ private static final boolean DISABLE_INTEGER_CACHE = Boolean.valueOf(System.getProperty("disable.integer.cache", Boolean.FALSE.toString()));
+
+ // we're not interested into safe publication here: we need to scale, be fast and save "some" GC to happen
+ private static WeakReference INDEXES = null;
+
+ private static Integer[] ints(int size) {
+ final Reference indexesRef = INDEXES;
+ final Integer[] indexes = indexesRef == null ? null : indexesRef.get();
+ if (indexes != null && size <= indexes.length) {
+ return indexes;
+ }
+ final int newSize = size + (indexes == null ? 0 : size / 2);
+ final Integer[] newIndexes = new Integer[newSize];
+ if (indexes != null) {
+ System.arraycopy(indexes, 0, newIndexes, 0, indexes.length);
+ }
+ INDEXES = new WeakReference<>(newIndexes);
+ return newIndexes;
+ }
+
+ public static IntFunction boxedInts(int size) {
+ if (DISABLE_INTEGER_CACHE) {
+ return Integer::valueOf;
+ }
+ // use a lambda to have an trusted const field for free
+ final Integer[] cachedInts = ints(size);
+ return index -> {
+ Integer boxedInt = cachedInts[index];
+ if (boxedInt == null) {
+ boxedInt = index;
+ cachedInts[index] = boxedInt;
+ }
+ assert boxedInt != null;
+ return boxedInt;
+ };
+ }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
new file mode 100644
index 0000000000..3e3758d3c9
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.postoffice.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.IntFunction;
+
+import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
+import org.apache.activemq.artemis.api.core.ObjLongPair;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL;
+import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts;
+
+/**
+ * {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance
+ * and memory footprint reasons.
+ * Instead of using a single {@link DuplicateIDCache} impl, we've let 2 different impls to contain just the bare
+ * minimum data in order to have 2 different memory footprint costs at runtime, while making easier to track dependencies
+ * eg in-memory cache won't need any {@link StorageManager} because no storage operations are expected to happen.
+ */
+final class PersistentDuplicateIDCache implements DuplicateIDCache {
+
+ private static final Logger LOGGER = Logger.getLogger(PersistentDuplicateIDCache.class);
+
+ private final Map cache = new ConcurrentHashMap<>();
+
+ private final SimpleString address;
+
+ private final ArrayList> ids;
+
+ private final IntFunction cachedBoxedInts;
+
+ private int pos;
+
+ private final int cacheSize;
+
+ private final StorageManager storageManager;
+
+ PersistentDuplicateIDCache(final SimpleString address, final int size, final StorageManager storageManager) {
+ this.address = address;
+
+ cacheSize = size;
+
+ ids = new ArrayList<>(size);
+
+ cachedBoxedInts = boxedInts(size);
+
+ this.storageManager = storageManager;
+ }
+
+ @Override
+ public synchronized void load(final List> ids) throws Exception {
+ if (!cache.isEmpty()) {
+ throw new IllegalStateException("load is valid only on empty cache");
+ }
+ // load only ids that fit this cache:
+ // - in term of remaining capacity
+ // - ignoring (and reporting) ids unpaired with record ID
+ // Then, delete the exceeding ones.
+
+ long txID = -1;
+
+ int toNotBeAdded = ids.size() - cacheSize;
+ if (toNotBeAdded < 0) {
+ toNotBeAdded = 0;
+ }
+
+ for (Pair id : ids) {
+ if (id.getB() == null) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("ignoring id = %s because without record ID", describeID(id.getA()));
+ }
+ if (toNotBeAdded > 0) {
+ toNotBeAdded--;
+ }
+ continue;
+ }
+ assert id.getB() != null && id.getB().longValue() != NIL;
+ if (toNotBeAdded > 0) {
+ if (txID == -1) {
+ txID = storageManager.generateID();
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("deleting id = %s", describeID(id.getA(), id.getB()));
+ }
+
+ storageManager.deleteDuplicateIDTransactional(txID, id.getB());
+ toNotBeAdded--;
+ } else {
+ ByteArray bah = new ByteArray(id.getA());
+
+ ObjLongPair pair = new ObjLongPair<>(bah, id.getB());
+
+ cache.put(bah, cachedBoxedInts.apply(this.ids.size()));
+
+ this.ids.add(pair);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("loading id = %s", describeID(id.getA(), id.getB()));
+ }
+ }
+
+ }
+
+ if (txID != -1) {
+ storageManager.commit(txID);
+ }
+
+ pos = this.ids.size();
+
+ if (pos == cacheSize) {
+ pos = 0;
+ }
+
+ }
+
+ @Override
+ public void deleteFromCache(byte[] duplicateID) throws Exception {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("deleting id = %s", describeID(duplicateID));
+ }
+
+ final ByteArray bah = new ByteArray(duplicateID);
+
+ final Integer posUsed = cache.remove(bah);
+
+ if (posUsed != null) {
+ synchronized (this) {
+ final ObjLongPair id = ids.get(posUsed.intValue());
+
+ if (id.getA().equals(bah)) {
+ final long recordID = id.getB();
+ id.setA(null);
+ id.setB(NIL);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID, id.getB()));
+ }
+ storageManager.deleteDuplicateID(recordID);
+ }
+ }
+ }
+
+ }
+
+ private static String describeID(byte[] duplicateID) {
+ return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
+ }
+
+ private static String describeID(byte[] duplicateID, long id) {
+ return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id;
+ }
+
+ @Override
+ public boolean contains(final byte[] duplID) {
+ return contains(new ByteArray(duplID));
+ }
+
+ private boolean contains(final ByteArray duplID) {
+ boolean contains = cache.containsKey(duplID);
+
+ if (LOGGER.isTraceEnabled()) {
+ if (contains) {
+ LOGGER.tracef("address = %s found a duplicate %s", address, describeID(duplID.bytes));
+ }
+ }
+ return contains;
+ }
+
+ @Override
+ public void addToCache(final byte[] duplID) throws Exception {
+ addToCache(duplID, null, false);
+ }
+
+ @Override
+ public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
+ addToCache(duplID, tx, false);
+ }
+
+ @Override
+ public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
+ final ByteArray holder = new ByteArray(duplID);
+ if (contains(holder)) {
+ if (tx != null) {
+ tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
+ }
+ return false;
+ }
+ addToCache(holder, tx, true);
+ return true;
+ }
+
+ @Override
+ public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
+ addToCache(new ByteArray(duplID), tx, instantAdd);
+ }
+
+ private synchronized void addToCache(final ByteArray holder,
+ final Transaction tx,
+ boolean instantAdd) throws Exception {
+ final long recordID = storageManager.generateID();
+ if (tx == null) {
+ storageManager.storeDuplicateID(address, holder.bytes, recordID);
+
+ addToCacheInMemory(holder, recordID);
+ } else {
+ storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID);
+
+ tx.setContainsPersistent();
+
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address,
+ describeID(holder.bytes, recordID), tx);
+ }
+
+ if (instantAdd) {
+ tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false));
+ } else {
+ // For a tx, it's important that the entry is not added to the cache until commit
+ // since if the client fails then resends them tx we don't want it to get rejected
+ tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true));
+ }
+ }
+ }
+
+ @Override
+ public void load(final Transaction tx, final byte[] duplID) {
+ tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), tx.getID(), true));
+ }
+
+ private synchronized void addToCacheInMemory(final ByteArray holder, final long recordID) {
+ Objects.requireNonNull(holder, "holder must be not null");
+ if (recordID < 0) {
+ throw new IllegalArgumentException("recordID must be >= 0");
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes, recordID));
+ }
+
+ cache.put(holder, cachedBoxedInts.apply(pos));
+
+ ObjLongPair id;
+
+ if (pos < ids.size()) {
+ // Need fast array style access here -hence ArrayList typing
+ id = ids.get(pos);
+
+ // The id here might be null if it was explicit deleted
+ if (id.getA() != null) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.getA().bytes, id.getB()));
+ }
+
+ cache.remove(id.getA());
+
+ assert id.getB() != NIL;
+ try {
+ storageManager.deleteDuplicateID(id.getB());
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e);
+ }
+ }
+
+ id.setA(holder);
+
+ id.setB(recordID);
+
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s replacing old duplicateID by %s", address, describeID(id.getA().bytes, id.getB()));
+ }
+
+ } else {
+ id = new ObjLongPair<>(holder, recordID);
+
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.tracef("address = %s adding new duplicateID %s", address, describeID(id.getA().bytes, id.getB()));
+ }
+
+ ids.add(id);
+
+ }
+
+ if (pos++ == cacheSize - 1) {
+ pos = 0;
+ }
+ }
+
+ @Override
+ public synchronized void clear() throws Exception {
+ LOGGER.debugf("address = %s removing duplicate ID data", address);
+ final int idsSize = ids.size();
+ if (idsSize > 0) {
+ long tx = storageManager.generateID();
+ for (int i = 0; i < idsSize; i++) {
+ final ObjLongPair id = ids.get(i);
+ if (id.getA() != null) {
+ assert id.getB() != NIL;
+ storageManager.deleteDuplicateIDTransactional(tx, id.getB());
+ }
+ }
+ storageManager.commit(tx);
+ }
+
+ ids.clear();
+ cache.clear();
+ pos = 0;
+ }
+
+ @Override
+ public synchronized List> getMap() {
+ final int idsSize = ids.size();
+ List> copy = new ArrayList<>(idsSize);
+ for (int i = 0; i < idsSize; i++) {
+ final ObjLongPair id = ids.get(i);
+ // in case the pair has been removed
+ if (id.getA() != null) {
+ assert id.getB() != NIL;
+ copy.add(new Pair<>(id.getA().bytes, id.getB()));
+ }
+ }
+ return copy;
+ }
+
+ private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
+
+ final ByteArray holder;
+
+ final long recordID;
+
+ volatile boolean done;
+
+ private final boolean afterCommit;
+
+ AddDuplicateIDOperation(final ByteArray holder, final long recordID, boolean afterCommit) {
+ this.holder = holder;
+ this.recordID = recordID;
+ this.afterCommit = afterCommit;
+ }
+
+ private void process() {
+ if (!done) {
+ addToCacheInMemory(holder, recordID);
+
+ done = true;
+ }
+ }
+
+ @Override
+ public void afterCommit(final Transaction tx) {
+ if (afterCommit) {
+ process();
+ }
+ }
+
+ @Override
+ public void beforeCommit(Transaction tx) throws Exception {
+ if (!afterCommit) {
+ process();
+ }
+ }
+
+ @Override
+ public List getRelatedMessageReferences() {
+ return null;
+ }
+ }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 187ee584e6..07c0bdf2ad 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -16,6 +16,25 @@
*/
package org.apache.activemq.artemis.core.postoffice.impl;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
@@ -82,25 +101,6 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Stream;
-
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
/**
@@ -1315,7 +1315,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
DuplicateIDCache cache = duplicateIDCaches.get(address);
if (cache == null) {
- cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache);
+ if (persistIDCache) {
+ cache = DuplicateIDCaches.persistent(address, idCacheSize, storageManager);
+ } else {
+ cache = DuplicateIDCaches.inMemory(address, idCacheSize);
+ }
DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
index 8300bdb355..39fe106999 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
@@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
-import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.RetryRule;
@@ -52,7 +52,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
public void testDuplicate() throws Exception {
createStorage();
- DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, true);
+ DuplicateIDCache cache = DuplicateIDCaches.persistent(new SimpleString("test"), 2000, journal);
TransactionImpl tx = new TransactionImpl(journal);
@@ -108,7 +108,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
public void testDuplicateNonPersistent() throws Exception {
createStorage();
- DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, false);
+ DuplicateIDCache cache = DuplicateIDCaches.inMemory(new SimpleString("test"), 2000);
TransactionImpl tx = new TransactionImpl(journal);
diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/ByteArrayHashCodeBenchamark.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/ByteArrayHashCodeBenchamark.java
new file mode 100644
index 0000000000..80b2d5308c
--- /dev/null
+++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/ByteArrayHashCodeBenchamark.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.performance.jmh;
+
+import java.util.Arrays;
+import java.util.SplittableRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Fork(2)
+@Warmup(iterations = 5, time = 200, timeUnit = TimeUnit.MILLISECONDS)
+@Measurement(iterations = 8, time = 200, timeUnit = TimeUnit.MILLISECONDS)
+public class ByteArrayHashCodeBenchamark {
+
+ @Param({"7", "8", "36"})
+ private int length;
+
+ @Param("0")
+ private int seed;
+
+ /**
+ * The higher the less predictable for the CPU branch predictor.
+ */
+ @Param({"4", "10"})
+ private int logPermutations;
+
+ @Param({"true"})
+ private boolean unsafe;
+
+ private long seq;
+ private int inputMask;
+ private byte[][] inputs;
+
+ @Setup
+ public void init() {
+ System.setProperty("io.netty.noUnsafe", Boolean.valueOf(!unsafe).toString());
+ int inputSize = 1 << logPermutations;
+ inputs = new byte[inputSize][];
+ inputMask = inputs.length - 1;
+ // splittable random can create repeatable sequence of inputs
+ SplittableRandom random = new SplittableRandom(seed);
+ for (int i = 0; i < inputs.length; i++) {
+ final byte[] bytes = new byte[length];
+ for (int b = 0; b < length; b++) {
+ bytes[b] = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
+ }
+ inputs[i] = bytes;
+ }
+ }
+
+ private byte[] nextInput() {
+ final long seq = this.seq;
+ final int index = (int) (seq & inputMask);
+ this.seq = seq + 1;
+ return inputs[index];
+ }
+
+ @Benchmark
+ public int artemisHashCode() {
+ return ByteUtil.hashCode(nextInput());
+ }
+
+ @Benchmark
+ public int arraysHashCode() {
+ return Arrays.hashCode(nextInput());
+ }
+
+}
diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java
new file mode 100644
index 0000000000..0917267a49
--- /dev/null
+++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.performance.jmh;
+
+import java.util.SplittableRandom;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
+import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+
+@State(Scope.Benchmark)
+@Fork(2)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 8, time = 1)
+public class DuplicateIDCacheBenchmark {
+
+ @Param({"20000"})
+ private int size;
+ @Param({"false", "true"})
+ private boolean persist;
+
+ private DuplicateIDCache cache;
+
+ private byte[][] ids;
+ private int idsMask;
+ private int missingIdsMask;
+ private long nextId;
+ private byte[][] randomEvictedIds;
+
+ @Setup
+ public void init() throws Exception {
+ cache = persist ?
+ DuplicateIDCaches.persistent(SimpleString.toSimpleString("benchmark"), size, new NullStorageManager()) :
+ DuplicateIDCaches.inMemory(SimpleString.toSimpleString("benchmark"), size);
+ final int idSize = findNextHigherPowerOf2(size);
+ idsMask = idSize - 1;
+ nextId = 0;
+ ids = new byte[idSize][];
+ // fill the cache
+ for (int i = 0; i < size; i++) {
+ final byte[] id = RandomUtil.randomBytes();
+ ids[i] = id;
+ cache.addToCache(id, null, true);
+ }
+ // evict the first (idSize - size) elements on the ids array.
+ // Given that being a FIFO cache isn't a stable contract it's going to validate it too.
+ final int evicted = idSize - size;
+ for (int i = 0; i < evicted; i++) {
+ final byte[] id = RandomUtil.randomBytes();
+ ids[size + i] = id;
+ cache.addToCache(id, null, true);
+ // check correctness of eviction policy
+ if (cache.contains(ids[i])) {
+ throw new AssertionError("This cache isn't using anymore a FIFO eviction strategy or its real capacity is > " + size);
+ }
+ }
+ // always use the same seed!
+ SplittableRandom random = new SplittableRandom(0);
+ // set it big enough to trick branch predictors
+ final int evictedIdsLength = findNextPowerOf2(Math.max(1024, evicted));
+ missingIdsMask = evictedIdsLength - 1;
+ randomEvictedIds = new byte[evictedIdsLength][];
+ for (int i = 0; i < evictedIdsLength; i++) {
+ final int id = random.nextInt(0, evicted);
+ randomEvictedIds[i] = ids[id];
+ // check correctness of eviction policy
+ if (cache.contains(ids[id])) {
+ throw new AssertionError("This cache isn't using anymore a FIFO eviction strategy");
+ }
+ }
+ }
+
+ // it isn't checking what's the max power of 2 number nor if size > 0
+ private static int findNextHigherPowerOf2(int size) {
+ final int nextPow2 = findNextPowerOf2(size);
+ if (nextPow2 > size) {
+ return nextPow2;
+ }
+ return nextPow2 * 2;
+ }
+
+ private static int findNextPowerOf2(int size) {
+ return 1 << (32 - Integer.numberOfLeadingZeros(size - 1));
+ }
+
+ private byte[] nextId() {
+ final long seq = nextId;
+ final int index = (int) (seq & idsMask);
+ nextId = seq + 1;
+ return ids[index];
+ }
+
+ private byte[] nextMissingId() {
+ final long seq = nextId;
+ final int index = (int) (seq & missingIdsMask);
+ nextId = seq + 1;
+ return randomEvictedIds[index];
+ }
+
+ @Benchmark
+ public boolean atomicVerify() throws Exception {
+ return cache.atomicVerify(nextId(), null);
+ }
+
+ @Benchmark
+ public boolean containsMissingId() {
+ return cache.contains(nextMissingId());
+ }
+
+ @TearDown
+ public void clear() throws Exception {
+ cache.clear();
+ }
+
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index 569f46983a..76c861952f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -31,8 +31,9 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader;
import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice;
@@ -40,8 +41,8 @@ import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
-import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.junit.After;
import org.junit.Assert;
@@ -106,7 +107,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
Assert.assertEquals(0, mapDups.size());
- DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
+ DuplicateIDCache cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal);
for (int i = 0; i < 100; i++) {
cacheID.addToCache(RandomUtil.randomBytes());
@@ -126,7 +127,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
Assert.assertEquals(10, values.size());
- cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
+ cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal);
cacheID.load(values);
for (int i = 0; i < 100; i++) {
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 5220a1a911..d7aa3cceae 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -28,14 +28,13 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
-import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -201,7 +200,7 @@ public class FakePostOffice implements PostOffice {
@Override
public DuplicateIDCache getDuplicateIDCache(final SimpleString address) {
- return new DuplicateIDCacheImpl(address, 2000, new NullStorageManager(), false);
+ return DuplicateIDCaches.inMemory(address, 2000);
}
@Override