ARTEMIS-3016 Refactored duplicate ids cache
This commit is contained in:
parent
b3b5d4893c
commit
2b5d99bbd1
|
@ -40,7 +40,7 @@ public interface DuplicateIDCache {
|
||||||
|
|
||||||
void deleteFromCache(byte[] duplicateID) throws Exception;
|
void deleteFromCache(byte[] duplicateID) throws Exception;
|
||||||
|
|
||||||
void load(List<Pair<byte[], Long>> theIds) throws Exception;
|
void load(List<Pair<byte[], Long>> ids) throws Exception;
|
||||||
|
|
||||||
void load(Transaction tx, byte[] duplID);
|
void load(Transaction tx, byte[] duplID);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||||
|
|
||||||
|
final class ByteArray {
|
||||||
|
|
||||||
|
final byte[] bytes;
|
||||||
|
|
||||||
|
private int hash;
|
||||||
|
|
||||||
|
ByteArray(final byte[] bytes) {
|
||||||
|
this.bytes = bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(final Object other) {
|
||||||
|
if (other instanceof ByteArray) {
|
||||||
|
ByteArray s = (ByteArray) other;
|
||||||
|
|
||||||
|
return ByteUtil.equals(bytes, s.bytes);
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
if (hash == 0) {
|
||||||
|
hash = ByteUtil.hashCode(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
return hash;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,448 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.core.postoffice.impl;
|
|
||||||
|
|
||||||
import java.lang.ref.Reference;
|
|
||||||
import java.lang.ref.WeakReference;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
|
|
||||||
import org.apache.activemq.artemis.api.core.ObjLongPair;
|
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
|
||||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
|
||||||
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
|
||||||
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
|
|
||||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
|
||||||
import org.jboss.logging.Logger;
|
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A DuplicateIDCacheImpl
|
|
||||||
*
|
|
||||||
* A fixed size rotating cache of last X duplicate ids.
|
|
||||||
*/
|
|
||||||
public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class);
|
|
||||||
// we're not interested into safe publication here: we need to scale, be fast and save "some" GC to happen
|
|
||||||
private static WeakReference<Integer[]> INDEXES = null;
|
|
||||||
|
|
||||||
private static Integer[] boxedInts(int size) {
|
|
||||||
final Reference<Integer[]> indexesRef = INDEXES;
|
|
||||||
final Integer[] indexes = indexesRef == null ? null : indexesRef.get();
|
|
||||||
if (indexes != null && size <= indexes.length) {
|
|
||||||
return indexes;
|
|
||||||
}
|
|
||||||
final int newSize = size + (indexes == null ? 0 : size / 2);
|
|
||||||
final Integer[] newIndexes = new Integer[newSize];
|
|
||||||
if (indexes != null) {
|
|
||||||
System.arraycopy(indexes, 0, newIndexes, 0, indexes.length);
|
|
||||||
}
|
|
||||||
INDEXES = new WeakReference<>(newIndexes);
|
|
||||||
return newIndexes;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ByteHolder, position
|
|
||||||
private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
private final SimpleString address;
|
|
||||||
|
|
||||||
// Note - deliberately typed as ArrayList since we want to ensure fast indexed
|
|
||||||
// based array access
|
|
||||||
private final ArrayList<ObjLongPair<ByteArrayHolder>> ids;
|
|
||||||
|
|
||||||
private final Integer[] cachedBoxedInts;
|
|
||||||
|
|
||||||
private int pos;
|
|
||||||
|
|
||||||
private final int cacheSize;
|
|
||||||
|
|
||||||
private final StorageManager storageManager;
|
|
||||||
|
|
||||||
private final boolean persist;
|
|
||||||
|
|
||||||
public DuplicateIDCacheImpl(final SimpleString address,
|
|
||||||
final int size,
|
|
||||||
final StorageManager storageManager,
|
|
||||||
final boolean persist) {
|
|
||||||
this.address = address;
|
|
||||||
|
|
||||||
cacheSize = size;
|
|
||||||
|
|
||||||
ids = new ArrayList<>(size);
|
|
||||||
|
|
||||||
cachedBoxedInts = boxedInts(size);
|
|
||||||
|
|
||||||
this.storageManager = storageManager;
|
|
||||||
|
|
||||||
this.persist = persist;
|
|
||||||
}
|
|
||||||
|
|
||||||
// best effort caching mechanism
|
|
||||||
private Integer boxed(int index) {
|
|
||||||
Integer boxedInt = this.cachedBoxedInts[index];
|
|
||||||
if (boxedInt == null) {
|
|
||||||
boxedInt = index;
|
|
||||||
cachedBoxedInts[index] = boxedInt;
|
|
||||||
}
|
|
||||||
assert boxedInt != null;
|
|
||||||
return boxedInt;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void load(final List<Pair<byte[], Long>> theIds) throws Exception {
|
|
||||||
long txID = -1;
|
|
||||||
|
|
||||||
// If we have more IDs than cache size, we shrink the first ones
|
|
||||||
int deleteCount = theIds.size() - cacheSize;
|
|
||||||
if (deleteCount < 0) {
|
|
||||||
deleteCount = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (Pair<byte[], Long> id : theIds) {
|
|
||||||
if (deleteCount > 0) {
|
|
||||||
if (txID == -1) {
|
|
||||||
txID = storageManager.generateID();
|
|
||||||
}
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB()));
|
|
||||||
}
|
|
||||||
|
|
||||||
storageManager.deleteDuplicateIDTransactional(txID, id.getB());
|
|
||||||
deleteCount--;
|
|
||||||
} else {
|
|
||||||
ByteArrayHolder bah = new ByteArrayHolder(id.getA());
|
|
||||||
|
|
||||||
ObjLongPair<ByteArrayHolder> pair = new ObjLongPair<>(bah, id.getB());
|
|
||||||
|
|
||||||
cache.put(bah, boxed(ids.size()));
|
|
||||||
|
|
||||||
ids.add(pair);
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
if (txID != -1) {
|
|
||||||
storageManager.commit(txID);
|
|
||||||
}
|
|
||||||
|
|
||||||
pos = ids.size();
|
|
||||||
|
|
||||||
if (pos == cacheSize) {
|
|
||||||
pos = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void deleteFromCache(byte[] duplicateID) throws Exception {
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("DuplicateIDCacheImpl::deleteFromCache deleting id=" + describeID(duplicateID, 0));
|
|
||||||
}
|
|
||||||
|
|
||||||
ByteArrayHolder bah = new ByteArrayHolder(duplicateID);
|
|
||||||
|
|
||||||
Integer posUsed = cache.remove(bah);
|
|
||||||
|
|
||||||
if (posUsed != null) {
|
|
||||||
ObjLongPair<ByteArrayHolder> id;
|
|
||||||
|
|
||||||
synchronized (this) {
|
|
||||||
id = ids.get(posUsed.intValue());
|
|
||||||
|
|
||||||
if (id.getA().equals(bah)) {
|
|
||||||
id.setA(null);
|
|
||||||
storageManager.deleteDuplicateID(id.getB());
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB()));
|
|
||||||
}
|
|
||||||
id.setB(NIL);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private String describeID(byte[] duplicateID, long id) {
|
|
||||||
if (id != 0) {
|
|
||||||
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
|
|
||||||
} else {
|
|
||||||
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean contains(final byte[] duplID) {
|
|
||||||
return contains(new ByteArrayHolder(duplID));
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean contains(final ByteArrayHolder duplID) {
|
|
||||||
boolean contains = cache.containsKey(duplID);
|
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
if (contains) {
|
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::contains found a duplicate " + describeID(duplID.bytes, 0));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return contains;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addToCache(final byte[] duplID) throws Exception {
|
|
||||||
addToCache(duplID, null, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
|
|
||||||
addToCache(duplID, tx, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
|
|
||||||
final ByteArrayHolder holder = new ByteArrayHolder(duplID);
|
|
||||||
if (contains(holder)) {
|
|
||||||
if (tx != null) {
|
|
||||||
tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
addToCache(holder, tx, true);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
|
|
||||||
addToCache(new ByteArrayHolder(duplID), tx, instantAdd);
|
|
||||||
}
|
|
||||||
|
|
||||||
private synchronized void addToCache(final ByteArrayHolder holder,
|
|
||||||
final Transaction tx,
|
|
||||||
boolean instantAdd) throws Exception {
|
|
||||||
long recordID = -1;
|
|
||||||
if (tx == null) {
|
|
||||||
if (persist) {
|
|
||||||
recordID = storageManager.generateID();
|
|
||||||
storageManager.storeDuplicateID(address, holder.bytes, recordID);
|
|
||||||
}
|
|
||||||
|
|
||||||
addToCacheInMemory(holder, recordID);
|
|
||||||
} else {
|
|
||||||
if (persist) {
|
|
||||||
recordID = storageManager.generateID();
|
|
||||||
storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID);
|
|
||||||
|
|
||||||
tx.setContainsPersistent();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(holder.bytes, recordID) + ", tx=" + tx);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (instantAdd) {
|
|
||||||
tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false));
|
|
||||||
} else {
|
|
||||||
// For a tx, it's important that the entry is not added to the cache until commit
|
|
||||||
// since if the client fails then resends them tx we don't want it to get rejected
|
|
||||||
tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void load(final Transaction tx, final byte[] duplID) {
|
|
||||||
tx.addOperation(new AddDuplicateIDOperation(new ByteArrayHolder(duplID), tx.getID(), true));
|
|
||||||
}
|
|
||||||
|
|
||||||
private synchronized void addToCacheInMemory(final ByteArrayHolder holder, final long recordID) {
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(holder.bytes, recordID));
|
|
||||||
}
|
|
||||||
|
|
||||||
cache.put(holder, boxed(pos));
|
|
||||||
|
|
||||||
ObjLongPair<ByteArrayHolder> id;
|
|
||||||
|
|
||||||
if (pos < ids.size()) {
|
|
||||||
// Need fast array style access here -hence ArrayList typing
|
|
||||||
id = ids.get(pos);
|
|
||||||
|
|
||||||
// The id here might be null if it was explicit deleted
|
|
||||||
if (id.getA() != null) {
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB()));
|
|
||||||
}
|
|
||||||
|
|
||||||
cache.remove(id.getA());
|
|
||||||
|
|
||||||
// Record already exists - we delete the old one and add the new one
|
|
||||||
// Note we can't use update since journal update doesn't let older records get
|
|
||||||
// reclaimed
|
|
||||||
|
|
||||||
if (id.getB() != NIL) {
|
|
||||||
try {
|
|
||||||
storageManager.deleteDuplicateID(id.getB());
|
|
||||||
} catch (Exception e) {
|
|
||||||
ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
id.setA(holder);
|
|
||||||
|
|
||||||
// The recordID could be negative if the duplicateCache is configured to not persist,
|
|
||||||
// -1 would mean null on this case
|
|
||||||
id.setB(recordID >= 0 ? recordID : NIL);
|
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB()));
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
id = new ObjLongPair<>(holder, recordID >= 0 ? recordID : NIL);
|
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB()));
|
|
||||||
}
|
|
||||||
|
|
||||||
ids.add(id);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pos++ == cacheSize - 1) {
|
|
||||||
pos = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void clear() throws Exception {
|
|
||||||
logger.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data");
|
|
||||||
synchronized (this) {
|
|
||||||
final int idsSize = ids.size();
|
|
||||||
if (idsSize > 0 && persist) {
|
|
||||||
long tx = storageManager.generateID();
|
|
||||||
for (int i = 0; i < idsSize; i++) {
|
|
||||||
final ObjLongPair<ByteArrayHolder> id = ids.get(i);
|
|
||||||
if (id != null && id.getB() != NIL) {
|
|
||||||
storageManager.deleteDuplicateIDTransactional(tx, id.getB());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
storageManager.commit(tx);
|
|
||||||
}
|
|
||||||
|
|
||||||
ids.clear();
|
|
||||||
cache.clear();
|
|
||||||
pos = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Pair<byte[], Long>> getMap() {
|
|
||||||
final int idsSize = ids.size();
|
|
||||||
List<Pair<byte[], Long>> copy = new ArrayList<>(idsSize);
|
|
||||||
for (int i = 0; i < idsSize; i++) {
|
|
||||||
final ObjLongPair<ByteArrayHolder> id = ids.get(i);
|
|
||||||
copy.add(new Pair<>(id.getA().bytes, id.getB() == NIL ? null : id.getB()));
|
|
||||||
}
|
|
||||||
return copy;
|
|
||||||
}
|
|
||||||
|
|
||||||
private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
|
|
||||||
|
|
||||||
final ByteArrayHolder holder;
|
|
||||||
|
|
||||||
final long recordID;
|
|
||||||
|
|
||||||
volatile boolean done;
|
|
||||||
|
|
||||||
private final boolean afterCommit;
|
|
||||||
|
|
||||||
AddDuplicateIDOperation(final ByteArrayHolder holder, final long recordID, boolean afterCommit) {
|
|
||||||
this.holder = holder;
|
|
||||||
this.recordID = recordID;
|
|
||||||
this.afterCommit = afterCommit;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void process() {
|
|
||||||
if (!done) {
|
|
||||||
addToCacheInMemory(holder, recordID);
|
|
||||||
|
|
||||||
done = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void afterCommit(final Transaction tx) {
|
|
||||||
if (afterCommit) {
|
|
||||||
process();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void beforeCommit(Transaction tx) throws Exception {
|
|
||||||
if (!afterCommit) {
|
|
||||||
process();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<MessageReference> getRelatedMessageReferences() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final class ByteArrayHolder {
|
|
||||||
|
|
||||||
private final byte[] bytes;
|
|
||||||
|
|
||||||
private int hash;
|
|
||||||
|
|
||||||
ByteArrayHolder(final byte[] bytes) {
|
|
||||||
this.bytes = bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(final Object other) {
|
|
||||||
if (other instanceof ByteArrayHolder) {
|
|
||||||
ByteArrayHolder s = (ByteArrayHolder) other;
|
|
||||||
|
|
||||||
return ByteUtil.equals(bytes, s.bytes);
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
if (hash == 0) {
|
|
||||||
hash = ByteUtil.hashCode(bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
return hash;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||||
|
|
||||||
|
public final class DuplicateIDCaches {
|
||||||
|
|
||||||
|
private DuplicateIDCaches() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DuplicateIDCache persistent(final SimpleString address,
|
||||||
|
final int size,
|
||||||
|
final StorageManager storageManager) {
|
||||||
|
return new PersistentDuplicateIDCache(address, size, storageManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DuplicateIDCache inMemory(final SimpleString address, final int size) {
|
||||||
|
return new InMemoryDuplicateIDCache(address, size);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,276 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.function.IntFunction;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
|
||||||
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||||
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
|
||||||
|
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance
|
||||||
|
* and memory footprint reasons.<br>
|
||||||
|
* Instead of using a single {@link DuplicateIDCache} impl, we've let 2 different impls to contain just the bare
|
||||||
|
* minimum data in order to have 2 different memory footprint costs at runtime, while making easier to track dependencies
|
||||||
|
* eg in-memory cache won't need any {@link StorageManager} because no storage operations are expected to happen.
|
||||||
|
*/
|
||||||
|
final class InMemoryDuplicateIDCache implements DuplicateIDCache {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = Logger.getLogger(InMemoryDuplicateIDCache.class);
|
||||||
|
|
||||||
|
private final Map<ByteArray, Integer> cache = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final SimpleString address;
|
||||||
|
|
||||||
|
private final ArrayList<ByteArray> ids;
|
||||||
|
|
||||||
|
private final IntFunction<Integer> cachedBoxedInts;
|
||||||
|
|
||||||
|
private int pos;
|
||||||
|
|
||||||
|
private final int cacheSize;
|
||||||
|
|
||||||
|
InMemoryDuplicateIDCache(final SimpleString address, final int size) {
|
||||||
|
this.address = address;
|
||||||
|
|
||||||
|
cacheSize = size;
|
||||||
|
|
||||||
|
ids = new ArrayList<>(size);
|
||||||
|
|
||||||
|
cachedBoxedInts = boxedInts(size);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void load(List<Pair<byte[], Long>> ids) throws Exception {
|
||||||
|
LOGGER.debugf("address = %s ignore loading ids: in memory cache won't load previously stored ids", address);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteFromCache(byte[] duplicateID) {
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("deleting id = %s", describeID(duplicateID));
|
||||||
|
}
|
||||||
|
|
||||||
|
ByteArray bah = new ByteArray(duplicateID);
|
||||||
|
|
||||||
|
Integer posUsed = cache.remove(bah);
|
||||||
|
|
||||||
|
if (posUsed != null) {
|
||||||
|
ByteArray id;
|
||||||
|
|
||||||
|
synchronized (this) {
|
||||||
|
final int index = posUsed.intValue();
|
||||||
|
id = ids.get(index);
|
||||||
|
|
||||||
|
if (id.equals(bah)) {
|
||||||
|
ids.set(index, null);
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String describeID(byte[] duplicateID) {
|
||||||
|
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean contains(final byte[] duplID) {
|
||||||
|
return contains(new ByteArray(duplID));
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean contains(final ByteArray id) {
|
||||||
|
boolean contains = cache.containsKey(id);
|
||||||
|
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
if (contains) {
|
||||||
|
LOGGER.tracef("address = %s found a duplicate ", address, describeID(id.bytes));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return contains;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addToCache(final byte[] duplID) throws Exception {
|
||||||
|
addToCache(duplID, null, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
|
||||||
|
addToCache(duplID, tx, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) {
|
||||||
|
final ByteArray holder = new ByteArray(duplID);
|
||||||
|
if (contains(holder)) {
|
||||||
|
if (tx != null) {
|
||||||
|
tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
addToCache(holder, tx, true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
|
||||||
|
addToCache(new ByteArray(duplID), tx, instantAdd);
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void addToCache(final ByteArray holder, final Transaction tx, boolean instantAdd) {
|
||||||
|
if (tx == null) {
|
||||||
|
addToCacheInMemory(holder);
|
||||||
|
} else {
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address, describeID(holder.bytes), tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (instantAdd) {
|
||||||
|
tx.addOperation(new AddDuplicateIDOperation(holder, false));
|
||||||
|
} else {
|
||||||
|
// For a tx, it's important that the entry is not added to the cache until commit
|
||||||
|
// since if the client fails then resends them tx we don't want it to get rejected
|
||||||
|
tx.afterStore(new AddDuplicateIDOperation(holder, true));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void load(final Transaction tx, final byte[] duplID) {
|
||||||
|
tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), true));
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void addToCacheInMemory(final ByteArray holder) {
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes));
|
||||||
|
}
|
||||||
|
|
||||||
|
cache.put(holder, cachedBoxedInts.apply(pos));
|
||||||
|
|
||||||
|
if (pos < ids.size()) {
|
||||||
|
// Need fast array style access here -hence ArrayList typing
|
||||||
|
final ByteArray id = ids.set(pos, holder);
|
||||||
|
|
||||||
|
// The id here might be null if it was explicit deleted
|
||||||
|
if (id != null) {
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.bytes));
|
||||||
|
}
|
||||||
|
|
||||||
|
cache.remove(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s replacing old duplicateID by %s", describeID(holder.bytes));
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s adding new duplicateID %s", describeID(holder.bytes));
|
||||||
|
}
|
||||||
|
|
||||||
|
ids.add(holder);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pos++ == cacheSize - 1) {
|
||||||
|
pos = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void clear() throws Exception {
|
||||||
|
if (LOGGER.isDebugEnabled()) {
|
||||||
|
LOGGER.debugf("address = %s removing duplicate ID data", address);
|
||||||
|
}
|
||||||
|
ids.clear();
|
||||||
|
cache.clear();
|
||||||
|
pos = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized List<Pair<byte[], Long>> getMap() {
|
||||||
|
final int idsSize = ids.size();
|
||||||
|
List<Pair<byte[], Long>> copy = new ArrayList<>(idsSize);
|
||||||
|
for (int i = 0; i < idsSize; i++) {
|
||||||
|
final ByteArray id = ids.get(i);
|
||||||
|
// in case the id has been removed
|
||||||
|
if (id != null) {
|
||||||
|
copy.add(new Pair<>(id.bytes, null));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return copy;
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
|
||||||
|
|
||||||
|
final ByteArray id;
|
||||||
|
|
||||||
|
volatile boolean done;
|
||||||
|
|
||||||
|
private final boolean afterCommit;
|
||||||
|
|
||||||
|
AddDuplicateIDOperation(final ByteArray id, boolean afterCommit) {
|
||||||
|
this.id = id;
|
||||||
|
this.afterCommit = afterCommit;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void process() {
|
||||||
|
if (!done) {
|
||||||
|
addToCacheInMemory(id);
|
||||||
|
|
||||||
|
done = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterCommit(final Transaction tx) {
|
||||||
|
if (afterCommit) {
|
||||||
|
process();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeCommit(Transaction tx) throws Exception {
|
||||||
|
if (!afterCommit) {
|
||||||
|
process();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<MessageReference> getRelatedMessageReferences() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,62 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||||
|
|
||||||
|
import java.lang.ref.Reference;
|
||||||
|
import java.lang.ref.WeakReference;
|
||||||
|
import java.util.function.IntFunction;
|
||||||
|
|
||||||
|
final class IntegerCache {
|
||||||
|
|
||||||
|
private static final boolean DISABLE_INTEGER_CACHE = Boolean.valueOf(System.getProperty("disable.integer.cache", Boolean.FALSE.toString()));
|
||||||
|
|
||||||
|
// we're not interested into safe publication here: we need to scale, be fast and save "some" GC to happen
|
||||||
|
private static WeakReference<Integer[]> INDEXES = null;
|
||||||
|
|
||||||
|
private static Integer[] ints(int size) {
|
||||||
|
final Reference<Integer[]> indexesRef = INDEXES;
|
||||||
|
final Integer[] indexes = indexesRef == null ? null : indexesRef.get();
|
||||||
|
if (indexes != null && size <= indexes.length) {
|
||||||
|
return indexes;
|
||||||
|
}
|
||||||
|
final int newSize = size + (indexes == null ? 0 : size / 2);
|
||||||
|
final Integer[] newIndexes = new Integer[newSize];
|
||||||
|
if (indexes != null) {
|
||||||
|
System.arraycopy(indexes, 0, newIndexes, 0, indexes.length);
|
||||||
|
}
|
||||||
|
INDEXES = new WeakReference<>(newIndexes);
|
||||||
|
return newIndexes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static IntFunction<Integer> boxedInts(int size) {
|
||||||
|
if (DISABLE_INTEGER_CACHE) {
|
||||||
|
return Integer::valueOf;
|
||||||
|
}
|
||||||
|
// use a lambda to have an trusted const field for free
|
||||||
|
final Integer[] cachedInts = ints(size);
|
||||||
|
return index -> {
|
||||||
|
Integer boxedInt = cachedInts[index];
|
||||||
|
if (boxedInt == null) {
|
||||||
|
boxedInt = index;
|
||||||
|
cachedInts[index] = boxedInt;
|
||||||
|
}
|
||||||
|
assert boxedInt != null;
|
||||||
|
return boxedInt;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,394 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.function.IntFunction;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ObjLongPair;
|
||||||
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
|
||||||
|
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL;
|
||||||
|
import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance
|
||||||
|
* and memory footprint reasons.<br>
|
||||||
|
* Instead of using a single {@link DuplicateIDCache} impl, we've let 2 different impls to contain just the bare
|
||||||
|
* minimum data in order to have 2 different memory footprint costs at runtime, while making easier to track dependencies
|
||||||
|
* eg in-memory cache won't need any {@link StorageManager} because no storage operations are expected to happen.
|
||||||
|
*/
|
||||||
|
final class PersistentDuplicateIDCache implements DuplicateIDCache {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = Logger.getLogger(PersistentDuplicateIDCache.class);
|
||||||
|
|
||||||
|
private final Map<ByteArray, Integer> cache = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final SimpleString address;
|
||||||
|
|
||||||
|
private final ArrayList<ObjLongPair<ByteArray>> ids;
|
||||||
|
|
||||||
|
private final IntFunction<Integer> cachedBoxedInts;
|
||||||
|
|
||||||
|
private int pos;
|
||||||
|
|
||||||
|
private final int cacheSize;
|
||||||
|
|
||||||
|
private final StorageManager storageManager;
|
||||||
|
|
||||||
|
PersistentDuplicateIDCache(final SimpleString address, final int size, final StorageManager storageManager) {
|
||||||
|
this.address = address;
|
||||||
|
|
||||||
|
cacheSize = size;
|
||||||
|
|
||||||
|
ids = new ArrayList<>(size);
|
||||||
|
|
||||||
|
cachedBoxedInts = boxedInts(size);
|
||||||
|
|
||||||
|
this.storageManager = storageManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void load(final List<Pair<byte[], Long>> ids) throws Exception {
|
||||||
|
if (!cache.isEmpty()) {
|
||||||
|
throw new IllegalStateException("load is valid only on empty cache");
|
||||||
|
}
|
||||||
|
// load only ids that fit this cache:
|
||||||
|
// - in term of remaining capacity
|
||||||
|
// - ignoring (and reporting) ids unpaired with record ID
|
||||||
|
// Then, delete the exceeding ones.
|
||||||
|
|
||||||
|
long txID = -1;
|
||||||
|
|
||||||
|
int toNotBeAdded = ids.size() - cacheSize;
|
||||||
|
if (toNotBeAdded < 0) {
|
||||||
|
toNotBeAdded = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Pair<byte[], Long> id : ids) {
|
||||||
|
if (id.getB() == null) {
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("ignoring id = %s because without record ID", describeID(id.getA()));
|
||||||
|
}
|
||||||
|
if (toNotBeAdded > 0) {
|
||||||
|
toNotBeAdded--;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
assert id.getB() != null && id.getB().longValue() != NIL;
|
||||||
|
if (toNotBeAdded > 0) {
|
||||||
|
if (txID == -1) {
|
||||||
|
txID = storageManager.generateID();
|
||||||
|
}
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("deleting id = %s", describeID(id.getA(), id.getB()));
|
||||||
|
}
|
||||||
|
|
||||||
|
storageManager.deleteDuplicateIDTransactional(txID, id.getB());
|
||||||
|
toNotBeAdded--;
|
||||||
|
} else {
|
||||||
|
ByteArray bah = new ByteArray(id.getA());
|
||||||
|
|
||||||
|
ObjLongPair<ByteArray> pair = new ObjLongPair<>(bah, id.getB());
|
||||||
|
|
||||||
|
cache.put(bah, cachedBoxedInts.apply(this.ids.size()));
|
||||||
|
|
||||||
|
this.ids.add(pair);
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("loading id = %s", describeID(id.getA(), id.getB()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
if (txID != -1) {
|
||||||
|
storageManager.commit(txID);
|
||||||
|
}
|
||||||
|
|
||||||
|
pos = this.ids.size();
|
||||||
|
|
||||||
|
if (pos == cacheSize) {
|
||||||
|
pos = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteFromCache(byte[] duplicateID) throws Exception {
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("deleting id = %s", describeID(duplicateID));
|
||||||
|
}
|
||||||
|
|
||||||
|
final ByteArray bah = new ByteArray(duplicateID);
|
||||||
|
|
||||||
|
final Integer posUsed = cache.remove(bah);
|
||||||
|
|
||||||
|
if (posUsed != null) {
|
||||||
|
synchronized (this) {
|
||||||
|
final ObjLongPair<ByteArray> id = ids.get(posUsed.intValue());
|
||||||
|
|
||||||
|
if (id.getA().equals(bah)) {
|
||||||
|
final long recordID = id.getB();
|
||||||
|
id.setA(null);
|
||||||
|
id.setB(NIL);
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID, id.getB()));
|
||||||
|
}
|
||||||
|
storageManager.deleteDuplicateID(recordID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String describeID(byte[] duplicateID) {
|
||||||
|
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String describeID(byte[] duplicateID, long id) {
|
||||||
|
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean contains(final byte[] duplID) {
|
||||||
|
return contains(new ByteArray(duplID));
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean contains(final ByteArray duplID) {
|
||||||
|
boolean contains = cache.containsKey(duplID);
|
||||||
|
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
if (contains) {
|
||||||
|
LOGGER.tracef("address = %s found a duplicate %s", address, describeID(duplID.bytes));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return contains;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addToCache(final byte[] duplID) throws Exception {
|
||||||
|
addToCache(duplID, null, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
|
||||||
|
addToCache(duplID, tx, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
|
||||||
|
final ByteArray holder = new ByteArray(duplID);
|
||||||
|
if (contains(holder)) {
|
||||||
|
if (tx != null) {
|
||||||
|
tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
addToCache(holder, tx, true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
|
||||||
|
addToCache(new ByteArray(duplID), tx, instantAdd);
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void addToCache(final ByteArray holder,
|
||||||
|
final Transaction tx,
|
||||||
|
boolean instantAdd) throws Exception {
|
||||||
|
final long recordID = storageManager.generateID();
|
||||||
|
if (tx == null) {
|
||||||
|
storageManager.storeDuplicateID(address, holder.bytes, recordID);
|
||||||
|
|
||||||
|
addToCacheInMemory(holder, recordID);
|
||||||
|
} else {
|
||||||
|
storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID);
|
||||||
|
|
||||||
|
tx.setContainsPersistent();
|
||||||
|
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address,
|
||||||
|
describeID(holder.bytes, recordID), tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (instantAdd) {
|
||||||
|
tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false));
|
||||||
|
} else {
|
||||||
|
// For a tx, it's important that the entry is not added to the cache until commit
|
||||||
|
// since if the client fails then resends them tx we don't want it to get rejected
|
||||||
|
tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void load(final Transaction tx, final byte[] duplID) {
|
||||||
|
tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), tx.getID(), true));
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void addToCacheInMemory(final ByteArray holder, final long recordID) {
|
||||||
|
Objects.requireNonNull(holder, "holder must be not null");
|
||||||
|
if (recordID < 0) {
|
||||||
|
throw new IllegalArgumentException("recordID must be >= 0");
|
||||||
|
}
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes, recordID));
|
||||||
|
}
|
||||||
|
|
||||||
|
cache.put(holder, cachedBoxedInts.apply(pos));
|
||||||
|
|
||||||
|
ObjLongPair<ByteArray> id;
|
||||||
|
|
||||||
|
if (pos < ids.size()) {
|
||||||
|
// Need fast array style access here -hence ArrayList typing
|
||||||
|
id = ids.get(pos);
|
||||||
|
|
||||||
|
// The id here might be null if it was explicit deleted
|
||||||
|
if (id.getA() != null) {
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.getA().bytes, id.getB()));
|
||||||
|
}
|
||||||
|
|
||||||
|
cache.remove(id.getA());
|
||||||
|
|
||||||
|
assert id.getB() != NIL;
|
||||||
|
try {
|
||||||
|
storageManager.deleteDuplicateID(id.getB());
|
||||||
|
} catch (Exception e) {
|
||||||
|
ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
id.setA(holder);
|
||||||
|
|
||||||
|
id.setB(recordID);
|
||||||
|
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s replacing old duplicateID by %s", address, describeID(id.getA().bytes, id.getB()));
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
id = new ObjLongPair<>(holder, recordID);
|
||||||
|
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.tracef("address = %s adding new duplicateID %s", address, describeID(id.getA().bytes, id.getB()));
|
||||||
|
}
|
||||||
|
|
||||||
|
ids.add(id);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pos++ == cacheSize - 1) {
|
||||||
|
pos = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void clear() throws Exception {
|
||||||
|
LOGGER.debugf("address = %s removing duplicate ID data", address);
|
||||||
|
final int idsSize = ids.size();
|
||||||
|
if (idsSize > 0) {
|
||||||
|
long tx = storageManager.generateID();
|
||||||
|
for (int i = 0; i < idsSize; i++) {
|
||||||
|
final ObjLongPair<ByteArray> id = ids.get(i);
|
||||||
|
if (id.getA() != null) {
|
||||||
|
assert id.getB() != NIL;
|
||||||
|
storageManager.deleteDuplicateIDTransactional(tx, id.getB());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
storageManager.commit(tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
ids.clear();
|
||||||
|
cache.clear();
|
||||||
|
pos = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized List<Pair<byte[], Long>> getMap() {
|
||||||
|
final int idsSize = ids.size();
|
||||||
|
List<Pair<byte[], Long>> copy = new ArrayList<>(idsSize);
|
||||||
|
for (int i = 0; i < idsSize; i++) {
|
||||||
|
final ObjLongPair<ByteArray> id = ids.get(i);
|
||||||
|
// in case the pair has been removed
|
||||||
|
if (id.getA() != null) {
|
||||||
|
assert id.getB() != NIL;
|
||||||
|
copy.add(new Pair<>(id.getA().bytes, id.getB()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return copy;
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
|
||||||
|
|
||||||
|
final ByteArray holder;
|
||||||
|
|
||||||
|
final long recordID;
|
||||||
|
|
||||||
|
volatile boolean done;
|
||||||
|
|
||||||
|
private final boolean afterCommit;
|
||||||
|
|
||||||
|
AddDuplicateIDOperation(final ByteArray holder, final long recordID, boolean afterCommit) {
|
||||||
|
this.holder = holder;
|
||||||
|
this.recordID = recordID;
|
||||||
|
this.afterCommit = afterCommit;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void process() {
|
||||||
|
if (!done) {
|
||||||
|
addToCacheInMemory(holder, recordID);
|
||||||
|
|
||||||
|
done = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterCommit(final Transaction tx) {
|
||||||
|
if (afterCommit) {
|
||||||
|
process();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeCommit(Transaction tx) throws Exception {
|
||||||
|
if (!afterCommit) {
|
||||||
|
process();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<MessageReference> getRelatedMessageReferences() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -16,6 +16,25 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.postoffice.impl;
|
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
|
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
|
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
|
||||||
|
@ -82,25 +101,6 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
|
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1315,7 +1315,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
DuplicateIDCache cache = duplicateIDCaches.get(address);
|
DuplicateIDCache cache = duplicateIDCaches.get(address);
|
||||||
|
|
||||||
if (cache == null) {
|
if (cache == null) {
|
||||||
cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache);
|
if (persistIDCache) {
|
||||||
|
cache = DuplicateIDCaches.persistent(address, idCacheSize, storageManager);
|
||||||
|
} else {
|
||||||
|
cache = DuplicateIDCaches.inMemory(address, idCacheSize);
|
||||||
|
}
|
||||||
|
|
||||||
DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);
|
DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
|
||||||
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
|
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.apache.activemq.artemis.utils.RetryRule;
|
import org.apache.activemq.artemis.utils.RetryRule;
|
||||||
|
@ -52,7 +52,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
|
||||||
public void testDuplicate() throws Exception {
|
public void testDuplicate() throws Exception {
|
||||||
createStorage();
|
createStorage();
|
||||||
|
|
||||||
DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, true);
|
DuplicateIDCache cache = DuplicateIDCaches.persistent(new SimpleString("test"), 2000, journal);
|
||||||
|
|
||||||
TransactionImpl tx = new TransactionImpl(journal);
|
TransactionImpl tx = new TransactionImpl(journal);
|
||||||
|
|
||||||
|
@ -108,7 +108,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
|
||||||
public void testDuplicateNonPersistent() throws Exception {
|
public void testDuplicateNonPersistent() throws Exception {
|
||||||
createStorage();
|
createStorage();
|
||||||
|
|
||||||
DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, false);
|
DuplicateIDCache cache = DuplicateIDCaches.inMemory(new SimpleString("test"), 2000);
|
||||||
|
|
||||||
TransactionImpl tx = new TransactionImpl(journal);
|
TransactionImpl tx = new TransactionImpl(journal);
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,7 @@ import java.util.SplittableRandom;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
|
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
|
||||||
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
|
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.openjdk.jmh.annotations.Benchmark;
|
import org.openjdk.jmh.annotations.Benchmark;
|
||||||
import org.openjdk.jmh.annotations.Fork;
|
import org.openjdk.jmh.annotations.Fork;
|
||||||
|
@ -54,7 +54,9 @@ public class DuplicateIDCacheBenchmark {
|
||||||
|
|
||||||
@Setup
|
@Setup
|
||||||
public void init() throws Exception {
|
public void init() throws Exception {
|
||||||
cache = new DuplicateIDCacheImpl(SimpleString.toSimpleString("benchmark"), size, new NullStorageManager(), persist);
|
cache = persist ?
|
||||||
|
DuplicateIDCaches.persistent(SimpleString.toSimpleString("benchmark"), size, new NullStorageManager()) :
|
||||||
|
DuplicateIDCaches.inMemory(SimpleString.toSimpleString("benchmark"), size);
|
||||||
final int idSize = findNextHigherPowerOf2(size);
|
final int idSize = findNextHigherPowerOf2(size);
|
||||||
idsMask = idSize - 1;
|
idsMask = idSize - 1;
|
||||||
nextId = 0;
|
nextId = 0;
|
||||||
|
|
|
@ -31,8 +31,9 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
|
||||||
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
||||||
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
|
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
|
||||||
import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader;
|
import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
|
||||||
import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice;
|
import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice;
|
||||||
|
@ -40,8 +41,8 @@ import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
|
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -106,7 +107,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
Assert.assertEquals(0, mapDups.size());
|
Assert.assertEquals(0, mapDups.size());
|
||||||
|
|
||||||
DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
|
DuplicateIDCache cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal);
|
||||||
|
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
cacheID.addToCache(RandomUtil.randomBytes());
|
cacheID.addToCache(RandomUtil.randomBytes());
|
||||||
|
@ -126,7 +127,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
Assert.assertEquals(10, values.size());
|
Assert.assertEquals(10, values.size());
|
||||||
|
|
||||||
cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
|
cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal);
|
||||||
cacheID.load(values);
|
cacheID.load(values);
|
||||||
|
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
|
|
|
@ -28,14 +28,13 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.filter.Filter;
|
import org.apache.activemq.artemis.core.filter.Filter;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
|
|
||||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||||
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
|
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
|
@ -201,7 +200,7 @@ public class FakePostOffice implements PostOffice {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DuplicateIDCache getDuplicateIDCache(final SimpleString address) {
|
public DuplicateIDCache getDuplicateIDCache(final SimpleString address) {
|
||||||
return new DuplicateIDCacheImpl(address, 2000, new NullStorageManager(), false);
|
return DuplicateIDCaches.inMemory(address, 2000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue