ARTEMIS-2662 Using previously stored encodeSize on page record offset
There is no guarantee that the encodeSize size is the same in AMQP right after read. As the protocol may add additional bytes right after decoded such as header, extra properties.. etc.
This commit is contained in:
parent
327af99373
commit
c801c00e33
|
@ -48,4 +48,11 @@ public interface PagedMessage extends EncodingSupport {
|
|||
* @throws ActiveMQException
|
||||
*/
|
||||
long getPersistentSize() throws ActiveMQException;
|
||||
|
||||
/** This returns how much the PagedMessage used, or it's going to use
|
||||
* from storage.
|
||||
* We can't calculate the encodeSize as some persisters don't guarantee to re-store the data
|
||||
* at the same amount of bytes it used. In some cases it may need to add headers in AMQP
|
||||
* or extra data that may affect the outcome of getEncodeSize() */
|
||||
int getStoredSize();
|
||||
}
|
||||
|
|
|
@ -1357,7 +1357,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
|||
break;
|
||||
}
|
||||
|
||||
int nextFileOffset = message.getPosition().getFileOffset() == -1 ? -1 : message.getPosition().getFileOffset() + message.getPagedMessage().getEncodeSize() + Page.SIZE_RECORD;
|
||||
int nextFileOffset = message.getPosition().getFileOffset() == -1 ? -1 : message.getPosition().getFileOffset() + message.getPagedMessage().getStoredSize() + Page.SIZE_RECORD;
|
||||
tmpPosition = new PagePositionAndFileOffset(nextFileOffset, message.getPosition());
|
||||
|
||||
boolean valid = true;
|
||||
|
|
|
@ -198,7 +198,7 @@ public final class Page implements Comparable<Page> {
|
|||
final int endPosition = readFileBuffer.position() + encodedSize;
|
||||
//this check must be performed upfront decoding
|
||||
if (readFileBuffer.remaining() >= (encodedSize + 1) && readFileBuffer.get(endPosition) == Page.END_BYTE) {
|
||||
final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
|
||||
final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager);
|
||||
readFileBufferWrapper.setIndex(readFileBuffer.position(), endPosition);
|
||||
msg.decode(readFileBufferWrapper);
|
||||
readFileBuffer.position(endPosition + 1);
|
||||
|
@ -363,7 +363,7 @@ public final class Page implements Comparable<Page> {
|
|||
final int endPosition = fileBuffer.position() + encodedSize;
|
||||
//this check must be performed upfront decoding
|
||||
if (fileBuffer.remaining() >= (encodedSize + 1) && fileBuffer.get(endPosition) == Page.END_BYTE) {
|
||||
final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
|
||||
final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager);
|
||||
fileBufferWrapper.setIndex(fileBuffer.position(), endPosition);
|
||||
msg.decode(fileBufferWrapper);
|
||||
fileBuffer.position(endPosition + 1);
|
||||
|
|
|
@ -47,6 +47,8 @@ public class PagedMessageImpl implements PagedMessage {
|
|||
|
||||
private long transactionID = 0;
|
||||
|
||||
private final int storedSize;
|
||||
|
||||
private volatile StorageManager storageManager;
|
||||
|
||||
public PagedMessageImpl(final Message message, final long[] queueIDs, final long transactionID) {
|
||||
|
@ -57,10 +59,22 @@ public class PagedMessageImpl implements PagedMessage {
|
|||
public PagedMessageImpl(final Message message, final long[] queueIDs) {
|
||||
this.queueIDs = queueIDs;
|
||||
this.message = message;
|
||||
this.storedSize = 0;
|
||||
}
|
||||
|
||||
public PagedMessageImpl(StorageManager storageManager) {
|
||||
public PagedMessageImpl(int storedSize, StorageManager storageManager) {
|
||||
this.storageManager = storageManager;
|
||||
this.storedSize = storedSize;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getStoredSize() {
|
||||
if (storedSize <= 0) {
|
||||
return getEncodeSize();
|
||||
} else {
|
||||
return storedSize;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -56,7 +56,7 @@ public class ReplicationPageWriteMessage extends PacketImpl {
|
|||
@Override
|
||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
pageNumber = buffer.readInt();
|
||||
pagedMessage = new PagedMessageImpl(null);
|
||||
pagedMessage = new PagedMessageImpl(0, null);
|
||||
pagedMessage.decode(buffer);
|
||||
}
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ public class PageReaderTest extends ActiveMQTestBase {
|
|||
PagePosition pagePosition = new PagePositionImpl(10, i);
|
||||
pagedMessage = pageReader.getMessage(pagePosition);
|
||||
} else {
|
||||
int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
|
||||
int nextFileOffset = pagedMessage == null ? -1 : offsets[i - 1] + pagedMessage.getStoredSize() + Page.SIZE_RECORD;
|
||||
PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, i - 1));
|
||||
PagePosition pagePosition = startPosition.nextPagePostion();
|
||||
assertEquals("Message " + i + " has wrong offset", offsets[i], pagePosition.getFileOffset());
|
||||
|
@ -85,7 +85,7 @@ public class PageReaderTest extends ActiveMQTestBase {
|
|||
PagePosition pagePosition = new PagePositionImpl(10, 0);
|
||||
PagedMessage firstPagedMessage = pageReader.getMessage(pagePosition);
|
||||
assertEquals("Message 0 has a wrong encodeSize", pagedMessages[0].getEncodeSize(), firstPagedMessage.getEncodeSize());
|
||||
int nextFileOffset = offsets[0] + firstPagedMessage.getEncodeSize() + Page.SIZE_RECORD;
|
||||
int nextFileOffset = offsets[0] + firstPagedMessage.getStoredSize() + Page.SIZE_RECORD;
|
||||
PagePositionAndFileOffset startPosition = new PagePositionAndFileOffset(nextFileOffset, new PagePositionImpl(10, 0));
|
||||
PagePosition nextPagePosition = startPosition.nextPagePostion();
|
||||
assertEquals("Message 1 has a wrong offset", offsets[1], nextPagePosition.getFileOffset());
|
||||
|
@ -147,7 +147,10 @@ public class PageReaderTest extends ActiveMQTestBase {
|
|||
for (int i = 0; i < num; i++) {
|
||||
Message msg = createMessage(simpleDestination, i, content);
|
||||
offsets[i] = (int)page.getFile().position();
|
||||
page.write(new PagedMessageImpl(msg, new long[0]));
|
||||
PagedMessageImpl pgdMessage = new PagedMessageImpl(msg, new long[0]);
|
||||
long expectedPosition = pgdMessage.getEncodeSize() + Page.SIZE_RECORD + page.getFile().position();
|
||||
page.write(pgdMessage);
|
||||
Assert.assertEquals(page.getFile().position(), expectedPosition);
|
||||
|
||||
Assert.assertEquals(i + 1, page.getNumberOfMessages());
|
||||
}
|
||||
|
|
|
@ -19,9 +19,14 @@ package org.apache.activemq.artemis.tests.integration.amqp;
|
|||
import java.net.URI;
|
||||
import java.util.LinkedList;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
|
||||
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.qpid.proton.message.impl.MessageImpl;
|
||||
import org.junit.After;
|
||||
|
||||
/**
|
||||
|
@ -131,4 +136,16 @@ public class AmqpTestSupport extends ActiveMQTestBase {
|
|||
public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
|
||||
return new AmqpClient(brokerURI, username, password);
|
||||
}
|
||||
|
||||
public static AMQPStandardMessage encodeAndDecodeMessage(int messageFormat, MessageImpl message, int expectedSize) {
|
||||
ByteBuf nettyBuffer = Unpooled.buffer(expectedSize);
|
||||
|
||||
message.encode(new NettyWritable(nettyBuffer));
|
||||
byte[] bytes = new byte[nettyBuffer.writerIndex()];
|
||||
nettyBuffer.readBytes(bytes);
|
||||
|
||||
return new AMQPStandardMessage(messageFormat, bytes, null);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.amqp.paging;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.impl.PageReaderTest;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
|
||||
import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.qpid.proton.message.impl.MessageImpl;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AmqpPageReaderTest extends PageReaderTest {
|
||||
|
||||
public MessageImpl createProtonMessage(String address) {
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
final StringBuilder builder = new StringBuilder();
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
builder.append('0');
|
||||
}
|
||||
final String data = builder.toString();
|
||||
message.setText(data);
|
||||
message.setAddress(address);
|
||||
message.setDurable(true);
|
||||
|
||||
MessageImpl protonMessage = (MessageImpl) message.getWrappedMessage();
|
||||
|
||||
return protonMessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Message createMessage(SimpleString address, int msgId, byte[] content) {
|
||||
MessageImpl protonMessage = createProtonMessage(address.toString());
|
||||
AMQPStandardMessage amqpStandardMessage = AmqpTestSupport.encodeAndDecodeMessage(0, protonMessage, 2 * 1024);
|
||||
amqpStandardMessage.setMessageID(msgId);
|
||||
|
||||
return amqpStandardMessage;
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testEncodeSize() throws Exception {
|
||||
|
||||
Message message = createMessage(SimpleString.toSimpleString("Test"), 1, new byte[10]);
|
||||
|
||||
MessagePersister persister = (MessagePersister)message.getPersister();
|
||||
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
|
||||
persister.encode(buffer, message);
|
||||
|
||||
Assert.assertEquals(persister.getEncodeSize(message), buffer.writerIndex());
|
||||
|
||||
// the very first byte is the persisterID, we skip that since we are calling the Persister directly
|
||||
buffer.readerIndex(1);
|
||||
Message messageRead = persister.decode(buffer, null, null);
|
||||
|
||||
// The current persister does not guarantee the same encode size after loading
|
||||
/// if this ever changes we can uncomment the next line.
|
||||
// Assert.assertEquals(persister.getEncodeSize(message), persister.getEncodeSize(messageRead));
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp.paging;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AmqpPagingTest extends AmqpClientTestSupport {
|
||||
|
||||
@Override
|
||||
protected void addConfiguration(ActiveMQServer server) {
|
||||
super.addConfiguration(server);
|
||||
final Map<String, AddressSettings> addressesSettings = server.getConfiguration().getAddressesSettings();
|
||||
addressesSettings.get("#").setMaxSizeBytes(100000).setPageSizeBytes(10000);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testPaging() throws Exception {
|
||||
final int MSG_SIZE = 1000;
|
||||
final StringBuilder builder = new StringBuilder();
|
||||
for (int i = 0; i < MSG_SIZE; i++) {
|
||||
builder.append('0');
|
||||
}
|
||||
final String data = builder.toString();
|
||||
final int MSG_COUNT = 1_000;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getQueueName(), true);
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||
receiver.setPresettle(true);
|
||||
receiver.flow(10);
|
||||
Assert.assertNull("somehow the queue had messages from a previous test", receiver.receiveNoWait());
|
||||
receiver.flow(0);
|
||||
for (int i = 0; i < MSG_COUNT; i++) {
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setText(data);
|
||||
sender.send(message);
|
||||
}
|
||||
sender.close();
|
||||
receiver.flow(MSG_COUNT);
|
||||
for (int i = 0; i < MSG_COUNT; i++) {
|
||||
AmqpMessage receive = receiver.receive(10, TimeUnit.SECONDS);
|
||||
assertNotNull("Not received anything after " + i + " receive", receive);
|
||||
receive.accept();
|
||||
}
|
||||
receiver.close();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue