From 2b5d99bbd1697728da071ecafa585b0b6168979d Mon Sep 17 00:00:00 2001 From: franz1981 Date: Wed, 2 Dec 2020 21:38:11 +0100 Subject: [PATCH] ARTEMIS-3016 Refactored duplicate ids cache --- .../core/postoffice/DuplicateIDCache.java | 2 +- .../core/postoffice/impl/ByteArray.java | 50 ++ .../postoffice/impl/DuplicateIDCacheImpl.java | 448 ------------------ .../postoffice/impl/DuplicateIDCaches.java | 39 ++ .../impl/InMemoryDuplicateIDCache.java | 276 +++++++++++ .../core/postoffice/impl/IntegerCache.java | 62 +++ .../impl/PersistentDuplicateIDCache.java | 394 +++++++++++++++ .../core/postoffice/impl/PostOfficeImpl.java | 44 +- .../persistence/DuplicateCacheTest.java | 6 +- .../jmh/DuplicateIDCacheBenchmark.java | 6 +- .../impl/DuplicateDetectionUnitTest.java | 9 +- .../server/impl/fakes/FakePostOffice.java | 5 +- 12 files changed, 860 insertions(+), 481 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java index b384896efa..75cc17b7d3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java @@ -40,7 +40,7 @@ public interface DuplicateIDCache { void deleteFromCache(byte[] duplicateID) throws Exception; - void load(List> theIds) throws Exception; + void load(List> ids) throws Exception; void load(Transaction tx, byte[] duplID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java new file mode 100644 index 0000000000..8f2c67e38f --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/ByteArray.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import org.apache.activemq.artemis.utils.ByteUtil; + +final class ByteArray { + + final byte[] bytes; + + private int hash; + + ByteArray(final byte[] bytes) { + this.bytes = bytes; + } + + @Override + public boolean equals(final Object other) { + if (other instanceof ByteArray) { + ByteArray s = (ByteArray) other; + + return ByteUtil.equals(bytes, s.bytes); + } else { + return false; + } + } + + @Override + public int hashCode() { + if (hash == 0) { + hash = ByteUtil.hashCode(bytes); + } + + return hash; + } +} \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java deleted file mode 100644 index 4dfc765026..0000000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java +++ /dev/null @@ -1,448 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.postoffice.impl; - -import java.lang.ref.Reference; -import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; -import org.apache.activemq.artemis.api.core.ObjLongPair; -import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.transaction.Transaction; -import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; -import org.apache.activemq.artemis.utils.ByteUtil; -import org.jboss.logging.Logger; - -import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL; - -/** - * A DuplicateIDCacheImpl - * - * A fixed size rotating cache of last X duplicate ids. - */ -public class DuplicateIDCacheImpl implements DuplicateIDCache { - - private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class); - // we're not interested into safe publication here: we need to scale, be fast and save "some" GC to happen - private static WeakReference INDEXES = null; - - private static Integer[] boxedInts(int size) { - final Reference indexesRef = INDEXES; - final Integer[] indexes = indexesRef == null ? null : indexesRef.get(); - if (indexes != null && size <= indexes.length) { - return indexes; - } - final int newSize = size + (indexes == null ? 0 : size / 2); - final Integer[] newIndexes = new Integer[newSize]; - if (indexes != null) { - System.arraycopy(indexes, 0, newIndexes, 0, indexes.length); - } - INDEXES = new WeakReference<>(newIndexes); - return newIndexes; - } - - // ByteHolder, position - private final Map cache = new ConcurrentHashMap<>(); - - private final SimpleString address; - - // Note - deliberately typed as ArrayList since we want to ensure fast indexed - // based array access - private final ArrayList> ids; - - private final Integer[] cachedBoxedInts; - - private int pos; - - private final int cacheSize; - - private final StorageManager storageManager; - - private final boolean persist; - - public DuplicateIDCacheImpl(final SimpleString address, - final int size, - final StorageManager storageManager, - final boolean persist) { - this.address = address; - - cacheSize = size; - - ids = new ArrayList<>(size); - - cachedBoxedInts = boxedInts(size); - - this.storageManager = storageManager; - - this.persist = persist; - } - - // best effort caching mechanism - private Integer boxed(int index) { - Integer boxedInt = this.cachedBoxedInts[index]; - if (boxedInt == null) { - boxedInt = index; - cachedBoxedInts[index] = boxedInt; - } - assert boxedInt != null; - return boxedInt; - } - - @Override - public void load(final List> theIds) throws Exception { - long txID = -1; - - // If we have more IDs than cache size, we shrink the first ones - int deleteCount = theIds.size() - cacheSize; - if (deleteCount < 0) { - deleteCount = 0; - } - - for (Pair id : theIds) { - if (deleteCount > 0) { - if (txID == -1) { - txID = storageManager.generateID(); - } - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB())); - } - - storageManager.deleteDuplicateIDTransactional(txID, id.getB()); - deleteCount--; - } else { - ByteArrayHolder bah = new ByteArrayHolder(id.getA()); - - ObjLongPair pair = new ObjLongPair<>(bah, id.getB()); - - cache.put(bah, boxed(ids.size())); - - ids.add(pair); - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB())); - } - } - - } - - if (txID != -1) { - storageManager.commit(txID); - } - - pos = ids.size(); - - if (pos == cacheSize) { - pos = 0; - } - - } - - @Override - public void deleteFromCache(byte[] duplicateID) throws Exception { - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl::deleteFromCache deleting id=" + describeID(duplicateID, 0)); - } - - ByteArrayHolder bah = new ByteArrayHolder(duplicateID); - - Integer posUsed = cache.remove(bah); - - if (posUsed != null) { - ObjLongPair id; - - synchronized (this) { - id = ids.get(posUsed.intValue()); - - if (id.getA().equals(bah)) { - id.setA(null); - storageManager.deleteDuplicateID(id.getB()); - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB())); - } - id.setB(NIL); - } - } - } - - } - - private String describeID(byte[] duplicateID, long id) { - if (id != 0) { - return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID); - } else { - return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id; - } - } - - @Override - public boolean contains(final byte[] duplID) { - return contains(new ByteArrayHolder(duplID)); - } - - private boolean contains(final ByteArrayHolder duplID) { - boolean contains = cache.containsKey(duplID); - - if (logger.isTraceEnabled()) { - if (contains) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::contains found a duplicate " + describeID(duplID.bytes, 0)); - } - } - return contains; - } - - @Override - public void addToCache(final byte[] duplID) throws Exception { - addToCache(duplID, null, false); - } - - @Override - public void addToCache(final byte[] duplID, final Transaction tx) throws Exception { - addToCache(duplID, tx, false); - } - - @Override - public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception { - final ByteArrayHolder holder = new ByteArrayHolder(duplID); - if (contains(holder)) { - if (tx != null) { - tx.markAsRollbackOnly(new ActiveMQDuplicateIdException()); - } - return false; - } - addToCache(holder, tx, true); - return true; - } - - @Override - public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception { - addToCache(new ByteArrayHolder(duplID), tx, instantAdd); - } - - private synchronized void addToCache(final ByteArrayHolder holder, - final Transaction tx, - boolean instantAdd) throws Exception { - long recordID = -1; - if (tx == null) { - if (persist) { - recordID = storageManager.generateID(); - storageManager.storeDuplicateID(address, holder.bytes, recordID); - } - - addToCacheInMemory(holder, recordID); - } else { - if (persist) { - recordID = storageManager.generateID(); - storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID); - - tx.setContainsPersistent(); - } - - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(holder.bytes, recordID) + ", tx=" + tx); - } - - if (instantAdd) { - tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false)); - } else { - // For a tx, it's important that the entry is not added to the cache until commit - // since if the client fails then resends them tx we don't want it to get rejected - tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true)); - } - } - } - - @Override - public void load(final Transaction tx, final byte[] duplID) { - tx.addOperation(new AddDuplicateIDOperation(new ByteArrayHolder(duplID), tx.getID(), true)); - } - - private synchronized void addToCacheInMemory(final ByteArrayHolder holder, final long recordID) { - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(holder.bytes, recordID)); - } - - cache.put(holder, boxed(pos)); - - ObjLongPair id; - - if (pos < ids.size()) { - // Need fast array style access here -hence ArrayList typing - id = ids.get(pos); - - // The id here might be null if it was explicit deleted - if (id.getA() != null) { - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB())); - } - - cache.remove(id.getA()); - - // Record already exists - we delete the old one and add the new one - // Note we can't use update since journal update doesn't let older records get - // reclaimed - - if (id.getB() != NIL) { - try { - storageManager.deleteDuplicateID(id.getB()); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e); - } - } - } - - id.setA(holder); - - // The recordID could be negative if the duplicateCache is configured to not persist, - // -1 would mean null on this case - id.setB(recordID >= 0 ? recordID : NIL); - - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB())); - } - - } else { - id = new ObjLongPair<>(holder, recordID >= 0 ? recordID : NIL); - - if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB())); - } - - ids.add(id); - - } - - if (pos++ == cacheSize - 1) { - pos = 0; - } - } - - @Override - public void clear() throws Exception { - logger.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data"); - synchronized (this) { - final int idsSize = ids.size(); - if (idsSize > 0 && persist) { - long tx = storageManager.generateID(); - for (int i = 0; i < idsSize; i++) { - final ObjLongPair id = ids.get(i); - if (id != null && id.getB() != NIL) { - storageManager.deleteDuplicateIDTransactional(tx, id.getB()); - } - } - storageManager.commit(tx); - } - - ids.clear(); - cache.clear(); - pos = 0; - } - } - - @Override - public List> getMap() { - final int idsSize = ids.size(); - List> copy = new ArrayList<>(idsSize); - for (int i = 0; i < idsSize; i++) { - final ObjLongPair id = ids.get(i); - copy.add(new Pair<>(id.getA().bytes, id.getB() == NIL ? null : id.getB())); - } - return copy; - } - - private final class AddDuplicateIDOperation extends TransactionOperationAbstract { - - final ByteArrayHolder holder; - - final long recordID; - - volatile boolean done; - - private final boolean afterCommit; - - AddDuplicateIDOperation(final ByteArrayHolder holder, final long recordID, boolean afterCommit) { - this.holder = holder; - this.recordID = recordID; - this.afterCommit = afterCommit; - } - - private void process() { - if (!done) { - addToCacheInMemory(holder, recordID); - - done = true; - } - } - - @Override - public void afterCommit(final Transaction tx) { - if (afterCommit) { - process(); - } - } - - @Override - public void beforeCommit(Transaction tx) throws Exception { - if (!afterCommit) { - process(); - } - } - - @Override - public List getRelatedMessageReferences() { - return null; - } - } - - private static final class ByteArrayHolder { - - private final byte[] bytes; - - private int hash; - - ByteArrayHolder(final byte[] bytes) { - this.bytes = bytes; - } - - @Override - public boolean equals(final Object other) { - if (other instanceof ByteArrayHolder) { - ByteArrayHolder s = (ByteArrayHolder) other; - - return ByteUtil.equals(bytes, s.bytes); - } else { - return false; - } - } - - @Override - public int hashCode() { - if (hash == 0) { - hash = ByteUtil.hashCode(bytes); - } - - return hash; - } - } -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java new file mode 100644 index 0000000000..79b55e8145 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCaches.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; + +public final class DuplicateIDCaches { + + private DuplicateIDCaches() { + + } + + public static DuplicateIDCache persistent(final SimpleString address, + final int size, + final StorageManager storageManager) { + return new PersistentDuplicateIDCache(address, size, storageManager); + } + + public static DuplicateIDCache inMemory(final SimpleString address, final int size) { + return new InMemoryDuplicateIDCache(address, size); + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java new file mode 100644 index 0000000000..f1f86142df --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.IntFunction; + +import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; +import org.apache.activemq.artemis.utils.ByteUtil; +import org.jboss.logging.Logger; + +import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts; + +/** + * {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance + * and memory footprint reasons.
+ * Instead of using a single {@link DuplicateIDCache} impl, we've let 2 different impls to contain just the bare + * minimum data in order to have 2 different memory footprint costs at runtime, while making easier to track dependencies + * eg in-memory cache won't need any {@link StorageManager} because no storage operations are expected to happen. + */ +final class InMemoryDuplicateIDCache implements DuplicateIDCache { + + private static final Logger LOGGER = Logger.getLogger(InMemoryDuplicateIDCache.class); + + private final Map cache = new ConcurrentHashMap<>(); + + private final SimpleString address; + + private final ArrayList ids; + + private final IntFunction cachedBoxedInts; + + private int pos; + + private final int cacheSize; + + InMemoryDuplicateIDCache(final SimpleString address, final int size) { + this.address = address; + + cacheSize = size; + + ids = new ArrayList<>(size); + + cachedBoxedInts = boxedInts(size); + } + + @Override + public void load(List> ids) throws Exception { + LOGGER.debugf("address = %s ignore loading ids: in memory cache won't load previously stored ids", address); + } + + @Override + public void deleteFromCache(byte[] duplicateID) { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("deleting id = %s", describeID(duplicateID)); + } + + ByteArray bah = new ByteArray(duplicateID); + + Integer posUsed = cache.remove(bah); + + if (posUsed != null) { + ByteArray id; + + synchronized (this) { + final int index = posUsed.intValue(); + id = ids.get(index); + + if (id.equals(bah)) { + ids.set(index, null); + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID)); + } + } + } + } + + } + + private static String describeID(byte[] duplicateID) { + return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID); + } + + @Override + public boolean contains(final byte[] duplID) { + return contains(new ByteArray(duplID)); + } + + private boolean contains(final ByteArray id) { + boolean contains = cache.containsKey(id); + + if (LOGGER.isTraceEnabled()) { + if (contains) { + LOGGER.tracef("address = %s found a duplicate ", address, describeID(id.bytes)); + } + } + return contains; + } + + @Override + public void addToCache(final byte[] duplID) throws Exception { + addToCache(duplID, null, false); + } + + @Override + public void addToCache(final byte[] duplID, final Transaction tx) throws Exception { + addToCache(duplID, tx, false); + } + + @Override + public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) { + final ByteArray holder = new ByteArray(duplID); + if (contains(holder)) { + if (tx != null) { + tx.markAsRollbackOnly(new ActiveMQDuplicateIdException()); + } + return false; + } + addToCache(holder, tx, true); + return true; + } + + @Override + public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception { + addToCache(new ByteArray(duplID), tx, instantAdd); + } + + private synchronized void addToCache(final ByteArray holder, final Transaction tx, boolean instantAdd) { + if (tx == null) { + addToCacheInMemory(holder); + } else { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address, describeID(holder.bytes), tx); + } + + if (instantAdd) { + tx.addOperation(new AddDuplicateIDOperation(holder, false)); + } else { + // For a tx, it's important that the entry is not added to the cache until commit + // since if the client fails then resends them tx we don't want it to get rejected + tx.afterStore(new AddDuplicateIDOperation(holder, true)); + } + } + } + + @Override + public void load(final Transaction tx, final byte[] duplID) { + tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), true)); + } + + private synchronized void addToCacheInMemory(final ByteArray holder) { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes)); + } + + cache.put(holder, cachedBoxedInts.apply(pos)); + + if (pos < ids.size()) { + // Need fast array style access here -hence ArrayList typing + final ByteArray id = ids.set(pos, holder); + + // The id here might be null if it was explicit deleted + if (id != null) { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.bytes)); + } + + cache.remove(id); + } + + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s replacing old duplicateID by %s", describeID(holder.bytes)); + } + + } else { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding new duplicateID %s", describeID(holder.bytes)); + } + + ids.add(holder); + } + + if (pos++ == cacheSize - 1) { + pos = 0; + } + } + + @Override + public synchronized void clear() throws Exception { + if (LOGGER.isDebugEnabled()) { + LOGGER.debugf("address = %s removing duplicate ID data", address); + } + ids.clear(); + cache.clear(); + pos = 0; + } + + @Override + public synchronized List> getMap() { + final int idsSize = ids.size(); + List> copy = new ArrayList<>(idsSize); + for (int i = 0; i < idsSize; i++) { + final ByteArray id = ids.get(i); + // in case the id has been removed + if (id != null) { + copy.add(new Pair<>(id.bytes, null)); + } + } + return copy; + } + + private final class AddDuplicateIDOperation extends TransactionOperationAbstract { + + final ByteArray id; + + volatile boolean done; + + private final boolean afterCommit; + + AddDuplicateIDOperation(final ByteArray id, boolean afterCommit) { + this.id = id; + this.afterCommit = afterCommit; + } + + private void process() { + if (!done) { + addToCacheInMemory(id); + + done = true; + } + } + + @Override + public void afterCommit(final Transaction tx) { + if (afterCommit) { + process(); + } + } + + @Override + public void beforeCommit(Transaction tx) throws Exception { + if (!afterCommit) { + process(); + } + } + + @Override + public List getRelatedMessageReferences() { + return null; + } + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java new file mode 100644 index 0000000000..a726552a54 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/IntegerCache.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; +import java.util.function.IntFunction; + +final class IntegerCache { + + private static final boolean DISABLE_INTEGER_CACHE = Boolean.valueOf(System.getProperty("disable.integer.cache", Boolean.FALSE.toString())); + + // we're not interested into safe publication here: we need to scale, be fast and save "some" GC to happen + private static WeakReference INDEXES = null; + + private static Integer[] ints(int size) { + final Reference indexesRef = INDEXES; + final Integer[] indexes = indexesRef == null ? null : indexesRef.get(); + if (indexes != null && size <= indexes.length) { + return indexes; + } + final int newSize = size + (indexes == null ? 0 : size / 2); + final Integer[] newIndexes = new Integer[newSize]; + if (indexes != null) { + System.arraycopy(indexes, 0, newIndexes, 0, indexes.length); + } + INDEXES = new WeakReference<>(newIndexes); + return newIndexes; + } + + public static IntFunction boxedInts(int size) { + if (DISABLE_INTEGER_CACHE) { + return Integer::valueOf; + } + // use a lambda to have an trusted const field for free + final Integer[] cachedInts = ints(size); + return index -> { + Integer boxedInt = cachedInts[index]; + if (boxedInt == null) { + boxedInt = index; + cachedInts[index] = boxedInt; + } + assert boxedInt != null; + return boxedInt; + }; + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java new file mode 100644 index 0000000000..3e3758d3c9 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.IntFunction; + +import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; +import org.apache.activemq.artemis.api.core.ObjLongPair; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; +import org.apache.activemq.artemis.utils.ByteUtil; +import org.jboss.logging.Logger; + +import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL; +import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts; + +/** + * {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance + * and memory footprint reasons.
+ * Instead of using a single {@link DuplicateIDCache} impl, we've let 2 different impls to contain just the bare + * minimum data in order to have 2 different memory footprint costs at runtime, while making easier to track dependencies + * eg in-memory cache won't need any {@link StorageManager} because no storage operations are expected to happen. + */ +final class PersistentDuplicateIDCache implements DuplicateIDCache { + + private static final Logger LOGGER = Logger.getLogger(PersistentDuplicateIDCache.class); + + private final Map cache = new ConcurrentHashMap<>(); + + private final SimpleString address; + + private final ArrayList> ids; + + private final IntFunction cachedBoxedInts; + + private int pos; + + private final int cacheSize; + + private final StorageManager storageManager; + + PersistentDuplicateIDCache(final SimpleString address, final int size, final StorageManager storageManager) { + this.address = address; + + cacheSize = size; + + ids = new ArrayList<>(size); + + cachedBoxedInts = boxedInts(size); + + this.storageManager = storageManager; + } + + @Override + public synchronized void load(final List> ids) throws Exception { + if (!cache.isEmpty()) { + throw new IllegalStateException("load is valid only on empty cache"); + } + // load only ids that fit this cache: + // - in term of remaining capacity + // - ignoring (and reporting) ids unpaired with record ID + // Then, delete the exceeding ones. + + long txID = -1; + + int toNotBeAdded = ids.size() - cacheSize; + if (toNotBeAdded < 0) { + toNotBeAdded = 0; + } + + for (Pair id : ids) { + if (id.getB() == null) { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("ignoring id = %s because without record ID", describeID(id.getA())); + } + if (toNotBeAdded > 0) { + toNotBeAdded--; + } + continue; + } + assert id.getB() != null && id.getB().longValue() != NIL; + if (toNotBeAdded > 0) { + if (txID == -1) { + txID = storageManager.generateID(); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("deleting id = %s", describeID(id.getA(), id.getB())); + } + + storageManager.deleteDuplicateIDTransactional(txID, id.getB()); + toNotBeAdded--; + } else { + ByteArray bah = new ByteArray(id.getA()); + + ObjLongPair pair = new ObjLongPair<>(bah, id.getB()); + + cache.put(bah, cachedBoxedInts.apply(this.ids.size())); + + this.ids.add(pair); + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("loading id = %s", describeID(id.getA(), id.getB())); + } + } + + } + + if (txID != -1) { + storageManager.commit(txID); + } + + pos = this.ids.size(); + + if (pos == cacheSize) { + pos = 0; + } + + } + + @Override + public void deleteFromCache(byte[] duplicateID) throws Exception { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("deleting id = %s", describeID(duplicateID)); + } + + final ByteArray bah = new ByteArray(duplicateID); + + final Integer posUsed = cache.remove(bah); + + if (posUsed != null) { + synchronized (this) { + final ObjLongPair id = ids.get(posUsed.intValue()); + + if (id.getA().equals(bah)) { + final long recordID = id.getB(); + id.setA(null); + id.setB(NIL); + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID, id.getB())); + } + storageManager.deleteDuplicateID(recordID); + } + } + } + + } + + private static String describeID(byte[] duplicateID) { + return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID); + } + + private static String describeID(byte[] duplicateID, long id) { + return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id; + } + + @Override + public boolean contains(final byte[] duplID) { + return contains(new ByteArray(duplID)); + } + + private boolean contains(final ByteArray duplID) { + boolean contains = cache.containsKey(duplID); + + if (LOGGER.isTraceEnabled()) { + if (contains) { + LOGGER.tracef("address = %s found a duplicate %s", address, describeID(duplID.bytes)); + } + } + return contains; + } + + @Override + public void addToCache(final byte[] duplID) throws Exception { + addToCache(duplID, null, false); + } + + @Override + public void addToCache(final byte[] duplID, final Transaction tx) throws Exception { + addToCache(duplID, tx, false); + } + + @Override + public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception { + final ByteArray holder = new ByteArray(duplID); + if (contains(holder)) { + if (tx != null) { + tx.markAsRollbackOnly(new ActiveMQDuplicateIdException()); + } + return false; + } + addToCache(holder, tx, true); + return true; + } + + @Override + public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception { + addToCache(new ByteArray(duplID), tx, instantAdd); + } + + private synchronized void addToCache(final ByteArray holder, + final Transaction tx, + boolean instantAdd) throws Exception { + final long recordID = storageManager.generateID(); + if (tx == null) { + storageManager.storeDuplicateID(address, holder.bytes, recordID); + + addToCacheInMemory(holder, recordID); + } else { + storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID); + + tx.setContainsPersistent(); + + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address, + describeID(holder.bytes, recordID), tx); + } + + if (instantAdd) { + tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false)); + } else { + // For a tx, it's important that the entry is not added to the cache until commit + // since if the client fails then resends them tx we don't want it to get rejected + tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true)); + } + } + } + + @Override + public void load(final Transaction tx, final byte[] duplID) { + tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), tx.getID(), true)); + } + + private synchronized void addToCacheInMemory(final ByteArray holder, final long recordID) { + Objects.requireNonNull(holder, "holder must be not null"); + if (recordID < 0) { + throw new IllegalArgumentException("recordID must be >= 0"); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes, recordID)); + } + + cache.put(holder, cachedBoxedInts.apply(pos)); + + ObjLongPair id; + + if (pos < ids.size()) { + // Need fast array style access here -hence ArrayList typing + id = ids.get(pos); + + // The id here might be null if it was explicit deleted + if (id.getA() != null) { + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.getA().bytes, id.getB())); + } + + cache.remove(id.getA()); + + assert id.getB() != NIL; + try { + storageManager.deleteDuplicateID(id.getB()); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e); + } + } + + id.setA(holder); + + id.setB(recordID); + + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s replacing old duplicateID by %s", address, describeID(id.getA().bytes, id.getB())); + } + + } else { + id = new ObjLongPair<>(holder, recordID); + + if (LOGGER.isTraceEnabled()) { + LOGGER.tracef("address = %s adding new duplicateID %s", address, describeID(id.getA().bytes, id.getB())); + } + + ids.add(id); + + } + + if (pos++ == cacheSize - 1) { + pos = 0; + } + } + + @Override + public synchronized void clear() throws Exception { + LOGGER.debugf("address = %s removing duplicate ID data", address); + final int idsSize = ids.size(); + if (idsSize > 0) { + long tx = storageManager.generateID(); + for (int i = 0; i < idsSize; i++) { + final ObjLongPair id = ids.get(i); + if (id.getA() != null) { + assert id.getB() != NIL; + storageManager.deleteDuplicateIDTransactional(tx, id.getB()); + } + } + storageManager.commit(tx); + } + + ids.clear(); + cache.clear(); + pos = 0; + } + + @Override + public synchronized List> getMap() { + final int idsSize = ids.size(); + List> copy = new ArrayList<>(idsSize); + for (int i = 0; i < idsSize; i++) { + final ObjLongPair id = ids.get(i); + // in case the pair has been removed + if (id.getA() != null) { + assert id.getB() != NIL; + copy.add(new Pair<>(id.getA().bytes, id.getB())); + } + } + return copy; + } + + private final class AddDuplicateIDOperation extends TransactionOperationAbstract { + + final ByteArray holder; + + final long recordID; + + volatile boolean done; + + private final boolean afterCommit; + + AddDuplicateIDOperation(final ByteArray holder, final long recordID, boolean afterCommit) { + this.holder = holder; + this.recordID = recordID; + this.afterCommit = afterCommit; + } + + private void process() { + if (!done) { + addToCacheInMemory(holder, recordID); + + done = true; + } + } + + @Override + public void afterCommit(final Transaction tx) { + if (afterCommit) { + process(); + } + } + + @Override + public void beforeCommit(Transaction tx) throws Exception { + if (!afterCommit) { + process(); + } + } + + @Override + public List getRelatedMessageReferences() { + return null; + } + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 187ee584e6..07c0bdf2ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -16,6 +16,25 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; + import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; @@ -82,25 +101,6 @@ import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.jboss.logging.Logger; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Stream; - import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf; /** @@ -1315,7 +1315,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding DuplicateIDCache cache = duplicateIDCaches.get(address); if (cache == null) { - cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache); + if (persistIDCache) { + cache = DuplicateIDCaches.persistent(address, idCacheSize, storageManager); + } else { + cache = DuplicateIDCaches.inMemory(address, idCacheSize); + } DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java index 8300bdb355..39fe106999 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java @@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; -import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; +import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RetryRule; @@ -52,7 +52,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase { public void testDuplicate() throws Exception { createStorage(); - DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, true); + DuplicateIDCache cache = DuplicateIDCaches.persistent(new SimpleString("test"), 2000, journal); TransactionImpl tx = new TransactionImpl(journal); @@ -108,7 +108,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase { public void testDuplicateNonPersistent() throws Exception { createStorage(); - DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, false); + DuplicateIDCache cache = DuplicateIDCaches.inMemory(new SimpleString("test"), 2000); TransactionImpl tx = new TransactionImpl(journal); diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java index 21505bf3d8..0917267a49 100644 --- a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java +++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/DuplicateIDCacheBenchmark.java @@ -21,7 +21,7 @@ import java.util.SplittableRandom; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; -import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; +import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches; import org.apache.activemq.artemis.utils.RandomUtil; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Fork; @@ -54,7 +54,9 @@ public class DuplicateIDCacheBenchmark { @Setup public void init() throws Exception { - cache = new DuplicateIDCacheImpl(SimpleString.toSimpleString("benchmark"), size, new NullStorageManager(), persist); + cache = persist ? + DuplicateIDCaches.persistent(SimpleString.toSimpleString("benchmark"), size, new NullStorageManager()) : + DuplicateIDCaches.inMemory(SimpleString.toSimpleString("benchmark"), size); final int idSize = findNextHigherPowerOf2(size); idsMask = idSize - 1; nextId = 0; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java index 569f46983a..76c861952f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java @@ -31,8 +31,9 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; -import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; +import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches; import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader; import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl; import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice; @@ -40,8 +41,8 @@ import org.apache.activemq.artemis.tests.unit.util.FakePagingManager; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; import org.junit.After; import org.junit.Assert; @@ -106,7 +107,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { Assert.assertEquals(0, mapDups.size()); - DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true); + DuplicateIDCache cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal); for (int i = 0; i < 100; i++) { cacheID.addToCache(RandomUtil.randomBytes()); @@ -126,7 +127,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { Assert.assertEquals(10, values.size()); - cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true); + cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal); cacheID.load(values); for (int i = 0; i < 100; i++) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 5220a1a911..d7aa3cceae 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -28,14 +28,13 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; -import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; +import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -201,7 +200,7 @@ public class FakePostOffice implements PostOffice { @Override public DuplicateIDCache getDuplicateIDCache(final SimpleString address) { - return new DuplicateIDCacheImpl(address, 2000, new NullStorageManager(), false); + return DuplicateIDCaches.inMemory(address, 2000); } @Override