This commit is contained in:
Clebert Suconic 2020-04-01 17:46:26 -04:00
commit 0624b08b77
4 changed files with 197 additions and 51 deletions

View File

@ -506,9 +506,10 @@ public final class Page implements Comparable<Page> {
List<Long> largeMessageIds = new ArrayList<>();
if (messages != null) {
for (PagedMessage msg : messages) {
// this will trigger large message delete
msg.getMessage().usageDown();
if ((msg.getMessage()).isLargeMessage()) {
// this will trigger large message delete: no need to do it
// for non-large messages!
msg.getMessage().usageDown();
largeMessageIds.add(msg.getMessage().getMessageID());
}
}

View File

@ -49,7 +49,7 @@ public class PagedMessageImpl implements PagedMessage {
private final int storedSize;
private volatile StorageManager storageManager;
private final StorageManager storageManager;
public PagedMessageImpl(final Message message, final long[] queueIDs, final long transactionID) {
this(message, queueIDs);
@ -57,6 +57,7 @@ public class PagedMessageImpl implements PagedMessage {
}
public PagedMessageImpl(final Message message, final long[] queueIDs) {
this.storageManager = null;
this.queueIDs = queueIDs;
this.message = message;
this.storedSize = 0;
@ -135,8 +136,12 @@ public class PagedMessageImpl implements PagedMessage {
}
} else {
this.message = MessagePersister.getInstance().decode(buffer, null, null, storageManager);
if (message.isLargeMessage()) {
message.usageUp();
}
}
int queueIDsSize = buffer.readInt();
queueIDs = new long[queueIDsSize];

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.paging;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
import org.apache.activemq.artemis.tests.unit.core.paging.impl.PageTest;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.qpid.proton.message.impl.MessageImpl;
public class AmqpPageTest extends PageTest {
private static MessageImpl createProtonMessage(String address, byte[] content) {
AmqpMessage message = new AmqpMessage();
message.setBytes(content);
message.setAddress(address);
message.setDurable(true);
MessageImpl protonMessage = (MessageImpl) message.getWrappedMessage();
return protonMessage;
}
private static AMQPStandardMessage createStandardMessage(SimpleString address, long msgId, byte[] content) {
MessageImpl protonMessage = createProtonMessage(address.toString(), content);
AMQPStandardMessage amqpMessage = AmqpTestSupport.encodeAndDecodeMessage(0, protonMessage, content.length + 1000);
amqpMessage.setMessageID(msgId);
return amqpMessage;
}
private static AMQPLargeMessage createLargeMessage(StorageManager storageManager,
SimpleString address,
long msgId,
byte[] content) throws Exception {
final AMQPLargeMessage amqpMessage = new AMQPLargeMessage(msgId, 0, null, null, storageManager);
amqpMessage.setAddress(address);
amqpMessage.setFileDurable(true);
amqpMessage.addBytes(content);
return amqpMessage;
}
@Override
protected void writeMessage(StorageManager storageManager,
boolean isLargeMessage,
long msgID,
SimpleString address,
byte[] content,
Page page) throws Exception {
if (!isLargeMessage) {
final Message message = createStandardMessage(address, msgID, content);
page.write(new PagedMessageImpl(message, new long[0]));
} else {
final AMQPLargeMessage message = createLargeMessage(storageManager, address, msgID, content);
page.write(new PagedMessageImpl(message, new long[0]));
message.releaseResources(false);
}
}
}

View File

@ -17,7 +17,10 @@
package org.apache.activemq.artemis.tests.unit.core.paging.impl;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -29,11 +32,17 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersister;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -55,6 +64,27 @@ public class PageTest extends ActiveMQTestBase {
MessagePersister.registerPersister(AMQPMessagePersister.getInstance());
}
@Test
public void testLargeMessagePageWithNIO() throws Exception {
recreateDirectory(getTestDir());
final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
final ExecutorService ioexecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
OrderedExecutorFactory factory = new OrderedExecutorFactory(executor);
OrderedExecutorFactory iofactory = new OrderedExecutorFactory(ioexecutor);
final JournalStorageManager storageManager = new JournalStorageManager(
createBasicConfig(), EmptyCriticalAnalyzer.getInstance(),
factory, iofactory);
storageManager.start();
storageManager.loadInternalOnly();
try {
testAdd(storageManager, new NIOSequentialFileFactory(getTestDirfile(), 1), 1000, true);
} finally {
storageManager.stop();
executor.shutdownNow();
ioexecutor.shutdownNow();
}
}
@Test
public void testPageWithNIO() throws Exception {
recreateDirectory(getTestDir());
@ -73,7 +103,7 @@ public class PageTest extends ActiveMQTestBase {
}
@Test
public void testAddCore() throws Exception {
public void testPageSingleMessageWithNIO() throws Exception {
testAdd(new NIOSequentialFileFactory(getTestDirfile(), 1), 1);
}
@ -89,39 +119,56 @@ public class PageTest extends ActiveMQTestBase {
* Validate if everything we add is recovered
*/
protected void testAdd(final SequentialFileFactory factory, final int numberOfElements) throws Exception {
testAdd(new NullStorageManager(), factory, numberOfElements, false);
}
protected void testAdd(final StorageManager storageManager,
final SequentialFileFactory factory,
final int numberOfElements,
final boolean largeMessages) throws Exception {
SequentialFile file = factory.createSequentialFile("00010.page");
Page impl = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
Page page = new Page(new SimpleString("something"), storageManager, factory, file, 10);
Assert.assertEquals(10, impl.getPageId());
Assert.assertEquals(10, page.getPageId());
impl.open();
page.open();
Assert.assertEquals(1, factory.listFiles("page").size());
SimpleString simpleDestination = new SimpleString("Test");
addPageElements(simpleDestination, impl, numberOfElements);
final long startMessageID = 1;
impl.sync();
impl.close(false, false);
addPageElements(storageManager, simpleDestination, page, numberOfElements, largeMessages, startMessageID);
page.sync();
page.close(false, false);
file = factory.createSequentialFile("00010.page");
file.open();
impl = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
page = new Page(new SimpleString("something"), storageManager, factory, file, 10);
List<PagedMessage> msgs = impl.read(new NullStorageManager());
List<PagedMessage> msgs = page.read(storageManager);
Assert.assertEquals(numberOfElements, msgs.size());
Assert.assertEquals(numberOfElements, impl.getNumberOfMessages());
Assert.assertEquals(numberOfElements, page.getNumberOfMessages());
for (int i = 0; i < msgs.size(); i++) {
Assert.assertEquals(simpleDestination, msgs.get(i).getMessage().getAddressSimpleString());
final PagedMessage pagedMessage = msgs.get(i);
Assert.assertEquals(simpleDestination, pagedMessage.getMessage().getAddressSimpleString());
Assert.assertEquals(largeMessages, pagedMessage.getMessage().isLargeMessage());
Assert.assertEquals(startMessageID + i, pagedMessage.getMessage().getMessageID());
Assert.assertEquals(largeMessages ? 1 : 0, pagedMessage.getMessage().getUsage());
}
impl.delete(null);
Assert.assertTrue(page.delete(msgs.toArray(new PagedMessage[msgs.size()])));
for (PagedMessage pagedMessage : msgs) {
Assert.assertEquals(0, pagedMessage.getMessage().getUsage());
}
Assert.assertEquals(0, factory.listFiles(".page").size());
@ -131,29 +178,29 @@ public class PageTest extends ActiveMQTestBase {
SequentialFile file = factory.createSequentialFile("00010.page");
Page impl = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
Page page = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
Assert.assertEquals(10, impl.getPageId());
Assert.assertEquals(10, page.getPageId());
impl.open();
page.open();
Assert.assertEquals(1, factory.listFiles("page").size());
SimpleString simpleDestination = new SimpleString("Test");
addPageElements(simpleDestination, impl, numberOfElements);
addPageElements(simpleDestination, page, numberOfElements, 1);
impl.sync();
page.sync();
long positionA = file.position();
// Add one record that will be damaged
addPageElements(simpleDestination, impl, 1);
addPageElements(simpleDestination, page, 1, numberOfElements + 1);
long positionB = file.position();
// Add more 10 as they will need to be ignored
addPageElements(simpleDestination, impl, 10);
addPageElements(simpleDestination, page, 10, numberOfElements + 2);
// Damage data... position the file on the middle between points A and B
file.position(positionA + (positionB - positionA) / 2);
@ -168,23 +215,23 @@ public class PageTest extends ActiveMQTestBase {
file.writeDirect(buffer, true);
impl.close(false);
page.close(false);
file = factory.createSequentialFile("00010.page");
file.open();
impl = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
page = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10);
List<PagedMessage> msgs = impl.read(new NullStorageManager());
List<PagedMessage> msgs = page.read(new NullStorageManager());
Assert.assertEquals(numberOfElements, msgs.size());
Assert.assertEquals(numberOfElements, impl.getNumberOfMessages());
Assert.assertEquals(numberOfElements, page.getNumberOfMessages());
for (int i = 0; i < msgs.size(); i++) {
Assert.assertEquals(simpleDestination, msgs.get(i).getMessage().getAddressSimpleString());
}
impl.delete(null);
page.delete(null);
Assert.assertEquals(0, factory.listFiles("page").size());
@ -192,6 +239,13 @@ public class PageTest extends ActiveMQTestBase {
}
protected void addPageElements(final SimpleString simpleDestination,
final Page page,
final int numberOfElements,
final long startMessageID) throws Exception {
addPageElements(new NullStorageManager(), simpleDestination, page, numberOfElements, false, startMessageID);
}
/**
* @param simpleDestination
* @param page
@ -199,36 +253,46 @@ public class PageTest extends ActiveMQTestBase {
* @return
* @throws Exception
*/
protected void addPageElements(final SimpleString simpleDestination,
final Page page,
final int numberOfElements) throws Exception {
protected void addPageElements(final StorageManager storageManager,
final SimpleString simpleDestination,
final Page page,
final int numberOfElements,
final boolean largeMessages,
final long startMessageID) throws Exception {
int initialNumberOfMessages = page.getNumberOfMessages();
final int msgSize = 10;
final byte[] content = new byte[msgSize];
Arrays.fill(content, (byte) 'b');
for (int i = 0; i < numberOfElements; i++) {
ICoreMessage msg = new CoreMessage().initBuffer(100);
for (int j = 0; j < 10; j++) {
msg.getBodyBuffer().writeByte((byte) 'b');
}
msg.setAddress(simpleDestination);
page.write(new PagedMessageImpl(msg, new long[0]));
writeMessage(storageManager, largeMessages, startMessageID + i, simpleDestination, content, page);
Assert.assertEquals(initialNumberOfMessages + i + 1, page.getNumberOfMessages());
}
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
protected void writeMessage(StorageManager storageManager,
boolean isLargeMessage,
long msgID,
SimpleString address,
byte[] content,
Page page) throws Exception {
if (isLargeMessage) {
LargeServerMessageImpl msg = new LargeServerMessageImpl(storageManager);
msg.setMessageID(msgID);
msg.addBytes(content);
msg.setAddress(address);
page.write(new PagedMessageImpl(msg, new long[0]));
msg.releaseResources(false);
} else {
ICoreMessage msg = new CoreMessage().initBuffer(100);
msg.setMessageID(msgID);
msg.getBodyBuffer().writeBytes(content);
msg.setAddress(address);
page.write(new PagedMessageImpl(msg, new long[0]));
}
}
}