/tests/config/log4j2-tests-config.properties
- *
- * Note: Idea should get these from the pom and you shouldn't need to do this.
+ *
+ * -Dlog4j2.configurationFile=file:/tests/config/log4j2-tests-config.properties
+ *
+ * Note: Idea should get these from the pom and you shouldn't need to do this.
*/
@Test
public void testFailMessagesNonDurable() throws Exception {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
HashMap settings = new HashMap<>();
- AddressSettings set = new AddressSettings();
- set.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ AddressSettings set = new AddressSettings().setMaxReadPageBytes(-1);
+ set.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL).setMaxSizeMessages(10);
settings.put(PagingTest.ADDRESS.toString(), set);
@@ -6419,15 +5130,15 @@ public class PagingTest extends ActiveMQTestBase {
HashMap settings = new HashMap<>();
- AddressSettings set = new AddressSettings();
- set.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
-
- settings.put(PagingTest.ADDRESS.toString(), set);
-
server = createServer(true, config, 1024, 5 * 1024, settings);
server.start();
+ server.getAddressSettingsRepository().clear();
+ AddressSettings set = new AddressSettings().setMaxReadPageBytes(-1);
+ set.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL).setMaxSizeMessages(11);
+ server.getAddressSettingsRepository().addMatch(PagingTest.ADDRESS.toString(), set);
+
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
@@ -6452,7 +5163,7 @@ public class PagingTest extends ActiveMQTestBase {
// The address will actually fill up after 3 messages. Also, it takes 32 messages for the client's
// credits to run out.
for (int i = 0; i < 50; i++) {
- if (i > 2) {
+ if (i > 10) {
validateExceptionOnSending(producer, message);
} else {
producer.send(message);
@@ -6485,14 +5196,13 @@ public class PagingTest extends ActiveMQTestBase {
@Test
public void testFailMessagesDuplicates() throws Exception {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
HashMap settings = new HashMap<>();
- AddressSettings set = new AddressSettings();
- set.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ AddressSettings set = new AddressSettings().setMaxReadPageBytes(-1);
+ set.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL).setMaxSizeMessages(3);
settings.put(PagingTest.ADDRESS.toString(), set);
@@ -6757,10 +5467,6 @@ public class PagingTest extends ActiveMQTestBase {
ClientConsumer consumerQ2 = session.createConsumer("Q2");
session.start();
- // consuming now
-
- // initial burst
-
for (int i = 0; i < initialBurst; i++) {
ClientMessage m = consumerQ1.receive(5000);
assertNotNull(m);
@@ -6827,7 +5533,6 @@ public class PagingTest extends ActiveMQTestBase {
// and expect it to move to the next page
@Test
public void testPageHole() throws Throwable {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
@@ -6919,98 +5624,7 @@ public class PagingTest extends ActiveMQTestBase {
internalTestMultiFilters(false);
}
- @Test
- public void testPageEmptyFile() throws Exception {
- boolean persistentMessages = true;
-
- clearDataRecreateServerDirs();
-
- Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
-
- server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
-
- server.start();
-
- final int messageSize = 1024;
-
- final int numberOfMessages = 100;
-
- try {
- ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
-
- ClientSessionFactory sf = locator.createSessionFactory();
-
- ClientSession session = sf.createSession(false, false, false);
-
- session.createQueue(new QueueConfiguration(PagingTest.ADDRESS));
-
- PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
- store.forceAnotherPage();
- store.forceAnotherPage();
-
- ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
- ClientMessage message = null;
-
- byte[] body = new byte[messageSize];
-
- for (int i = 0; i < numberOfMessages; i++) {
- message = session.createMessage(persistentMessages);
-
- ActiveMQBuffer bodyLocal = message.getBodyBuffer();
-
- bodyLocal.writeBytes(body);
-
- producer.send(message);
- }
-
- session.commit();
-
- Queue queue = server.locateQueue(PagingTest.ADDRESS);
- Wait.assertEquals(numberOfMessages, queue::getMessageCount);
-
- store.forceAnotherPage();
-
- session.start();
-
- ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
-
- for (int i = 0; i < numberOfMessages; i++) {
- message = consumer.receive(5000);
- assertNotNull(message);
- message.acknowledge();
- }
-
- session.commit();
-
- assertNull(consumer.receiveImmediate());
-
- consumer.close();
-
- PageCursorProviderAccessor.cleanup(store.getCursorProvider());
-
- Wait.assertEquals(0, queue::getMessageCount);
-
- PageCursorProviderAccessor.cleanup(store.getCursorProvider());
-
- Wait.assertFalse(store::isPaging, 5000, 100);
-
- sf.close();
-
- locator.close();
-
- Wait.assertEquals(1L, store::getNumberOfPages, 5000, 100);
-
- } finally {
- try {
- server.stop();
- } catch (Throwable ignored) {
- }
- }
- }
-
public void internalTestMultiFilters(boolean browsing) throws Throwable {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
@@ -7091,7 +5705,6 @@ public class PagingTest extends ActiveMQTestBase {
@Test
public void testPendingACKOutOfOrder() throws Throwable {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
@@ -7117,11 +5730,12 @@ public class PagingTest extends ActiveMQTestBase {
ClientMessage msg = session.createMessage(true);
msg.putIntProperty("count", i);
prod.send(msg);
- session.commit();
if ((i + 1) % 5 == 0 && i < 50) {
+ session.commit();
store.forceAnotherPage();
}
}
+ session.commit();
session.start();
@@ -7173,7 +5787,6 @@ public class PagingTest extends ActiveMQTestBase {
// Test a scenario where a page was complete and now needs to be cleared
@Test
public void testPageCompleteWasLive() throws Throwable {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
@@ -7282,7 +5895,6 @@ public class PagingTest extends ActiveMQTestBase {
// Test a scenario where a page was complete and now needs to be cleared
@Test
public void testMoveMessages() throws Throwable {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
@@ -7290,13 +5902,13 @@ public class PagingTest extends ActiveMQTestBase {
server.start();
- final int LARGE_MESSAGE_SIZE = 1024 * 1024;
+ final int LARGE_MESSAGE_SIZE = database == Database.JOURNAL ? 1024 * 1024 : 1024;
try {
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnDurableSend(false);
ClientSessionFactory sf = locator.createSessionFactory();
- ClientSession session = sf.createSession(true, true, 0);
+ ClientSession session = sf.createSession(false, false, 0);
session.createQueue(new QueueConfiguration("Q1"));
session.createQueue(new QueueConfiguration("Q2"));
@@ -7364,6 +5976,7 @@ public class PagingTest extends ActiveMQTestBase {
msg.acknowledge();
assertEquals(i, msg.getIntProperty("count").intValue());
}
+ session.commit();
assertNull(cons.receiveImmediate());
@@ -7381,7 +5994,6 @@ public class PagingTest extends ActiveMQTestBase {
@Test
public void testOnlyOnePageOnServerCrash() throws Throwable {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
@@ -7411,7 +6023,7 @@ public class PagingTest extends ActiveMQTestBase {
}
}
- if (storeType == StoreConfiguration.StoreType.DATABASE) {
+ if (database != Database.JOURNAL) {
server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
@Override
protected PagingStoreFactoryDatabase getPagingStoreFactory() throws Exception {
@@ -7439,7 +6051,7 @@ public class PagingTest extends ActiveMQTestBase {
addServer(server);
- AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PagingTest.PAGE_SIZE).setMaxSizeBytes(PagingTest.PAGE_SIZE + MESSAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PagingTest.PAGE_SIZE).setMaxSizeBytes(PagingTest.PAGE_SIZE + MESSAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxReadPageBytes(-1);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
@@ -7519,7 +6131,6 @@ public class PagingTest extends ActiveMQTestBase {
@Test
public void testPagingStoreDestroyed() throws Exception {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
@@ -7599,8 +6210,6 @@ public class PagingTest extends ActiveMQTestBase {
private void testStopPagingWithoutConsumersOnOneQueue(boolean forceAnotherPage) throws Exception {
boolean persistentMessages = true;
- clearDataRecreateServerDirs();
-
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
@@ -7670,11 +6279,10 @@ public class PagingTest extends ActiveMQTestBase {
@Test
public void testStopPagingWithoutMsgsOnOneQueue() throws Exception {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
- server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);
+ server = createServer(true, config, PagingTest.PAGE_SIZE, 1, -1, -1);
server.start();
@@ -7717,20 +6325,14 @@ public class PagingTest extends ActiveMQTestBase {
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
producer.send(message);
- if (i % 1000 == 0) {
- session.commit();
- }
+ session.commit();
}
- session.commit();
assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging());
for (int i = 0; i < numberOfMessages; i++) {
- ClientMessage msg = consumer1.receive(1000);
+ ClientMessage msg = consumer1.receive(5000);
assertNotNull(msg);
msg.acknowledge();
- if (i % 500 == 0) {
- session.commit();
- }
}
session.commit();
assertNull(consumer1.receiveImmediate());
@@ -7752,7 +6354,6 @@ public class PagingTest extends ActiveMQTestBase {
// We send messages to page, evict live page cache, send last message when mid consumed, and expect to receive all messages
@Test
public void testLivePageCacheEvicted() throws Throwable {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
@@ -7781,7 +6382,7 @@ public class PagingTest extends ActiveMQTestBase {
ClientMessage msgReceivedCons = null;
// simulate the live page cache evicted
for (int i = 0; i < num; i++) {
- msgReceivedCons = cons.receive(1000);
+ msgReceivedCons = cons.receive(5000);
assertNotNull(msgReceivedCons);
assertTrue(msgReceivedCons.getIntProperty("index") == i);
msgReceivedCons.acknowledge();
@@ -7795,7 +6396,7 @@ public class PagingTest extends ActiveMQTestBase {
}
}
- msgReceivedCons = cons.receive(1000);
+ msgReceivedCons = cons.receive(5000);
assertNotNull(msgReceivedCons);
assertTrue(msgReceivedCons.getIntProperty("index") == num);
msgReceivedCons.acknowledge();
@@ -7822,7 +6423,6 @@ public class PagingTest extends ActiveMQTestBase {
}
private void testRollbackPageTransaction(boolean rollbackBeforeDelivery) throws Exception {
- clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
@@ -7871,7 +6471,6 @@ public class PagingTest extends ActiveMQTestBase {
session.close();
}
-
@Override
protected void applySettings(ActiveMQServer server,
final Configuration configuration,
@@ -7883,13 +6482,72 @@ public class PagingTest extends ActiveMQTestBase {
server.getConfiguration().setAddressQueueScanPeriod(100);
}
- @Override
- protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
- Configuration configuration = super.createDefaultConfig(serverID, netty);
- if (storeType == StoreConfiguration.StoreType.DATABASE) {
- setDBStoreType(configuration);
+ @Test
+ public void testSimpleNoTXSend() throws Exception {
+ Configuration config = createDefaultNettyConfig();
+
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, -1, -1);
+ server.start();
+
+ internalNoTX("CORE");
+ internalNoTX("OPENWIRE");
+ internalNoTX("AMQP");
+ }
+
+ private void internalNoTX(String protocol) throws Exception {
+ int numberOfMessages = 20;
+
+ String queueName = "TEST" + RandomUtil.randomString();
+
+ try {
+ server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+ } catch (Exception ignored) {
+ }
+
+ Wait.waitFor(() -> server.locateQueue(queueName) != null);
+ Queue testQueue = server.locateQueue(queueName);
+
+ testQueue.getPagingStore().startPaging();
+ Assert.assertTrue(testQueue.getPagingStore().isPaging());
+
+ ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+ try (Connection connection = connectionFactory.createConnection()) {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(session.createQueue(queueName));
+ for (int i = 0; i < numberOfMessages; i++) {
+ logger.debug("Sent {}", i);
+ producer.send(session.createTextMessage("Hello" + i));
+ }
+ }
+
+ server.stop();
+ server = createServer(createDefaultConfig(0, true));
+ server.start();
+
+ Queue queue = server.locateQueue(queueName);
+ Wait.assertEquals(numberOfMessages, queue::getMessageCount);
+
+ receiveMessages(connectionFactory, queueName, numberOfMessages);
+ }
+
+ private void receiveMessages(ConnectionFactory connectionFactory,
+ String queueName,
+ int numberOfMessages) throws JMSException {
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
+ for (int i = 0; i < numberOfMessages; i++) {
+ if (i % 100 == 0) {
+ logger.debug("Received {}", i);
+ }
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello" + i, message.getText());
+ }
+ Assert.assertNull(consumer.receiveNoWait());
}
- return configuration;
}
private static final class DummyOperationContext implements OperationContext {
@@ -7954,4 +6612,5 @@ public class PagingTest extends ActiveMQTestBase {
runnable.done();
}
}
+
}
diff --git a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PrintDataTest.java b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PrintDataTest.java
new file mode 100644
index 0000000000..2adcca2b26
--- /dev/null
+++ b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PrintDataTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.db.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.cli.commands.tools.PrintData;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.db.common.Database;
+import org.apache.activemq.artemis.tests.db.common.ParameterDBTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class PrintDataTest extends ParameterDBTestBase {
+
+ ActiveMQServer server;
+
+ @Parameterized.Parameters(name = "db={0}")
+ public static Collection