This commit is contained in:
Clebert Suconic 2020-03-31 15:55:04 -04:00
commit fb4b8ffc16
1 changed files with 25 additions and 6 deletions

View File

@ -22,15 +22,19 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -39,22 +43,31 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class AmqpPagingTest extends AmqpClientTestSupport {
@Parameterized.Parameters(name = "durability={0}")
@Parameterized.Parameters(name = "durability={0}, readWholePage={1}")
public static Collection getParams() {
return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}, {null}});
return Arrays.asList(new Object[][]{
{Boolean.TRUE, true}, {Boolean.TRUE, false},
{Boolean.FALSE, true}, {Boolean.FALSE, false},
{null, true}, {null, false}});
}
private final Boolean durable;
private final boolean readWholePage;
public AmqpPagingTest(Boolean durable) {
public AmqpPagingTest(Boolean durable, boolean readWholePage) {
this.durable = durable;
this.readWholePage = readWholePage;
}
@Override
protected void addConfiguration(ActiveMQServer server) {
super.addConfiguration(server);
final Map<String, AddressSettings> addressesSettings = server.getConfiguration().getAddressesSettings();
addressesSettings.get("#").setMaxSizeBytes(100000).setPageSizeBytes(10000);
final Map<String, AddressSettings> addressesSettings = server.getConfiguration()
.setReadWholePage(readWholePage)
.getAddressesSettings();
addressesSettings.get("#")
.setMaxSizeBytes(100000)
.setPageSizeBytes(10000);
}
@Test(timeout = 60000)
@ -86,8 +99,14 @@ public class AmqpPagingTest extends AmqpClientTestSupport {
}
sender.send(message);
}
Assert.assertTrue(server.getPagingManager().getPageStore(SimpleString.toSimpleString(getQueueName())).isPaging());
sender.close();
final Queue queueView = getProxyToQueue(getQueueName());
Wait.assertEquals(MSG_COUNT, queueView::getMessageCount);
PagingStore pagingStore = server.getPagingManager().getPageStore(SimpleString.toSimpleString(getQueueName()));
Assert.assertTrue(pagingStore.isPaging());
final int pageCacheMaxSize = server.getConfiguration().getAddressesSettings().get("#").getPageCacheMaxSize();
Assert.assertThat("the size of the messages or the number of messages isn't enough",
pagingStore.getNumberOfPages(), Matchers.greaterThan(pageCacheMaxSize));
receiver.flow(MSG_COUNT);
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage receive = receiver.receive(10, TimeUnit.SECONDS);