This commit is contained in:
Clebert Suconic 2021-01-06 09:05:01 -05:00
commit b1b88b4f6f
16 changed files with 1285 additions and 449 deletions

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
import java.io.Serializable;
import java.util.Objects;
/**
* A Pair is a holder for 1 object and a >= 0 primitive long value.
* <p>
* This is a utility class.
*/
public class ObjLongPair<A> implements Serializable {
private static final long serialVersionUID = 7749478219139339853L;
public static final long NIL = -1;
public ObjLongPair(final A a, final long b) {
this.a = a;
this.b = b;
if (b < 0 && b != NIL) {
throw new IllegalStateException("b must be >= 0 or == NIL = " + NIL);
}
}
public ObjLongPair(final A a) {
this.a = a;
this.b = NIL;
}
private A a;
private long b;
@Override
public int hashCode() {
if (a == null && b == NIL) {
return super.hashCode();
}
// it's ok to use b to compute hashCode although NIL
return (a == null ? 0 : a.hashCode()) + 37 * Long.hashCode(b);
}
@Override
public boolean equals(final Object other) {
if (other == this) {
return true;
}
if (other instanceof ObjLongPair == false) {
return false;
}
ObjLongPair<A> pother = (ObjLongPair<A>) other;
return (Objects.equals(pother.a, a)) && (pother.b == b);
}
@Override
public String toString() {
return "ObjLongPair[a=" + a + ", b=" + (b == NIL ? "NIL" : b) + "]";
}
public void setA(A a) {
if (this.a == a) {
return;
}
this.a = a;
}
public A getA() {
return a;
}
public void setB(long b) {
if (b < 0 && b != NIL) {
throw new IllegalStateException("b must be >= 0 or == NIL = " + NIL);
}
this.b = b;
}
public long getB() {
return b;
}
}

View File

@ -275,6 +275,52 @@ public class ByteUtil {
} }
} }
public static int hashCode(byte[] bytes) {
if (PlatformDependent.hasUnsafe() && PlatformDependent.isUnaligned()) {
return unsafeHashCode(bytes);
}
return Arrays.hashCode(bytes);
}
/**
* This hash code computation is borrowed by {@link io.netty.buffer.ByteBufUtil#hashCode(ByteBuf)}.
*/
private static int unsafeHashCode(byte[] bytes) {
if (bytes == null) {
return 0;
}
final int len = bytes.length;
int hashCode = 1;
final int intCount = len >>> 2;
int arrayIndex = 0;
// reading in batch both help hash code computation data dependencies and save memory bandwidth
for (int i = 0; i < intCount; i++) {
hashCode = 31 * hashCode + PlatformDependent.getInt(bytes, arrayIndex);
arrayIndex += Integer.BYTES;
}
final byte remaining = (byte) (len & 3);
if (remaining > 0) {
hashCode = unsafeUnrolledHashCode(bytes, arrayIndex, remaining, hashCode);
}
return hashCode == 0 ? 1 : hashCode;
}
private static int unsafeUnrolledHashCode(byte[] bytes, int index, int bytesCount, int h) {
// there is still the hash data dependency but is more friendly
// then a plain loop, given that we know no loop is needed here
assert bytesCount > 0 && bytesCount < 4;
h = 31 * h + PlatformDependent.getByte(bytes, index);
if (bytesCount == 1) {
return h;
}
h = 31 * h + PlatformDependent.getByte(bytes, index + 1);
if (bytesCount == 2) {
return h;
}
h = 31 * h + PlatformDependent.getByte(bytes, index + 2);
return h;
}
public static boolean equals(final byte[] left, final byte[] right) { public static boolean equals(final byte[] left, final byte[] right) {
return equals(left, right, 0, right.length); return equals(left, right, 0, right.length);
} }

View File

