diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java index 5b39691f1f..c2344ff2f1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java @@ -48,4 +48,11 @@ public interface PagedMessage extends EncodingSupport { * @throws ActiveMQException */ long getPersistentSize() throws ActiveMQException; + + /** This returns how much the PagedMessage used, or it's going to use + * from storage. + * We can't calculate the encodeSize as some persisters don't guarantee to re-store the data + * at the same amount of bytes it used. In some cases it may need to add headers in AMQP + * or extra data that may affect the outcome of getEncodeSize() */ + int getStoredSize(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 423b588500..49b7b459da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -1357,7 +1357,7 @@ public final class PageSubscriptionImpl implements PageSubscription { break; } - int nextFileOffset = message.getPosition().getFileOffset() == -1 ? -1 : message.getPosition().getFileOffset() + message.getPagedMessage().getEncodeSize() + Page.SIZE_RECORD; + int nextFileOffset = message.getPosition().getFileOffset() == -1 ? -1 : message.getPosition().getFileOffset() + message.getPagedMessage().getStoredSize() + Page.SIZE_RECORD; tmpPosition = new PagePositionAndFileOffset(nextFileOffset, message.getPosition()); boolean valid = true; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index bbaea46867..bdaf99e49e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -198,7 +198,7 @@ public final class Page implements Comparable { final int endPosition = readFileBuffer.position() + encodedSize; //this check must be performed upfront decoding if (readFileBuffer.remaining() >= (encodedSize + 1) && readFileBuffer.get(endPosition) == Page.END_BYTE) { - final PagedMessageImpl msg = new PagedMessageImpl(storageManager); + final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager); readFileBufferWrapper.setIndex(readFileBuffer.position(), endPosition); msg.decode(readFileBufferWrapper); readFileBuffer.position(endPosition + 1); @@ -363,7 +363,7 @@ public final class Page implements Comparable { final int endPosition = fileBuffer.position() + encodedSize; //this check must be performed upfront decoding if (fileBuffer.remaining() >= (encodedSize + 1) && fileBuffer.get(endPosition) == Page.END_BYTE) { - final PagedMessageImpl msg = new PagedMessageImpl(storageManager); + final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager); fileBufferWrapper.setIndex(fileBuffer.position(), endPosition); msg.decode(fileBufferWrapper); fileBuffer.position(endPosition + 1); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java index 11eb2020d1..67d902f449 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java @@ -47,6 +47,8 @@ public class PagedMessageImpl implements PagedMessage { private long transactionID = 0; + private final int storedSize; + private volatile StorageManager storageManager; public PagedMessageImpl(final Message message, final long[] queueIDs, final long transactionID) { @@ -57,10 +59,22 @@ public class PagedMessageImpl implements PagedMessage { public PagedMessageImpl(final Message message, final long[] queueIDs) { this.queueIDs = queueIDs; this.message = message; + this.storedSize = 0; } - public PagedMessageImpl(StorageManager storageManager) { + public PagedMessageImpl(int storedSize, StorageManager storageManager) { this.storageManager = storageManager; + this.storedSize = storedSize; + } + + + @Override + public int getStoredSize() { + if (storedSize <= 0) { + return getEncodeSize(); + } else { + return storedSize; + } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java index 4f5079de9d..7aa9f8a1f9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java @@ -56,7 +56,7 @@ public class ReplicationPageWriteMessage extends PacketImpl { @Override public void decodeRest(final ActiveMQBuffer buffer) { pageNumber = buffer.readInt(); - pagedMessage = new PagedMessageImpl(null); + pagedMessage = new PagedMessageImpl(0, null); pagedMessage.decode(buffer); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java index f347714c62..93cf5bdb63 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java @@ -58,7 +58,7 @@ public class PageReaderTest extends ActiveMQTestBase { PagePosition pagePosition = new PagePositionImpl(10, i); pagedMessage = pageReader.getMessage(pagePosition); } else { - int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getEncodeSize() + Page.SIZE_RECORD; + int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getStoredSize() + Page.SIZE_RECORD; PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, i - 1)); PagePosition pagePosition = startPosition.nextPagePostion(); assertEquals("Message " + i + " has wrong offset", offsets[i], pagePosition.getFileOffset()); @@ -85,7 +85,7 @@ public class PageReaderTest extends ActiveMQTestBase { PagePosition pagePosition = new PagePositionImpl(10, 0); PagedMessage firstPagedMessage = pageReader.getMessage(pagePosition); assertEquals("Message 0 has a wrong encodeSize", pagedMessages[0].getEncodeSize(), firstPagedMessage.getEncodeSize()); - int nextFileOffset = offsets[0] + firstPagedMessage.getEncodeSize() + Page.SIZE_RECORD; + int nextFileOffset = offsets[0] + firstPagedMessage.getStoredSize() + Page.SIZE_RECORD; PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, 0)); PagePosition nextPagePosition = startPosition.nextPagePostion(); assertEquals("Message 1 has a wrong offset", offsets[1], nextPagePosition.getFileOffset()); @@ -147,7 +147,10 @@ public class PageReaderTest extends ActiveMQTestBase { for (int i = 0; i < num; i++) { Message msg = createMessage(simpleDestination, i, content); offsets[i] = (int)page.getFile().position(); - page.write(new PagedMessageImpl(msg, new long[0])); + PagedMessageImpl pgdMessage = new PagedMessageImpl(msg, new long[0]); + long expectedPosition = pgdMessage.getEncodeSize() + Page.SIZE_RECORD + page.getFile().position(); + page.write(pgdMessage); + Assert.assertEquals(page.getFile().position(), expectedPosition); Assert.assertEquals(i + 1, page.getNumberOfMessages()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java index 15873a63f4..3532caa7f4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java @@ -19,9 +19,14 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.net.URI; import java.util.LinkedList; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.qpid.proton.message.impl.MessageImpl; import org.junit.After; /** @@ -131,4 +136,16 @@ public class AmqpTestSupport extends ActiveMQTestBase { public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception { return new AmqpClient(brokerURI, username, password); } + + public static AMQPStandardMessage encodeAndDecodeMessage(int messageFormat, MessageImpl message, int expectedSize) { + ByteBuf nettyBuffer = Unpooled.buffer(expectedSize); + + message.encode(new NettyWritable(nettyBuffer)); + byte[] bytes = new byte[nettyBuffer.writerIndex()]; + nettyBuffer.readBytes(bytes); + + return new AMQPStandardMessage(messageFormat, bytes, null); + } + + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageReaderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageReaderTest.java new file mode 100644 index 0000000000..2afb74518d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageReaderTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp.paging; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.paging.cursor.impl.PageReaderTest; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.junit.Assert; +import org.junit.Test; + +public class AmqpPageReaderTest extends PageReaderTest { + + public MessageImpl createProtonMessage(String address) { + AmqpMessage message = new AmqpMessage(); + final StringBuilder builder = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + builder.append('0'); + } + final String data = builder.toString(); + message.setText(data); + message.setAddress(address); + message.setDurable(true); + + MessageImpl protonMessage = (MessageImpl) message.getWrappedMessage(); + + return protonMessage; + } + + @Override + protected Message createMessage(SimpleString address, int msgId, byte[] content) { + MessageImpl protonMessage = createProtonMessage(address.toString()); + AMQPStandardMessage amqpStandardMessage = AmqpTestSupport.encodeAndDecodeMessage(0, protonMessage, 2 * 1024); + amqpStandardMessage.setMessageID(msgId); + + return amqpStandardMessage; + } + + + @Test + public void testEncodeSize() throws Exception { + + Message message = createMessage(SimpleString.toSimpleString("Test"), 1, new byte[10]); + + MessagePersister persister = (MessagePersister)message.getPersister(); + + ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024); + persister.encode(buffer, message); + + Assert.assertEquals(persister.getEncodeSize(message), buffer.writerIndex()); + + // the very first byte is the persisterID, we skip that since we are calling the Persister directly + buffer.readerIndex(1); + Message messageRead = persister.decode(buffer, null, null); + + // The current persister does not guarantee the same encode size after loading + /// if this ever changes we can uncomment the next line. + // Assert.assertEquals(persister.getEncodeSize(message), persister.getEncodeSize(messageRead)); + + + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java new file mode 100644 index 0000000000..2ed9845936 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.paging; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Assert; +import org.junit.Test; + +public class AmqpPagingTest extends AmqpClientTestSupport { + + @Override + protected void addConfiguration(ActiveMQServer server) { + super.addConfiguration(server); + final Map addressesSettings = server.getConfiguration().getAddressesSettings(); + addressesSettings.get("#").setMaxSizeBytes(100000).setPageSizeBytes(10000); + } + + @Test(timeout = 60000) + public void testPaging() throws Exception { + final int MSG_SIZE = 1000; + final StringBuilder builder = new StringBuilder(); + for (int i = 0; i < MSG_SIZE; i++) { + builder.append('0'); + } + final String data = builder.toString(); + final int MSG_COUNT = 1_000; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName(), true); + + AmqpReceiver receiver = session.createReceiver(getQueueName()); + receiver.setPresettle(true); + receiver.flow(10); + Assert.assertNull("somehow the queue had messages from a previous test", receiver.receiveNoWait()); + receiver.flow(0); + for (int i = 0; i < MSG_COUNT; i++) { + AmqpMessage message = new AmqpMessage(); + message.setText(data); + sender.send(message); + } + sender.close(); + receiver.flow(MSG_COUNT); + for (int i = 0; i < MSG_COUNT; i++) { + AmqpMessage receive = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull("Not received anything after " + i + " receive", receive); + receive.accept(); + } + receiver.close(); + connection.close(); + } + +}