ARTEMIS-3016 Reduce DuplicateIDCache memory footprint
This commit is contained in:
parent
985559d086
commit
b3b5d4893c
|
@ -0,0 +1,99 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.api.core;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Pair is a holder for 1 object and a >= 0 primitive long value.
|
||||||
|
* <p>
|
||||||
|
* This is a utility class.
|
||||||
|
*/
|
||||||
|
public class ObjLongPair<A> implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 7749478219139339853L;
|
||||||
|
|
||||||
|
public static final long NIL = -1;
|
||||||
|
|
||||||
|
public ObjLongPair(final A a, final long b) {
|
||||||
|
this.a = a;
|
||||||
|
this.b = b;
|
||||||
|
if (b < 0 && b != NIL) {
|
||||||
|
throw new IllegalStateException("b must be >= 0 or == NIL = " + NIL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public ObjLongPair(final A a) {
|
||||||
|
this.a = a;
|
||||||
|
this.b = NIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
private A a;
|
||||||
|
private long b;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
if (a == null && b == NIL) {
|
||||||
|
return super.hashCode();
|
||||||
|
}
|
||||||
|
// it's ok to use b to compute hashCode although NIL
|
||||||
|
return (a == null ? 0 : a.hashCode()) + 37 * Long.hashCode(b);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(final Object other) {
|
||||||
|
if (other == this) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (other instanceof ObjLongPair == false) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjLongPair<A> pother = (ObjLongPair<A>) other;
|
||||||
|
|
||||||
|
return (Objects.equals(pother.a, a)) && (pother.b == b);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ObjLongPair[a=" + a + ", b=" + (b == NIL ? "NIL" : b) + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setA(A a) {
|
||||||
|
if (this.a == a) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.a = a;
|
||||||
|
}
|
||||||
|
|
||||||
|
public A getA() {
|
||||||
|
return a;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setB(long b) {
|
||||||
|
if (b < 0 && b != NIL) {
|
||||||
|
throw new IllegalStateException("b must be >= 0 or == NIL = " + NIL);
|
||||||
|
}
|
||||||
|
this.b = b;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getB() {
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
}
|
|
@ -275,6 +275,52 @@ public class ByteUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static int hashCode(byte[] bytes) {
|
||||||
|
if (PlatformDependent.hasUnsafe() && PlatformDependent.isUnaligned()) {
|
||||||
|
return unsafeHashCode(bytes);
|
||||||
|
}
|
||||||
|
return Arrays.hashCode(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This hash code computation is borrowed by {@link io.netty.buffer.ByteBufUtil#hashCode(ByteBuf)}.
|
||||||
|
*/
|
||||||
|
private static int unsafeHashCode(byte[] bytes) {
|
||||||
|
if (bytes == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
final int len = bytes.length;
|
||||||
|
int hashCode = 1;
|
||||||
|
final int intCount = len >>> 2;
|
||||||
|
int arrayIndex = 0;
|
||||||
|
// reading in batch both help hash code computation data dependencies and save memory bandwidth
|
||||||
|
for (int i = 0; i < intCount; i++) {
|
||||||
|
hashCode = 31 * hashCode + PlatformDependent.getInt(bytes, arrayIndex);
|
||||||
|
arrayIndex += Integer.BYTES;
|
||||||
|
}
|
||||||
|
final byte remaining = (byte) (len & 3);
|
||||||
|
if (remaining > 0) {
|
||||||
|
hashCode = unsafeUnrolledHashCode(bytes, arrayIndex, remaining, hashCode);
|
||||||
|
}
|
||||||
|
return hashCode == 0 ? 1 : hashCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int unsafeUnrolledHashCode(byte[] bytes, int index, int bytesCount, int h) {
|
||||||
|
// there is still the hash data dependency but is more friendly
|
||||||
|
// then a plain loop, given that we know no loop is needed here
|
||||||
|
assert bytesCount > 0 && bytesCount < 4;
|
||||||
|
h = 31 * h + PlatformDependent.getByte(bytes, index);
|
||||||
|
if (bytesCount == 1) {
|
||||||
|
return h;
|
||||||
|
}
|
||||||
|
h = 31 * h + PlatformDependent.getByte(bytes, index + 1);
|
||||||
|
if (bytesCount == 2) {
|
||||||
|
return h;
|
||||||
|
}
|
||||||
|
h = 31 * h + PlatformDependent.getByte(bytes, index + 2);
|
||||||
|
return h;
|
||||||
|
}
|
||||||
|
|
||||||
public static boolean equals(final byte[] left, final byte[] right) {
|
public static boolean equals(final byte[] left, final byte[] right) {
|
||||||
return equals(left, right, 0, right.length);
|
return equals(left, right, 0, right.length);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,17 +17,23 @@
|
||||||
package org.apache.activemq.artemis.utils;
|
package org.apache.activemq.artemis.utils;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
import java.nio.ReadOnlyBufferException;
|
import java.nio.ReadOnlyBufferException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Assume;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class ByteUtilTest {
|
public class ByteUtilTest {
|
||||||
|
|
||||||
|
@ -83,6 +89,46 @@ public class ByteUtilTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnsafeUnalignedByteArrayHashCode() {
|
||||||
|
Assume.assumeTrue(PlatformDependent.hasUnsafe());
|
||||||
|
Assume.assumeTrue(PlatformDependent.isUnaligned());
|
||||||
|
Map<byte[], Integer> map = new LinkedHashMap<>();
|
||||||
|
map.put(new byte[0], 1);
|
||||||
|
map.put(new byte[]{1}, 32);
|
||||||
|
map.put(new byte[]{2}, 33);
|
||||||
|
map.put(new byte[]{0, 1}, 962);
|
||||||
|
map.put(new byte[]{1, 2}, 994);
|
||||||
|
if (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) {
|
||||||
|
map.put(new byte[]{0, 1, 2, 3, 4, 5}, 63504931);
|
||||||
|
map.put(new byte[]{6, 7, 8, 9, 0, 1}, -1603953111);
|
||||||
|
map.put(new byte[]{-1, -1, -1, (byte) 0xE1}, 1);
|
||||||
|
} else {
|
||||||
|
map.put(new byte[]{0, 1, 2, 3, 4, 5}, 1250309600);
|
||||||
|
map.put(new byte[]{6, 7, 8, 9, 0, 1}, -417148442);
|
||||||
|
map.put(new byte[]{-1, -1, -1, (byte) 0xE1}, -503316450);
|
||||||
|
}
|
||||||
|
for (Map.Entry<byte[], Integer> e : map.entrySet()) {
|
||||||
|
assertEquals("input = " + Arrays.toString(e.getKey()), e.getValue().intValue(), ByteUtil.hashCode(e.getKey()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoUnsafeAlignedByteArrayHashCode() {
|
||||||
|
Assume.assumeFalse(PlatformDependent.hasUnsafe());
|
||||||
|
Assume.assumeFalse(PlatformDependent.isUnaligned());
|
||||||
|
ArrayList<byte[]> inputs = new ArrayList<>();
|
||||||
|
inputs.add(new byte[0]);
|
||||||
|
inputs.add(new byte[]{1});
|
||||||
|
inputs.add(new byte[]{2});
|
||||||
|
inputs.add(new byte[]{0, 1});
|
||||||
|
inputs.add(new byte[]{1, 2});
|
||||||
|
inputs.add(new byte[]{0, 1, 2, 3, 4, 5});
|
||||||
|
inputs.add(new byte[]{6, 7, 8, 9, 0, 1});
|
||||||
|
inputs.add(new byte[]{-1, -1, -1, (byte) 0xE1});
|
||||||
|
inputs.forEach(input -> assertEquals("input = " + Arrays.toString(input), Arrays.hashCode(input), ByteUtil.hashCode(input)));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTextBytesToLongBytesNegative() {
|
public void testTextBytesToLongBytesNegative() {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -16,12 +16,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.postoffice.impl;
|
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||||
|
|
||||||
|
import java.lang.ref.Reference;
|
||||||
|
import java.lang.ref.WeakReference;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
|
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ObjLongPair;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
|
@ -33,6 +36,8 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract
|
||||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.api.core.ObjLongPair.NIL;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A DuplicateIDCacheImpl
|
* A DuplicateIDCacheImpl
|
||||||
*
|
*
|
||||||
|
@ -41,6 +46,23 @@ import org.jboss.logging.Logger;
|
||||||
public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class);
|
private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class);
|
||||||
|
// we're not interested into safe publication here: we need to scale, be fast and save "some" GC to happen
|
||||||
|
private static WeakReference<Integer[]> INDEXES = null;
|
||||||
|
|
||||||
|
private static Integer[] boxedInts(int size) {
|
||||||
|
final Reference<Integer[]> indexesRef = INDEXES;
|
||||||
|
final Integer[] indexes = indexesRef == null ? null : indexesRef.get();
|
||||||
|
if (indexes != null && size <= indexes.length) {
|
||||||
|
return indexes;
|
||||||
|
}
|
||||||
|
final int newSize = size + (indexes == null ? 0 : size / 2);
|
||||||
|
final Integer[] newIndexes = new Integer[newSize];
|
||||||
|
if (indexes != null) {
|
||||||
|
System.arraycopy(indexes, 0, newIndexes, 0, indexes.length);
|
||||||
|
}
|
||||||
|
INDEXES = new WeakReference<>(newIndexes);
|
||||||
|
return newIndexes;
|
||||||
|
}
|
||||||
|
|
||||||
// ByteHolder, position
|
// ByteHolder, position
|
||||||
private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>();
|
private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>();
|
||||||
|
@ -49,7 +71,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
|
|
||||||
// Note - deliberately typed as ArrayList since we want to ensure fast indexed
|
// Note - deliberately typed as ArrayList since we want to ensure fast indexed
|
||||||
// based array access
|
// based array access
|
||||||
private final ArrayList<Pair<ByteArrayHolder, Long>> ids;
|
private final ArrayList<ObjLongPair<ByteArrayHolder>> ids;
|
||||||
|
|
||||||
|
private final Integer[] cachedBoxedInts;
|
||||||
|
|
||||||
private int pos;
|
private int pos;
|
||||||
|
|
||||||
|
@ -69,11 +93,24 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
|
|
||||||
ids = new ArrayList<>(size);
|
ids = new ArrayList<>(size);
|
||||||
|
|
||||||
|
cachedBoxedInts = boxedInts(size);
|
||||||
|
|
||||||
this.storageManager = storageManager;
|
this.storageManager = storageManager;
|
||||||
|
|
||||||
this.persist = persist;
|
this.persist = persist;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// best effort caching mechanism
|
||||||
|
private Integer boxed(int index) {
|
||||||
|
Integer boxedInt = this.cachedBoxedInts[index];
|
||||||
|
if (boxedInt == null) {
|
||||||
|
boxedInt = index;
|
||||||
|
cachedBoxedInts[index] = boxedInt;
|
||||||
|
}
|
||||||
|
assert boxedInt != null;
|
||||||
|
return boxedInt;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void load(final List<Pair<byte[], Long>> theIds) throws Exception {
|
public void load(final List<Pair<byte[], Long>> theIds) throws Exception {
|
||||||
long txID = -1;
|
long txID = -1;
|
||||||
|
@ -98,9 +135,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
} else {
|
} else {
|
||||||
ByteArrayHolder bah = new ByteArrayHolder(id.getA());
|
ByteArrayHolder bah = new ByteArrayHolder(id.getA());
|
||||||
|
|
||||||
Pair<ByteArrayHolder, Long> pair = new Pair<>(bah, id.getB());
|
ObjLongPair<ByteArrayHolder> pair = new ObjLongPair<>(bah, id.getB());
|
||||||
|
|
||||||
cache.put(bah, ids.size());
|
cache.put(bah, boxed(ids.size()));
|
||||||
|
|
||||||
ids.add(pair);
|
ids.add(pair);
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
@ -133,7 +170,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
Integer posUsed = cache.remove(bah);
|
Integer posUsed = cache.remove(bah);
|
||||||
|
|
||||||
if (posUsed != null) {
|
if (posUsed != null) {
|
||||||
Pair<ByteArrayHolder, Long> id;
|
ObjLongPair<ByteArrayHolder> id;
|
||||||
|
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
id = ids.get(posUsed.intValue());
|
id = ids.get(posUsed.intValue());
|
||||||
|
@ -144,7 +181,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB()));
|
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB()));
|
||||||
}
|
}
|
||||||
id.setB(null);
|
id.setB(NIL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -161,10 +198,16 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean contains(final byte[] duplID) {
|
public boolean contains(final byte[] duplID) {
|
||||||
boolean contains = cache.get(new ByteArrayHolder(duplID)) != null;
|
return contains(new ByteArrayHolder(duplID));
|
||||||
|
}
|
||||||
|
|
||||||
if (contains) {
|
private boolean contains(final ByteArrayHolder duplID) {
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::constains found a duplicate " + describeID(duplID, 0));
|
boolean contains = cache.containsKey(duplID);
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
if (contains) {
|
||||||
|
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::contains found a duplicate " + describeID(duplID.bytes, 0));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return contains;
|
return contains;
|
||||||
}
|
}
|
||||||
|
@ -181,68 +224,68 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
|
public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
|
||||||
|
final ByteArrayHolder holder = new ByteArrayHolder(duplID);
|
||||||
if (contains(duplID)) {
|
if (contains(holder)) {
|
||||||
if (tx != null) {
|
if (tx != null) {
|
||||||
tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
|
tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
} else {
|
|
||||||
addToCache(duplID, tx, true);
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
addToCache(holder, tx, true);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
|
public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
|
||||||
long recordID = -1;
|
addToCache(new ByteArrayHolder(duplID), tx, instantAdd);
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void addToCache(final ByteArrayHolder holder,
|
||||||
|
final Transaction tx,
|
||||||
|
boolean instantAdd) throws Exception {
|
||||||
|
long recordID = -1;
|
||||||
if (tx == null) {
|
if (tx == null) {
|
||||||
if (persist) {
|
if (persist) {
|
||||||
recordID = storageManager.generateID();
|
recordID = storageManager.generateID();
|
||||||
storageManager.storeDuplicateID(address, duplID, recordID);
|
storageManager.storeDuplicateID(address, holder.bytes, recordID);
|
||||||
}
|
}
|
||||||
|
|
||||||
addToCacheInMemory(duplID, recordID);
|
addToCacheInMemory(holder, recordID);
|
||||||
} else {
|
} else {
|
||||||
if (persist) {
|
if (persist) {
|
||||||
recordID = storageManager.generateID();
|
recordID = storageManager.generateID();
|
||||||
storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
|
storageManager.storeDuplicateIDTransactional(tx.getID(), address, holder.bytes, recordID);
|
||||||
|
|
||||||
tx.setContainsPersistent();
|
tx.setContainsPersistent();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx);
|
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(holder.bytes, recordID) + ", tx=" + tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (instantAdd) {
|
if (instantAdd) {
|
||||||
tx.addOperation(new AddDuplicateIDOperation(duplID, recordID, false));
|
tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false));
|
||||||
} else {
|
} else {
|
||||||
// For a tx, it's important that the entry is not added to the cache until commit
|
// For a tx, it's important that the entry is not added to the cache until commit
|
||||||
// since if the client fails then resends them tx we don't want it to get rejected
|
// since if the client fails then resends them tx we don't want it to get rejected
|
||||||
tx.afterStore(new AddDuplicateIDOperation(duplID, recordID, true));
|
tx.afterStore(new AddDuplicateIDOperation(holder, recordID, true));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void load(final Transaction tx, final byte[] duplID) {
|
public void load(final Transaction tx, final byte[] duplID) {
|
||||||
tx.addOperation(new AddDuplicateIDOperation(duplID, tx.getID(), true));
|
tx.addOperation(new AddDuplicateIDOperation(new ByteArrayHolder(duplID), tx.getID(), true));
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) {
|
private synchronized void addToCacheInMemory(final ByteArrayHolder holder, final long recordID) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(duplID, recordID));
|
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(holder.bytes, recordID));
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteArrayHolder holder = new ByteArrayHolder(duplID);
|
cache.put(holder, boxed(pos));
|
||||||
|
|
||||||
cache.put(holder, pos);
|
ObjLongPair<ByteArrayHolder> id;
|
||||||
|
|
||||||
Pair<ByteArrayHolder, Long> id;
|
|
||||||
|
|
||||||
if (pos < ids.size()) {
|
if (pos < ids.size()) {
|
||||||
// Need fast array style access here -hence ArrayList typing
|
// Need fast array style access here -hence ArrayList typing
|
||||||
|
@ -260,7 +303,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
// Note we can't use update since journal update doesn't let older records get
|
// Note we can't use update since journal update doesn't let older records get
|
||||||
// reclaimed
|
// reclaimed
|
||||||
|
|
||||||
if (id.getB() != null) {
|
if (id.getB() != NIL) {
|
||||||
try {
|
try {
|
||||||
storageManager.deleteDuplicateID(id.getB());
|
storageManager.deleteDuplicateID(id.getB());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -273,15 +316,14 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
|
|
||||||
// The recordID could be negative if the duplicateCache is configured to not persist,
|
// The recordID could be negative if the duplicateCache is configured to not persist,
|
||||||
// -1 would mean null on this case
|
// -1 would mean null on this case
|
||||||
id.setB(recordID >= 0 ? recordID : null);
|
id.setB(recordID >= 0 ? recordID : NIL);
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB()));
|
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB()));
|
||||||
}
|
}
|
||||||
|
|
||||||
holder.pos = pos;
|
|
||||||
} else {
|
} else {
|
||||||
id = new Pair<>(holder, recordID >= 0 ? recordID : null);
|
id = new ObjLongPair<>(holder, recordID >= 0 ? recordID : NIL);
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB()));
|
logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB()));
|
||||||
|
@ -289,7 +331,6 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
|
|
||||||
ids.add(id);
|
ids.add(id);
|
||||||
|
|
||||||
holder.pos = pos;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pos++ == cacheSize - 1) {
|
if (pos++ == cacheSize - 1) {
|
||||||
|
@ -301,10 +342,12 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
public void clear() throws Exception {
|
public void clear() throws Exception {
|
||||||
logger.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data");
|
logger.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data");
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (ids.size() > 0 && persist) {
|
final int idsSize = ids.size();
|
||||||
|
if (idsSize > 0 && persist) {
|
||||||
long tx = storageManager.generateID();
|
long tx = storageManager.generateID();
|
||||||
for (Pair<ByteArrayHolder, Long> id : ids) {
|
for (int i = 0; i < idsSize; i++) {
|
||||||
if (id != null) {
|
final ObjLongPair<ByteArrayHolder> id = ids.get(i);
|
||||||
|
if (id != null && id.getB() != NIL) {
|
||||||
storageManager.deleteDuplicateIDTransactional(tx, id.getB());
|
storageManager.deleteDuplicateIDTransactional(tx, id.getB());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -319,16 +362,18 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Pair<byte[], Long>> getMap() {
|
public List<Pair<byte[], Long>> getMap() {
|
||||||
List<Pair<byte[], Long>> list = new ArrayList<>();
|
final int idsSize = ids.size();
|
||||||
for (Pair<ByteArrayHolder, Long> id : ids) {
|
List<Pair<byte[], Long>> copy = new ArrayList<>(idsSize);
|
||||||
list.add(new Pair<>(id.getA().bytes, id.getB()));
|
for (int i = 0; i < idsSize; i++) {
|
||||||
|
final ObjLongPair<ByteArrayHolder> id = ids.get(i);
|
||||||
|
copy.add(new Pair<>(id.getA().bytes, id.getB() == NIL ? null : id.getB()));
|
||||||
}
|
}
|
||||||
return list;
|
return copy;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
|
private final class AddDuplicateIDOperation extends TransactionOperationAbstract {
|
||||||
|
|
||||||
final byte[] duplID;
|
final ByteArrayHolder holder;
|
||||||
|
|
||||||
final long recordID;
|
final long recordID;
|
||||||
|
|
||||||
|
@ -336,15 +381,15 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
|
|
||||||
private final boolean afterCommit;
|
private final boolean afterCommit;
|
||||||
|
|
||||||
AddDuplicateIDOperation(final byte[] duplID, final long recordID, boolean afterCommit) {
|
AddDuplicateIDOperation(final ByteArrayHolder holder, final long recordID, boolean afterCommit) {
|
||||||
this.duplID = duplID;
|
this.holder = holder;
|
||||||
this.recordID = recordID;
|
this.recordID = recordID;
|
||||||
this.afterCommit = afterCommit;
|
this.afterCommit = afterCommit;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void process() {
|
private void process() {
|
||||||
if (!done) {
|
if (!done) {
|
||||||
addToCacheInMemory(duplID, recordID);
|
addToCacheInMemory(holder, recordID);
|
||||||
|
|
||||||
done = true;
|
done = true;
|
||||||
}
|
}
|
||||||
|
@ -372,32 +417,20 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
|
|
||||||
private static final class ByteArrayHolder {
|
private static final class ByteArrayHolder {
|
||||||
|
|
||||||
|
private final byte[] bytes;
|
||||||
|
|
||||||
|
private int hash;
|
||||||
|
|
||||||
ByteArrayHolder(final byte[] bytes) {
|
ByteArrayHolder(final byte[] bytes) {
|
||||||
this.bytes = bytes;
|
this.bytes = bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
final byte[] bytes;
|
|
||||||
|
|
||||||
int hash;
|
|
||||||
|
|
||||||
int pos;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(final Object other) {
|
public boolean equals(final Object other) {
|
||||||
if (other instanceof ByteArrayHolder) {
|
if (other instanceof ByteArrayHolder) {
|
||||||
ByteArrayHolder s = (ByteArrayHolder) other;
|
ByteArrayHolder s = (ByteArrayHolder) other;
|
||||||
|
|
||||||
if (bytes.length != s.bytes.length) {
|
return ByteUtil.equals(bytes, s.bytes);
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < bytes.length; i++) {
|
|
||||||
if (bytes[i] != s.bytes[i]) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -406,9 +439,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
if (hash == 0) {
|
if (hash == 0) {
|
||||||
for (byte b : bytes) {
|
hash = ByteUtil.hashCode(bytes);
|
||||||
hash = 31 * hash + b;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return hash;
|
return hash;
|
||||||
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.performance.jmh;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.SplittableRandom;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||||
|
import org.openjdk.jmh.annotations.Benchmark;
|
||||||
|
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||||
|
import org.openjdk.jmh.annotations.Fork;
|
||||||
|
import org.openjdk.jmh.annotations.Measurement;
|
||||||
|
import org.openjdk.jmh.annotations.Mode;
|
||||||
|
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||||
|
import org.openjdk.jmh.annotations.Param;
|
||||||
|
import org.openjdk.jmh.annotations.Scope;
|
||||||
|
import org.openjdk.jmh.annotations.Setup;
|
||||||
|
import org.openjdk.jmh.annotations.State;
|
||||||
|
import org.openjdk.jmh.annotations.Warmup;
|
||||||
|
|
||||||
|
@State(Scope.Benchmark)
|
||||||
|
@BenchmarkMode(Mode.AverageTime)
|
||||||
|
@OutputTimeUnit(TimeUnit.NANOSECONDS)
|
||||||
|
@Fork(2)
|
||||||
|
@Warmup(iterations = 5, time = 200, timeUnit = TimeUnit.MILLISECONDS)
|
||||||
|
@Measurement(iterations = 8, time = 200, timeUnit = TimeUnit.MILLISECONDS)
|
||||||
|
public class ByteArrayHashCodeBenchamark {
|
||||||
|
|
||||||
|
@Param({"7", "8", "36"})
|
||||||
|
private int length;
|
||||||
|
|
||||||
|
@Param("0")
|
||||||
|
private int seed;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The higher the less predictable for the CPU branch predictor.
|
||||||
|
*/
|
||||||
|
@Param({"4", "10"})
|
||||||
|
private int logPermutations;
|
||||||
|
|
||||||
|
@Param({"true"})
|
||||||
|
private boolean unsafe;
|
||||||
|
|
||||||
|
private long seq;
|
||||||
|
private int inputMask;
|
||||||
|
private byte[][] inputs;
|
||||||
|
|
||||||
|
@Setup
|
||||||
|
public void init() {
|
||||||
|
System.setProperty("io.netty.noUnsafe", Boolean.valueOf(!unsafe).toString());
|
||||||
|
int inputSize = 1 << logPermutations;
|
||||||
|
inputs = new byte[inputSize][];
|
||||||
|
inputMask = inputs.length - 1;
|
||||||
|
// splittable random can create repeatable sequence of inputs
|
||||||
|
SplittableRandom random = new SplittableRandom(seed);
|
||||||
|
for (int i = 0; i < inputs.length; i++) {
|
||||||
|
final byte[] bytes = new byte[length];
|
||||||
|
for (int b = 0; b < length; b++) {
|
||||||
|
bytes[b] = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
|
||||||
|
}
|
||||||
|
inputs[i] = bytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] nextInput() {
|
||||||
|
final long seq = this.seq;
|
||||||
|
final int index = (int) (seq & inputMask);
|
||||||
|
this.seq = seq + 1;
|
||||||
|
return inputs[index];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
public int artemisHashCode() {
|
||||||
|
return ByteUtil.hashCode(nextInput());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
public int arraysHashCode() {
|
||||||
|
return Arrays.hashCode(nextInput());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue