This commit is contained in:
Francesco Nigro 2020-03-17 11:19:57 +01:00
commit 10ed0094ab
9 changed files with 212 additions and 8 deletions

View File

@ -48,4 +48,11 @@ public interface PagedMessage extends EncodingSupport {
* @throws ActiveMQException
*/
long getPersistentSize() throws ActiveMQException;
/** This returns how much the PagedMessage used, or it's going to use
* from storage.
* We can't calculate the encodeSize as some persisters don't guarantee to re-store the data
* at the same amount of bytes it used. In some cases it may need to add headers in AMQP
* or extra data that may affect the outcome of getEncodeSize() */
int getStoredSize();
}

View File

@ -1357,7 +1357,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
break;
}
int nextFileOffset = message.getPosition().getFileOffset() == -1 ? -1 : message.getPosition().getFileOffset() + message.getPagedMessage().getEncodeSize() + Page.SIZE_RECORD;
int nextFileOffset = message.getPosition().getFileOffset() == -1 ? -1 : message.getPosition().getFileOffset() + message.getPagedMessage().getStoredSize() + Page.SIZE_RECORD;
tmpPosition = new PagePositionAndFileOffset(nextFileOffset, message.getPosition());
boolean valid = true;

View File

@ -198,7 +198,7 @@ public final class Page implements Comparable<Page> {
final int endPosition = readFileBuffer.position() + encodedSize;
//this check must be performed upfront decoding
if (readFileBuffer.remaining() >= (encodedSize + 1) && readFileBuffer.get(endPosition) == Page.END_BYTE) {
final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager);
readFileBufferWrapper.setIndex(readFileBuffer.position(), endPosition);
msg.decode(readFileBufferWrapper);
readFileBuffer.position(endPosition + 1);
@ -363,7 +363,7 @@ public final class Page implements Comparable<Page> {
final int endPosition = fileBuffer.position() + encodedSize;
//this check must be performed upfront decoding
if (fileBuffer.remaining() >= (encodedSize + 1) && fileBuffer.get(endPosition) == Page.END_BYTE) {
final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager);
fileBufferWrapper.setIndex(fileBuffer.position(), endPosition);
msg.decode(fileBufferWrapper);
fileBuffer.position(endPosition + 1);

View File

@ -47,6 +47,8 @@ public class PagedMessageImpl implements PagedMessage {
private long transactionID = 0;
private final int storedSize;
private volatile StorageManager storageManager;
public PagedMessageImpl(final Message message, final long[] queueIDs, final long transactionID) {
@ -57,10 +59,22 @@ public class PagedMessageImpl implements PagedMessage {
public PagedMessageImpl(final Message message, final long[] queueIDs) {
this.queueIDs = queueIDs;
this.message = message;
this.storedSize = 0;
}
public PagedMessageImpl(StorageManager storageManager) {
public PagedMessageImpl(int storedSize, StorageManager storageManager) {
this.storageManager = storageManager;
this.storedSize = storedSize;
}
@Override
public int getStoredSize() {
if (storedSize <= 0) {
return getEncodeSize();
} else {
return storedSize;
}
}
@Override

View File

@ -56,7 +56,7 @@ public class ReplicationPageWriteMessage extends PacketImpl {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
pageNumber = buffer.readInt();
pagedMessage = new PagedMessageImpl(null);
pagedMessage = new PagedMessageImpl(0, null);
pagedMessage.decode(buffer);
}

View File

@ -58,7 +58,7 @@ public class PageReaderTest extends ActiveMQTestBase {
PagePosition pagePosition = new PagePositionImpl(10, i);
pagedMessage = pageReader.getMessage(pagePosition);
} else {
int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getStoredSize() + Page.SIZE_RECORD;
PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, i - 1));
PagePosition pagePosition = startPosition.nextPagePostion();
assertEquals("Message " + i + " has wrong offset", offsets[i], pagePosition.getFileOffset());
@ -85,7 +85,7 @@ public class PageReaderTest extends ActiveMQTestBase {
PagePosition pagePosition = new PagePositionImpl(10, 0);
PagedMessage firstPagedMessage = pageReader.getMessage(pagePosition);
assertEquals("Message 0 has a wrong encodeSize", pagedMessages[0].getEncodeSize(), firstPagedMessage.getEncodeSize());
int nextFileOffset = offsets[0] + firstPagedMessage.getEncodeSize() + Page.SIZE_RECORD;
int nextFileOffset = offsets[0] + firstPagedMessage.getStoredSize() + Page.SIZE_RECORD;
PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, 0));
PagePosition nextPagePosition = startPosition.nextPagePostion();
assertEquals("Message 1 has a wrong offset", offsets[1], nextPagePosition.getFileOffset());
@ -147,7 +147,10 @@ public class PageReaderTest extends ActiveMQTestBase {
for (int i = 0; i < num; i++) {
Message msg = createMessage(simpleDestination, i, content);
offsets[i] = (int)page.getFile().position();
page.write(new PagedMessageImpl(msg, new long[0]));
PagedMessageImpl pgdMessage = new PagedMessageImpl(msg, new long[0]);
long expectedPosition = pgdMessage.getEncodeSize() + Page.SIZE_RECORD + page.getFile().position();
page.write(pgdMessage);
Assert.assertEquals(page.getFile().position(), expectedPosition);
Assert.assertEquals(i + 1, page.getNumberOfMessages());
}

View File

@ -19,9 +19,14 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
import java.util.LinkedList;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.After;
/**
@ -131,4 +136,16 @@ public class AmqpTestSupport extends ActiveMQTestBase {
public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
return new AmqpClient(brokerURI, username, password);
}
public static AMQPStandardMessage encodeAndDecodeMessage(int messageFormat, MessageImpl message, int expectedSize) {
ByteBuf nettyBuffer = Unpooled.buffer(expectedSize);
message.encode(new NettyWritable(nettyBuffer));
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
return new AMQPStandardMessage(messageFormat, bytes, null);
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.paging;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageReaderTest;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
public class AmqpPageReaderTest extends PageReaderTest {
public MessageImpl createProtonMessage(String address) {
AmqpMessage message = new AmqpMessage();
final StringBuilder builder = new StringBuilder();
for (int i = 0; i < 1000; i++) {
builder.append('0');
}
final String data = builder.toString();
message.setText(data);
message.setAddress(address);
message.setDurable(true);
MessageImpl protonMessage = (MessageImpl) message.getWrappedMessage();
return protonMessage;
}
@Override
protected Message createMessage(SimpleString address, int msgId, byte[] content) {
MessageImpl protonMessage = createProtonMessage(address.toString());
AMQPStandardMessage amqpStandardMessage = AmqpTestSupport.encodeAndDecodeMessage(0, protonMessage, 2 * 1024);
amqpStandardMessage.setMessageID(msgId);
return amqpStandardMessage;
}
@Test
public void testEncodeSize() throws Exception {
Message message = createMessage(SimpleString.toSimpleString("Test"), 1, new byte[10]);
MessagePersister persister = (MessagePersister)message.getPersister();
ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
persister.encode(buffer, message);
Assert.assertEquals(persister.getEncodeSize(message), buffer.writerIndex());
// the very first byte is the persisterID, we skip that since we are calling the Persister directly
buffer.readerIndex(1);
Message messageRead = persister.decode(buffer, null, null);
// The current persister does not guarantee the same encode size after loading
/// if this ever changes we can uncomment the next line.
// Assert.assertEquals(persister.getEncodeSize(message), persister.getEncodeSize(messageRead));
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.paging;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
public class AmqpPagingTest extends AmqpClientTestSupport {
@Override
protected void addConfiguration(ActiveMQServer server) {
super.addConfiguration(server);
final Map<String, AddressSettings> addressesSettings = server.getConfiguration().getAddressesSettings();
addressesSettings.get("#").setMaxSizeBytes(100000).setPageSizeBytes(10000);
}
@Test(timeout = 60000)
public void testPaging() throws Exception {
final int MSG_SIZE = 1000;
final StringBuilder builder = new StringBuilder();
for (int i = 0; i < MSG_SIZE; i++) {
builder.append('0');
}
final String data = builder.toString();
final int MSG_COUNT = 1_000;
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName(), true);
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.setPresettle(true);
receiver.flow(10);
Assert.assertNull("somehow the queue had messages from a previous test", receiver.receiveNoWait());
receiver.flow(0);
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
message.setText(data);
sender.send(message);
}
sender.close();
receiver.flow(MSG_COUNT);
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage receive = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Not received anything after " + i + " receive", receive);
receive.accept();
}
receiver.close();
connection.close();
}
}