@ -17,17 +17,23 @@
package org.apache.activemq.artemis.utils; package org.apache.activemq.artemis.utils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.ReadOnlyBufferException; import java.nio.ReadOnlyBufferException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import org.junit.Test;
public class ByteUtilTest { public class ByteUtilTest {
@ -83,6 +89,46 @@ public class ByteUtilTest {
} }
} }
@Test
public void testUnsafeUnalignedByteArrayHashCode() {
Assume.assumeTrue(PlatformDependent.hasUnsafe());
Assume.assumeTrue(PlatformDependent.isUnaligned());
Map<byte[], Integer> map = new LinkedHashMap<>();
map.put(new byte[0], 1);
map.put(new byte[]{1}, 32);
map.put(new byte[]{2}, 33);
map.put(new byte[]{0, 1}, 962);
map.put(new byte[]{1, 2}, 994);
if (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) {
map.put(new byte[]{0, 1, 2, 3, 4, 5}, 63504931);
map.put(new byte[]{6, 7, 8, 9, 0, 1}, -1603953111);
map.put(new byte[]{-1, -1, -1, (byte) 0xE1}, 1);
} else {
map.put(new byte[]{0, 1, 2, 3, 4, 5}, 1250309600);
map.put(new byte[]{6, 7, 8, 9, 0, 1}, -417148442);
map.put(new byte[]{-1, -1, -1, (byte) 0xE1}, -503316450);
}
for (Map.Entry<byte[], Integer> e : map.entrySet()) {
assertEquals("input = " + Arrays.toString(e.getKey()), e.getValue().intValue(), ByteUtil.hashCode(e.getKey()));
}
}
@Test
public void testNoUnsafeAlignedByteArrayHashCode() {
Assume.assumeFalse(PlatformDependent.hasUnsafe());
Assume.assumeFalse(PlatformDependent.isUnaligned());
ArrayList<byte[]> inputs = new ArrayList<>();
inputs.add(new byte[0]);
inputs.add(new byte[]{1});
inputs.add(new byte[]{2});
inputs.add(new byte[]{0, 1});
inputs.add(new byte[]{1, 2});
inputs.add(new byte[]{0, 1, 2, 3, 4, 5});
inputs.add(new byte[]{6, 7, 8, 9, 0, 1});
inputs.add(new byte[]{-1, -1, -1, (byte) 0xE1});
inputs.forEach(input -> assertEquals("input = " + Arrays.toString(input), Arrays.hashCode(input), ByteUtil.hashCode(input)));
}
@Test @Test
public void testTextBytesToLongBytesNegative() { public void testTextBytesToLongBytesNegative() {
try { try {

View File

@ -40,7 +40,7 @@ public interface DuplicateIDCache {
void deleteFromCache(byte[] duplicateID) throws Exception; void deleteFromCache(byte[] duplicateID) throws Exception;
void load(List<Pair<byte[], Long>> theIds) throws Exception; void load(List<Pair<byte[], Long>> ids) throws Exception;
void load(Transaction tx, byte[] duplID); void load(Transaction tx, byte[] duplID);

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import org.apache.activemq.artemis.utils.ByteUtil;
final class ByteArray {
final byte[] bytes;
private int hash;
ByteArray(final byte[] bytes) {
this.bytes = bytes;
}
@Override
public boolean equals(final Object other) {
if (other instanceof ByteArray) {
ByteArray s = (ByteArray) other;
return ByteUtil.equals(bytes, s.bytes);
} else {
return false;
}
}
@Override
public int hashCode() {
if (hash == 0) {
hash = ByteUtil.hashCode(bytes);
}
return hash;
}
}

View File

@ -1,417 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.jboss.logging.Logger;
/**
* A DuplicateIDCacheImpl
*
* A fixed size rotating cache of last X duplicate ids.
*/
public class DuplicateIDCacheImpl implements DuplicateIDCache {
private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class);
// ByteHolder, position
private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>();
private final SimpleString address;
// Note - deliberately typed as ArrayList since we want to ensure fast indexed
// based array access
private final ArrayList<Pair<ByteArrayHolder, Long>> ids;
private int pos;
private final int cacheSize;
private final StorageManager storageManager;
private final boolean persist;
public DuplicateIDCacheImpl(final SimpleString address,
final int size,
final StorageManager storageManager,
final boolean persist) {
this.address = address;
cacheSize = size;
ids = new ArrayList<>(size);
this.storageManager = storageManager;
this.persist = persist;
}
@Override
public void load(final List<Pair<byte[], Long>> theIds) throws Exception {
long txID = -1;
// If we have more IDs than cache size, we shrink the first ones
int deleteCount = theIds.size() - cacheSize;
if (deleteCount < 0) {
deleteCount = 0;
}
for (Pair<byte[], Long> id : theIds) {
if (deleteCount > 0) {
if (txID == -1) {
txID = storageManager.generateID();
}
if (logger.isTraceEnabled()) {
logger.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB()));
}
storageManager.deleteDuplicateIDTransactional(txID, id.getB());
deleteCount--;
} else {
ByteArrayHolder bah = new ByteArrayHolder(id.getA());
Pair<ByteArrayHolder, Long> pair = new Pair<>(bah, id.getB());
cache.put(bah, ids.size());
ids.add(pair);
if (logger.isTraceEnabled()) {
logger.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB()));
}
}
}
if (txID != -1) {
storageManager.commit(txID);
}
pos = ids.size();
if (pos == cacheSize) {
pos = 0;
}
}
@Override
public void deleteFromCache(byte[] duplicateID) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("DuplicateIDCacheImpl::deleteFromCache deleting id=" + describeID(duplicateID, 0));
}
ByteArrayHolder bah = new ByteArrayHolder(duplicateID);
Integer posUsed = cache.remove(bah);
if (posUsed != null) {
Pair<ByteArrayHolder, Long> id;
synchronized (this) {
id = ids.get(posUsed.intValue());
if (id.getA().equals(bah)) {
id.setA(null);
storageManager.deleteDuplicateID(id.getB());
if (logger.isTraceEnabled()) {
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB()));
}
id.setB(null);
}
}
}
}
private String describeID(byte[] duplicateID, long id) {
if (id != 0) {
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
} else {
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id;
}
}
@Override
public boolean contains(final byte[] duplID) {
boolean contains = cache.get(new ByteArrayHolder(duplID)) != null;
if (contains) {
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::constains found a duplicate " + describeID(duplID, 0));
}
return contains;
}
@Override
public void addToCache(final byte[] duplID) throws Exception {
addToCache(duplID, null, false);
}
@Override
public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
addToCache(duplID, tx, false);
}
@Override
public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
if (contains(duplID)) {
if (tx != null) {
tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
}
return false;
} else {
addToCache(duplID, tx, true);
return true;
}
}
@Override
public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
long recordID = -1;
if (tx == null) {
if (persist) {
recordID = storageManager.generateID();
storageManager.storeDuplicateID(address, duplID, recordID);
}
addToCacheInMemory(duplID, recordID);
} else {
if (persist) {
recordID = storageManager.generateID();
storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
tx.setContainsPersistent();
}
if (logger.isTraceEnabled()) {
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx);
}
if (instantAdd) {
tx.addOperation(new AddDuplicateIDOperation(duplID, recordID, false));
} else {
// For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected
tx.afterStore(new AddDuplicateIDOperation(duplID, recordID, true));
}
}
}
@Override
public void load(final Transaction tx, final byte[] duplID) {
tx.addOperation(new AddDuplicateIDOperation(duplID, tx.getID(), true));
}
private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) {
if (logger.isTraceEnabled()) {
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(duplID, recordID));
}
ByteArrayHolder holder = new ByteArrayHolder(duplID);
cache.put(holder, pos);
Pair<ByteArrayHolder, Long> id;
if (pos < ids.size()) {
// Need fast array style access here -hence ArrayList typing
id = ids.get(pos);
// The id here might be null if it was explicit deleted
if (id.getA() != null) {
if (logger.isTraceEnabled()) {
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB()));
}
cache.remove(id.getA());
// Record already exists - we delete the old one and add the new one
// Note we can't use update since journal update doesn't let older records get
// reclaimed
if (id.getB() != null) {
try {
storageManager.deleteDuplicateID(id.getB());
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e);
}
}
}
id.setA(holder);
// The recordID could be negative if the duplicateCache is configured to not persist,
// -1 would mean null on this case
id.setB(recordID >= 0 ? recordID : null);
if (logger.isTraceEnabled()) {
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB()));
}
holder.pos = pos;
} else {
id = new Pair<>(holder, recordID >= 0 ? recordID : null);
if (logger.isTraceEnabled()) {
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB()));
}
ids.add(id);
holder.pos = pos;
}
if (pos++ == cacheSize - 1) {
pos = 0;
}
}
@Override
public void clear() throws Exception {
logger.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data");
synchronized (this) {
if (ids.size() > 0 && persist) {
long tx = storageManager.generateID();
for (Pair<ByteArrayHolder, Long> id : ids) {
if (id != null) {
storageManager.deleteDuplicateIDTransactional(tx, id.getB());
}
}
storageManager.commit(tx);
}
ids.clear();
cache.clear();
pos = 0;
}
}
@Override
public List<Pair<byte[], Long>> getMap() {
List<Pair<byte[], Long>> list = new ArrayList<>();
for (Pair<ByteArrayHolder, Long> id : ids) {
list.add(new Pair<>(id.getA().bytes, id.getB()));
}
return list;
}
private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
final byte[] duplID;
final long recordID;
volatile boolean done;
private final boolean afterCommit;
AddDuplicateIDOperation(final byte[] duplID, final long recordID, boolean afterCommit) {
this.duplID = duplID;
this.recordID = recordID;
this.afterCommit = afterCommit;
}
private void process() {
if (!done) {
addToCacheInMemory(duplID, recordID);
done = true;
}
}
@Override
public void afterCommit(final Transaction tx) {
if (afterCommit) {
process();
}
}
@Override
public void beforeCommit(Transaction tx) throws Exception {
if (!afterCommit) {
process();
}
}
@Override
public List<MessageReference> getRelatedMessageReferences() {
return null;
}
}
private static final class ByteArrayHolder {
ByteArrayHolder(final byte[] bytes) {
this.bytes = bytes;
}
final byte[] bytes;
int hash;
int pos;
@Override
public boolean equals(final Object other) {
if (other instanceof ByteArrayHolder) {
ByteArrayHolder s = (ByteArrayHolder) other;
if (bytes.length != s.bytes.length) {
return false;
}
for (int i = 0; i < bytes.length; i++) {
if (bytes[i] != s.bytes[i]) {
return false;
}
}
return true;
} else {
return false;
}
}
@Override
public int hashCode() {
if (hash == 0) {
for (byte b : bytes) {
hash = 31 * hash + b;
}
}
return hash;
}
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
public final class DuplicateIDCaches {
private DuplicateIDCaches() {
}
public static DuplicateIDCache persistent(final SimpleString address,
final int size,
final StorageManager storageManager) {
return new PersistentDuplicateIDCache(address, size, storageManager);
}
public static DuplicateIDCache inMemory(final SimpleString address, final int size) {
return new InMemoryDuplicateIDCache(address, size);
}
}

View File

@ -0,0 +1,276 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts;
/**
* {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance
* and memory footprint reasons.<br>
* Instead of using a single {@link DuplicateIDCache} impl, we've let 2 different impls to contain just the bare
* minimum data in order to have 2 different memory footprint costs at runtime, while making easier to track dependencies
* eg in-memory cache won't need any {@link StorageManager} because no storage operations are expected to happen.
*/
final class InMemoryDuplicateIDCache implements DuplicateIDCache {
private static final Logger LOGGER = Logger.getLogger(InMemoryDuplicateIDCache.class);
private final Map<ByteArray, Integer> cache = new ConcurrentHashMap<>();
private final SimpleString address;
private final ArrayList<ByteArray> ids;
private final IntFunction<Integer> cachedBoxedInts;
private int pos;
private final int cacheSize;
InMemoryDuplicateIDCache(final SimpleString address, final int size) {
this.address = address;
cacheSize = size;
ids = new ArrayList<>(size);
cachedBoxedInts = boxedInts(size);
}
@Override
public void load(List<Pair<byte[], Long>> ids) throws Exception {
LOGGER.debugf("address = %s ignore loading ids: in memory cache won't load previously stored ids", address);
}
@Override
public void deleteFromCache(byte[] duplicateID) {
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("deleting id = %s", describeID(duplicateID));
}
ByteArray bah = new ByteArray(duplicateID);
Integer posUsed = cache.remove(bah);
if (posUsed != null) {
ByteArray id;
synchronized (this) {
final int index = posUsed.intValue();
id = ids.get(index);
if (id.equals(bah)) {
ids.set(index, null);
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID));
}
}
}
}
}
private static String describeID(byte[] duplicateID) {
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
}
@Override
public boolean contains(final byte[] duplID) {
return contains(new ByteArray(duplID));
}
private boolean contains(final ByteArray id) {
boolean contains = cache.containsKey(id);
if (LOGGER.isTraceEnabled()) {
if (contains) {
LOGGER.tracef("address = %s found a duplicate ", address, describeID(id.bytes));
}
}
return contains;
}
@Override
public void addToCache(final byte[] duplID) throws Exception {
addToCache(duplID, null, false);
}
@Override
public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
addToCache(duplID, tx, false);
}
@Override
public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) {
final ByteArray holder = new ByteArray(duplID);
if (contains(holder)) {
if (tx != null) {
tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
}
return false;
}
addToCache(holder, tx, true);
return true;
}
@Override
public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
addToCache(new ByteArray(duplID), tx, instantAdd);
}
private synchronized void addToCache(final ByteArray holder, final Transaction tx, boolean instantAdd) {
if (tx == null) {
addToCacheInMemory(holder);
} else {
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address, describeID(holder.bytes), tx);
}
if (instantAdd) {
tx.addOperation(new AddDuplicateIDOperation(holder, false));
} else {
// For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected
tx.afterStore(new AddDuplicateIDOperation(holder, true));
}
}
}
@Override
public void load(final Transaction tx, final byte[] duplID) {
tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), true));
}
private synchronized void addToCacheInMemory(final ByteArray holder) {
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes));
}
cache.put(holder, cachedBoxedInts.apply(pos));
if (pos < ids.size()) {
// Need fast array style access here -hence ArrayList typing
final ByteArray id = ids.set(pos, holder);
// The id here might be null if it was explicit deleted
if (id != null) {
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.bytes));
}
cache.remove(id);
}
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s replacing old duplicateID by %s", describeID(holder.bytes));
}
} else {
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s adding new duplicateID %s", describeID(holder.bytes));
}
ids.add(holder);
}
if (pos++ == cacheSize - 1) {
pos = 0;
}
}
@Override
public synchronized void clear() throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("address = %s removing duplicate ID data", address);
}
ids.clear();
cache.clear();
pos = 0;
}
@Override
public synchronized List<Pair<byte[], Long>> getMap() {
final int idsSize = ids.size();
List<Pair<byte[], Long>> copy = new ArrayList<>(idsSize);
for (int i = 0; i < idsSize; i++) {
final ByteArray id = ids.get(i);
// in case the id has been removed
if (id != null) {
copy.add(new Pair<>(id.bytes, null));
}
}
return copy;
}
private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
final ByteArray id;
volatile boolean done;
private final boolean afterCommit;
AddDuplicateIDOperation(final ByteArray id, boolean afterCommit) {
this.id = id;
this.afterCommit = afterCommit;
}
private void process() {
if (!done) {
addToCacheInMemory(id);
done = true;
}
}
@Override
public void afterCommit(final Transaction tx) {
if (afterCommit) {
process();
}
}
@Override
public void beforeCommit(Transaction tx) throws Exception {
if (!afterCommit) {
process();
}
}
@Override
public List<MessageReference> getRelatedMessageReferences() {
return null;
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.function.IntFunction;
final class IntegerCache {
private static final boolean DISABLE_INTEGER_CACHE = Boolean.valueOf(System.getProperty("disable.integer.cache", Boolean.FALSE.toString()));
// we're not interested into safe publication here: we need to scale, be fast and save "some" GC to happen
private static WeakReference<Integer[]> INDEXES = null;
private static Integer[] ints(int size) {
final Reference<Integer[]> indexesRef = INDEXES;
final Integer[] indexes = indexesRef == null ? null : indexesRef.get();
if (indexes != null && size <= indexes.length) {
return indexes;
}
final int newSize = size + (indexes == null ? 0 : size / 2);
final Integer[] newIndexes = new Integer[newSize];
if (indexes != null) {
System.arraycopy(indexes, 0, newIndexes, 0, indexes.length);
}
INDEXES = new WeakReference<>(newIndexes);
return newIndexes;
}
public static IntFunction<Integer> boxedInts(int size) {
if (DISABLE_INTEGER_CACHE) {
return Integer::valueOf;
}
// use a lambda to have an trusted const field for free
final Integer[] cachedInts = ints(size);
return index -> {
Integer boxedInt = cachedInts[index];
if (boxedInt == null) {
boxedInt = index;
cachedInts[index] = boxedInt;
}
assert boxedInt != null;
return boxedInt;
};
}
}

View File

@ -0,0 +1,394 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ObjLongPair;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL;
import static org.apache.activemq.artemis.core.postoffice.impl.IntegerCache.boxedInts;
/**
* {@link InMemoryDuplicateIDCache} and {@link PersistentDuplicateIDCache} impls have been separated for performance
* and memory footprint reasons.<br>
* Instead of using a single {@link DuplicateIDCache} impl, we've let 2 different impls to contain just the bare
* minimum data in order to have 2 different memory footprint costs at runtime, while making easier to track dependencies
* eg in-memory cache won't need any {@link StorageManager} because no storage operations are expected to happen.
*/
final class PersistentDuplicateIDCache implements DuplicateIDCache {
private static final Logger LOGGER = Logger.getLogger(PersistentDuplicateIDCache.class);
private final Map<ByteArray, Integer> cache = new ConcurrentHashMap<>();
private final SimpleString address;
private final ArrayList<ObjLongPair<ByteArray>> ids;
private final IntFunction<Integer> cachedBoxedInts;
private int pos;
private final int cacheSize;
private final StorageManager storageManager;
PersistentDuplicateIDCache(final SimpleString address, final int size, final StorageManager storageManager) {
this.address = address;
cacheSize = size;
ids = new ArrayList<>(size);
cachedBoxedInts = boxedInts(size);
this.storageManager = storageManager;
}
@Override
public synchronized void load(final List<Pair<byte[], Long>> ids) throws Exception {
if (!cache.isEmpty()) {
throw new IllegalStateException("load is valid only on empty cache");
}
// load only ids that fit this cache:
// - in term of remaining capacity
// - ignoring (and reporting) ids unpaired with record ID
// Then, delete the exceeding ones.
long txID = -1;
int toNotBeAdded = ids.size() - cacheSize;
if (toNotBeAdded < 0) {
toNotBeAdded = 0;
}
for (Pair<byte[], Long> id : ids) {
if (id.getB() == null) {
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("ignoring id = %s because without record ID", describeID(id.getA()));
}
if (toNotBeAdded > 0) {
toNotBeAdded--;
}
continue;
}
assert id.getB() != null && id.getB().longValue() != NIL;
if (toNotBeAdded > 0) {
if (txID == -1) {
txID = storageManager.generateID();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("deleting id = %s", describeID(id.getA(), id.getB()));
}
storageManager.deleteDuplicateIDTransactional(txID, id.getB());
toNotBeAdded--;
} else {
ByteArray bah = new ByteArray(id.getA());
ObjLongPair<ByteArray> pair = new ObjLongPair<>(bah, id.getB());
cache.put(bah, cachedBoxedInts.apply(this.ids.size()));
this.ids.add(pair);
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("loading id = %s", describeID(id.getA(), id.getB()));
}
}
}
if (txID != -1) {
storageManager.commit(txID);
}
pos = this.ids.size();
if (pos == cacheSize) {
pos = 0;
}
}
@Override
public void deleteFromCache(byte[] duplicateID) throws Exception {
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("deleting id = %s", describeID(duplicateID));
}
final ByteArray bah = new ByteArray(duplicateID);
final Integer posUsed = cache.remove(bah);
if (posUsed != null) {
synchronized (this) {
final ObjLongPair<ByteArray> id = ids.get(posUsed.intValue());
if (id.getA().equals(bah)) {
final long recordID = id.getB();
id.setA(null);
id.setB(NIL);
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID, id.getB()));
}
storageManager.deleteDuplicateID(recordID);
}
}
}
}
private static String describeID(byte[] duplicateID) {
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
}
private static String describeID(byte[] duplicateID, long id) {
return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id;
}
@Override
public boolean contains(final byte[] duplID) {
return contains(new ByteArray(duplID));
}
private boolean contains(final ByteArray duplID) {
boolean contains = cache.containsKey(duplID);
if (LOGGER.isTraceEnabled()) {
if (contains) {
LOGGER.tracef("address = %s found a duplicate %s", address, describeID(duplID.bytes));
}
}
return contains;
}
@Override
public void addToCache(final byte[] duplID) throws Exception {
addToCache(duplID, null, false);
}
@Override
public void addToCache(final byte[] duplID, final Transaction tx) throws Exception {
addToCache(duplID, tx, false);
}
@Override
public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
final ByteArray holder = new ByteArray(duplID);
if (contains(holder)) {
if (tx != null) {
tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
}
return false;
}
addToCache(holder, tx, true);
return true;
}
@Override
public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
addToCache(new ByteArray(duplID), tx, instantAdd);
}
private synchronized void addToCache(final ByteArray holder,
final Transaction tx,
boolean instantAdd) throws Exception {
final long recordID = storageManager.generateID();
if (tx == null) {
storageManager.storeDuplicateID(address, holder.bytes, recordID);
addToCacheInMemory(holder, recordID);
} else {
storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID);
tx.setContainsPersistent();
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s adding duplicateID TX operation for %s, tx = %s", address,
describeID(holder.bytes, recordID), tx);
}
if (instantAdd) {
tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false));
} else {
// For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected
tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true));
}
}
}
@Override
public void load(final Transaction tx, final byte[] duplID) {
tx.addOperation(new AddDuplicateIDOperation(new ByteArray(duplID), tx.getID(), true));
}
private synchronized void addToCacheInMemory(final ByteArray holder, final long recordID) {
Objects.requireNonNull(holder, "holder must be not null");
if (recordID < 0) {
throw new IllegalArgumentException("recordID must be >= 0");
}
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s adding %s", address, describeID(holder.bytes, recordID));
}
cache.put(holder, cachedBoxedInts.apply(pos));
ObjLongPair<ByteArray> id;
if (pos < ids.size()) {
// Need fast array style access here -hence ArrayList typing
id = ids.get(pos);
// The id here might be null if it was explicit deleted
if (id.getA() != null) {
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s removing excess duplicateDetection %s", address, describeID(id.getA().bytes, id.getB()));
}
cache.remove(id.getA());
assert id.getB() != NIL;
try {
storageManager.deleteDuplicateID(id.getB());
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e);
}
}
id.setA(holder);
id.setB(recordID);
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s replacing old duplicateID by %s", address, describeID(id.getA().bytes, id.getB()));
}
} else {
id = new ObjLongPair<>(holder, recordID);
if (LOGGER.isTraceEnabled()) {
LOGGER.tracef("address = %s adding new duplicateID %s", address, describeID(id.getA().bytes, id.getB()));
}
ids.add(id);
}
if (pos++ == cacheSize - 1) {
pos = 0;
}
}
@Override
public synchronized void clear() throws Exception {
LOGGER.debugf("address = %s removing duplicate ID data", address);
final int idsSize = ids.size();
if (idsSize > 0) {
long tx = storageManager.generateID();
for (int i = 0; i < idsSize; i++) {
final ObjLongPair<ByteArray> id = ids.get(i);
if (id.getA() != null) {
assert id.getB() != NIL;
storageManager.deleteDuplicateIDTransactional(tx, id.getB());
}
}
storageManager.commit(tx);
}
ids.clear();
cache.clear();
pos = 0;
}
@Override
public synchronized List<Pair<byte[], Long>> getMap() {
final int idsSize = ids.size();
List<Pair<byte[], Long>> copy = new ArrayList<>(idsSize);
for (int i = 0; i < idsSize; i++) {
final ObjLongPair<ByteArray> id = ids.get(i);
// in case the pair has been removed
if (id.getA() != null) {
assert id.getB() != NIL;
copy.add(new Pair<>(id.getA().bytes, id.getB()));
}
}
return copy;
}
private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
final ByteArray holder;
final long recordID;
volatile boolean done;
private final boolean afterCommit;
AddDuplicateIDOperation(final ByteArray holder, final long recordID, boolean afterCommit) {
this.holder = holder;
this.recordID = recordID;
this.afterCommit = afterCommit;
}
private void process() {
if (!done) {
addToCacheInMemory(holder, recordID);
done = true;
}
}
@Override
public void afterCommit(final Transaction tx) {
if (afterCommit) {
process();
}
}
@Override
public void beforeCommit(Transaction tx) throws Exception {
if (!afterCommit) {
process();
}
}
@Override
public List<MessageReference> getRelatedMessageReferences() {
return null;
}
}
}

