ARTEMIS-2604 Optimize journal loading

- Avoid some Properties Decoding, checking if we need certain properties like scheduled delivery
- Avoid creating some unnecessary SimpleString instances
- Removed some intermediate ActiveMQBuffer allocation
- Removed some intermediate UnreleasableByteBuf allocation
This commit is contained in:
Francesco Nigro 2020-01-21 23:23:55 +01:00 committed by Clebert Suconic
parent fb60795b59
commit 3282f105bb
7 changed files with 240 additions and 8 deletions

View File

@ -35,7 +35,15 @@ public class CompositeAddress {
}
public static SimpleString extractQueueName(SimpleString name) {
return name == null ? null : new SimpleString(extractQueueName(name.toString()));
if (name == null) {
return null;
}
final String nameString = name.toString();
final String queueName = extractQueueName(nameString);
if (queueName.equals(nameString)) {
return name;
}
return new SimpleString(queueName);
}
public static String extractQueueName(String queue) {
@ -50,7 +58,15 @@ public class CompositeAddress {
}
public static SimpleString extractAddressName(SimpleString address) {
return address == null ? null : new SimpleString(extractAddressName(address.toString()));
if (address == null) {
return null;
}
final String addrString = address.toString();
final String addressName = extractAddressName(addrString);
if (addressName.equals(addrString)) {
return address;
}
return new SimpleString(addressName);
}
public static String extractAddressName(String address) {

View File

@ -372,6 +372,79 @@ public class TypedProperties {
}
}
/**
* Performs a search among the valid key properties contained in {@code buffer}, starting from {@code from}
* assuming it to be a valid encoded {@link TypedProperties} content.
*
* @throws IllegalStateException if any not-valid property is found while searching the {@code key} property
*/
public static boolean searchProperty(SimpleString key, ByteBuf buffer, int startIndex) {
// It won't implement a straight linear search for key
// because it would risk to find a SimpleString encoded property value
// equals to the key we're searching for!
int index = startIndex;
byte b = buffer.getByte(index);
index++;
if (b == DataConstants.NULL) {
return false;
}
final int numHeaders = buffer.getInt(index);
index += Integer.BYTES;
for (int i = 0; i < numHeaders; i++) {
final int keyLength = buffer.getInt(index);
index += Integer.BYTES;
if (key.equals(buffer, index, keyLength)) {
return true;
}
if (i == numHeaders - 1) {
return false;
}
index += keyLength;
byte type = buffer.getByte(index);
index++;
switch (type) {
case NULL: {
break;
}
case CHAR:
case SHORT: {
index += Short.BYTES;
break;
}
case BOOLEAN:
case BYTE: {
index += Byte.BYTES;
break;
}
case BYTES:
case STRING: {
index += (Integer.BYTES + buffer.getInt(index));
break;
}
case INT: {
index += Integer.BYTES;
break;
}
case LONG: {
index += Long.BYTES;
break;
}
case FLOAT: {
index += Float.BYTES;
break;
}
case DOUBLE: {
index += Double.BYTES;
break;
}
default: {
throw ActiveMQUtilBundle.BUNDLE.invalidType(type);
}
}
}
return false;
}
public synchronized void decode(final ByteBuf buffer,
final TypedPropertiesDecoderPools keyValuePools) {
byte b = buffer.readByte();

View File

@ -31,6 +31,8 @@ import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import static org.apache.activemq.artemis.utils.collections.TypedProperties.searchProperty;
public class TypedPropertiesTest {
private static void assertEqualsTypeProperties(final TypedProperties expected, final TypedProperties actual) {
@ -250,6 +252,96 @@ public class TypedPropertiesTest {
Assert.assertFalse(properties.clearInternalProperties());
}
@Test
public void testSearchPropertyIfNone() {
TypedProperties props = new TypedProperties();
ByteBuf buf = Unpooled.buffer(Byte.BYTES, Byte.BYTES);
props.encode(buf);
buf.resetReaderIndex();
Assert.assertFalse("There is no property", searchProperty(SimpleString.toSimpleString(""), buf, 0));
}
@Test
public void testSearchAllProperties() {
TypedProperties props = new TypedProperties();
props.putByteProperty(RandomUtil.randomSimpleString(), RandomUtil.randomByte());
props.putBytesProperty(RandomUtil.randomSimpleString(), RandomUtil.randomBytes());
props.putBytesProperty(RandomUtil.randomSimpleString(), null);
props.putBooleanProperty(RandomUtil.randomSimpleString(), RandomUtil.randomBoolean());
props.putShortProperty(RandomUtil.randomSimpleString(), RandomUtil.randomShort());
props.putIntProperty(RandomUtil.randomSimpleString(), RandomUtil.randomInt());
props.putLongProperty(RandomUtil.randomSimpleString(), RandomUtil.randomLong());
props.putFloatProperty(RandomUtil.randomSimpleString(), RandomUtil.randomFloat());
props.putDoubleProperty(RandomUtil.randomSimpleString(), RandomUtil.randomDouble());
props.putCharProperty(RandomUtil.randomSimpleString(), RandomUtil.randomChar());
props.putSimpleStringProperty(RandomUtil.randomSimpleString(), RandomUtil.randomSimpleString());
props.putSimpleStringProperty(RandomUtil.randomSimpleString(), null);
final SimpleString value = RandomUtil.randomSimpleString();
props.putSimpleStringProperty(RandomUtil.randomSimpleString(), value);
ByteBuf buf = Unpooled.buffer();
props.encode(buf);
buf.resetReaderIndex();
Assert.assertFalse(searchProperty(value, buf, 0));
props.forEachKey(key -> {
Assert.assertTrue(searchProperty(key, buf, 0));
Assert.assertTrue(searchProperty(SimpleString.toSimpleString(key.toString()), buf, 0));
// concat a string just to check if the search won't perform an eager search to find the string pattern
Assert.assertFalse(searchProperty(key.concat(" "), buf, 0));
});
}
@Test(expected = IndexOutOfBoundsException.class)
public void testSearchPartiallyEncodedBuffer() {
final int expectedLength = Integer.BYTES + Byte.BYTES;
ByteBuf buf = Unpooled.buffer(expectedLength, expectedLength);
buf.writeByte(DataConstants.NOT_NULL);
buf.writeInt(1);
buf.resetReaderIndex();
searchProperty(SimpleString.toSimpleString(" "), buf, 0);
}
@Test(expected = IndexOutOfBoundsException.class)
public void testSearchPartiallyEncodedString() {
final int expectedLength = Integer.BYTES + Byte.BYTES + Integer.BYTES;
ByteBuf buf = Unpooled.buffer(expectedLength, expectedLength);
buf.writeByte(DataConstants.NOT_NULL);
buf.writeInt(1);
//SimpleString::data length
buf.writeInt(2);
buf.resetReaderIndex();
searchProperty(SimpleString.toSimpleString("a"), buf, 0);
}
@Test(expected = IllegalStateException.class)
public void testSearchWithInvalidTypeBeforeEnd() {
ByteBuf buf = Unpooled.buffer();
buf.writeByte(DataConstants.NOT_NULL);
// fake 2 properties
buf.writeInt(2);
// 1 key with length 2
buf.writeInt(2);
buf.writeShort(3);
// invalid type
buf.writeByte(Byte.MIN_VALUE);
buf.resetReaderIndex();
searchProperty(SimpleString.toSimpleString(""), buf, 0);
}
@Test
public void testSearchWithInvalidTypeEnd() {
ByteBuf buf = Unpooled.buffer();
buf.writeByte(DataConstants.NOT_NULL);
// fake 1 property
buf.writeInt(1);
// 1 key with length 2
buf.writeInt(2);
buf.writeShort(3);
// invalid type
buf.writeByte(Byte.MIN_VALUE);
buf.resetReaderIndex();
Assert.assertFalse(searchProperty(SimpleString.toSimpleString(""), buf, 0));
}
@Before
public void setUp() throws Exception {
props = new TypedProperties();

View File

@ -186,6 +186,14 @@ public interface Message {
// only on core
}
/**
* Search for the existence of the property: an implementor can save
* the message to be decoded, if possible.
*/
default boolean hasScheduledDeliveryTime() {
return getScheduledDeliveryTime() != null;
}
default RoutingType getRoutingType() {
return null;
}

View File

@ -19,12 +19,14 @@ package org.apache.activemq.artemis.core.message.impl;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Set;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@ -133,7 +135,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
public CoreMessage initBuffer(final int initialMessageBufferSize) {
buffer = ActiveMQBuffers.dynamicBuffer(initialMessageBufferSize).byteBuf();
buffer = Unpooled.buffer(initialMessageBufferSize);
// There's a bug in netty which means a dynamic buffer won't resize until you write a byte
buffer.writeByte((byte) 0);
@ -1084,6 +1086,37 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return removeProperty(key(key));
}
@Override
public boolean hasScheduledDeliveryTime() {
return searchProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
}
/**
* Differently from {@link #containsProperty(SimpleString)}, this method can save decoding the message,
* performing a search of the {@code key} property and falling back to {@link #containsProperty(SimpleString)}
* if not possible or if already decoded.
*/
public boolean searchProperty(SimpleString key) {
Objects.requireNonNull(key, "key cannot be null");
TypedProperties properties = this.properties;
if (properties != null) {
return properties.containsProperty(key);
}
synchronized (this) {
final ByteBuf buffer = this.buffer;
// acquiring the lock here, although heavy-weight, is the safer way to do this,
// because we cannot trust that a racing thread won't modify buffer
if (buffer == null) {
throw new NullPointerException("buffer cannot be null");
}
final int propertiesLocation = this.propertiesLocation;
if (propertiesLocation < 0) {
throw new IllegalStateException("propertiesLocation = " + propertiesLocation);
}
return TypedProperties.searchProperty(key, buffer, propertiesLocation);
}
}
@Override
public boolean containsProperty(final SimpleString key) {
return getProperties().containsProperty(key);

View File

@ -41,11 +41,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.Xid;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
@ -869,7 +871,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
byte[] data = record.data;
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
// We can make this byte[] buffer releasable, because subsequent methods using it are not supposed
// to release it. It saves creating useless UnreleasableByteBuf wrappers
ChannelBufferWrapper buff = new ChannelBufferWrapper(Unpooled.wrappedBuffer(data), true);
byte recordType = record.getUserRecordType();

View File

@ -1181,9 +1181,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
MessageReference reference = MessageReference.Factory.createReference(message, queue);
Long scheduledDeliveryTime = message.getScheduledDeliveryTime();
if (scheduledDeliveryTime != null) {
reference.setScheduledDeliveryTime(scheduledDeliveryTime);
Long scheduledDeliveryTime;
if (message.hasScheduledDeliveryTime()) {
scheduledDeliveryTime = message.getScheduledDeliveryTime();
if (scheduledDeliveryTime != null) {
reference.setScheduledDeliveryTime(scheduledDeliveryTime);
}
}
message.incrementDurableRefCount();
@ -1433,7 +1436,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
Transaction tx = context.getTransaction();
Long deliveryTime = message.getScheduledDeliveryTime();
Long deliveryTime = null;
if (message.hasScheduledDeliveryTime()) {
deliveryTime = message.getScheduledDeliveryTime();
}
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
PagingStore store = pagingManager.getPageStore(entry.getKey());