View File

@ -16,6 +16,25 @@
*/ */
package org.apache.activemq.artemis.core.postoffice.impl; package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
@ -82,25 +101,6 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf; import static org.apache.activemq.artemis.utils.collections.IterableStream.iterableOf;
/** /**
@ -1315,7 +1315,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
DuplicateIDCache cache = duplicateIDCaches.get(address); DuplicateIDCache cache = duplicateIDCaches.get(address);
if (cache == null) { if (cache == null) {
cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache); if (persistIDCache) {
cache = DuplicateIDCaches.persistent(address, idCacheSize, storageManager);
} else {
cache = DuplicateIDCaches.inMemory(address, idCacheSize);
}
DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache); DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);

View File

@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.RetryRule; import org.apache.activemq.artemis.utils.RetryRule;
@ -52,7 +52,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
public void testDuplicate() throws Exception { public void testDuplicate() throws Exception {
createStorage(); createStorage();
DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, true); DuplicateIDCache cache = DuplicateIDCaches.persistent(new SimpleString("test"), 2000, journal);
TransactionImpl tx = new TransactionImpl(journal); TransactionImpl tx = new TransactionImpl(journal);
@ -108,7 +108,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
public void testDuplicateNonPersistent() throws Exception { public void testDuplicateNonPersistent() throws Exception {
createStorage(); createStorage();
DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, false); DuplicateIDCache cache = DuplicateIDCaches.inMemory(new SimpleString("test"), 2000);
TransactionImpl tx = new TransactionImpl(journal); TransactionImpl tx = new TransactionImpl(journal);

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.performance.jmh;
import java.util.Arrays;
import java.util.SplittableRandom;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Fork(2)
@Warmup(iterations = 5, time = 200, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 8, time = 200, timeUnit = TimeUnit.MILLISECONDS)
public class ByteArrayHashCodeBenchamark {
@Param({"7", "8", "36"})
private int length;
@Param("0")
private int seed;
/**
* The higher the less predictable for the CPU branch predictor.
*/
@Param({"4", "10"})
private int logPermutations;
@Param({"true"})
private boolean unsafe;
private long seq;
private int inputMask;
private byte[][] inputs;
@Setup
public void init() {
System.setProperty("io.netty.noUnsafe", Boolean.valueOf(!unsafe).toString());
int inputSize = 1 << logPermutations;
inputs = new byte[inputSize][];
inputMask = inputs.length - 1;
// splittable random can create repeatable sequence of inputs
SplittableRandom random = new SplittableRandom(seed);
for (int i = 0; i < inputs.length; i++) {
final byte[] bytes = new byte[length];
for (int b = 0; b < length; b++) {
bytes[b] = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
}
inputs[i] = bytes;
}
}
private byte[] nextInput() {
final long seq = this.seq;
final int index = (int) (seq & inputMask);
this.seq = seq + 1;
return inputs[index];
}
@Benchmark
public int artemisHashCode() {
return ByteUtil.hashCode(nextInput());
}
@Benchmark
public int arraysHashCode() {
return Arrays.hashCode(nextInput());
}
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.performance.jmh;
import java.util.SplittableRandom;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
@State(Scope.Benchmark)
@Fork(2)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 8, time = 1)
public class DuplicateIDCacheBenchmark {
@Param({"20000"})
private int size;
@Param({"false", "true"})
private boolean persist;
private DuplicateIDCache cache;
private byte[][] ids;
private int idsMask;
private int missingIdsMask;
private long nextId;
private byte[][] randomEvictedIds;
@Setup
public void init() throws Exception {
cache = persist ?
DuplicateIDCaches.persistent(SimpleString.toSimpleString("benchmark"), size, new NullStorageManager()) :
DuplicateIDCaches.inMemory(SimpleString.toSimpleString("benchmark"), size);
final int idSize = findNextHigherPowerOf2(size);
idsMask = idSize - 1;
nextId = 0;
ids = new byte[idSize][];
// fill the cache
for (int i = 0; i < size; i++) {
final byte[] id = RandomUtil.randomBytes();
ids[i] = id;
cache.addToCache(id, null, true);
}
// evict the first (idSize - size) elements on the ids array.
// Given that being a FIFO cache isn't a stable contract it's going to validate it too.
final int evicted = idSize - size;
for (int i = 0; i < evicted; i++) {
final byte[] id = RandomUtil.randomBytes();
ids[size + i] = id;
cache.addToCache(id, null, true);
// check correctness of eviction policy
if (cache.contains(ids[i])) {
throw new AssertionError("This cache isn't using anymore a FIFO eviction strategy or its real capacity is > " + size);
}
}
// always use the same seed!
SplittableRandom random = new SplittableRandom(0);
// set it big enough to trick branch predictors
final int evictedIdsLength = findNextPowerOf2(Math.max(1024, evicted));
missingIdsMask = evictedIdsLength - 1;
randomEvictedIds = new byte[evictedIdsLength][];
for (int i = 0; i < evictedIdsLength; i++) {
final int id = random.nextInt(0, evicted);
randomEvictedIds[i] = ids[id];
// check correctness of eviction policy
if (cache.contains(ids[id])) {
throw new AssertionError("This cache isn't using anymore a FIFO eviction strategy");
}
}
}
// it isn't checking what's the max power of 2 number nor if size > 0
private static int findNextHigherPowerOf2(int size) {
final int nextPow2 = findNextPowerOf2(size);
if (nextPow2 > size) {
return nextPow2;
}
return nextPow2 * 2;
}
private static int findNextPowerOf2(int size) {
return 1 << (32 - Integer.numberOfLeadingZeros(size - 1));
}
private byte[] nextId() {
final long seq = nextId;
final int index = (int) (seq & idsMask);
nextId = seq + 1;
return ids[index];
}
private byte[] nextMissingId() {
final long seq = nextId;
final int index = (int) (seq & missingIdsMask);
nextId = seq + 1;
return randomEvictedIds[index];
}
@Benchmark
public boolean atomicVerify() throws Exception {
return cache.atomicVerify(nextId(), null);
}
@Benchmark
public boolean containsMissingId() {
return cache.contains(nextMissingId());
}
@TearDown
public void clear() throws Exception {
cache.clear();
}
}

View File

@ -31,8 +31,9 @@ import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader; import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader;
import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl; import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice; import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice;
@ -40,8 +41,8 @@ import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -106,7 +107,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
Assert.assertEquals(0, mapDups.size()); Assert.assertEquals(0, mapDups.size());
DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true); DuplicateIDCache cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
cacheID.addToCache(RandomUtil.randomBytes()); cacheID.addToCache(RandomUtil.randomBytes());
@ -126,7 +127,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
Assert.assertEquals(10, values.size()); Assert.assertEquals(10, values.size());
cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true); cacheID = DuplicateIDCaches.persistent(ADDRESS, 10, journal);
cacheID.load(values); cacheID.load(values);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {

View File

@ -28,14 +28,13 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCaches;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.RoutingContext;
@ -201,7 +200,7 @@ public class FakePostOffice implements PostOffice {
@Override @Override
public DuplicateIDCache getDuplicateIDCache(final SimpleString address) { public DuplicateIDCache getDuplicateIDCache(final SimpleString address) {
return new DuplicateIDCacheImpl(address, 2000, new NullStorageManager(), false); return DuplicateIDCaches.inMemory(address, 2000);
} }
@Override @Override