This commit is contained in:
Clebert Suconic 2017-11-15 17:19:30 -05:00
commit 5cea228dbc
2 changed files with 135 additions and 0 deletions

View File

@ -2118,6 +2118,20 @@ public abstract class ActiveMQTestBase extends Assert {
return message;
}
protected ClientMessage createTextMessage(final ClientSession session, final boolean durable, final int numChars) {
ClientMessage message = session.createMessage(Message.TEXT_TYPE,
durable,
0,
System.currentTimeMillis(),
(byte)4);
StringBuilder builder = new StringBuilder();
for (int i = 0; i < numChars; i++) {
builder.append('a');
}
message.getBodyBuffer().writeString(builder.toString());
return message;
}
protected XidImpl newXID() {
return new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
}

View File

@ -16,11 +16,22 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
public class TwoWayTwoNodeClusterTest extends ClusterTestBase {
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@ -48,6 +59,116 @@ public class TwoWayTwoNodeClusterTest extends ClusterTestBase {
return false;
}
/*
* This test starts 2 servers and send messages to
* a queue until it enters into paging state. Then
* it changes the max-size to -1, restarts the 2 servers
* and consumes all the messages. If verifies that
* even if the max-size has changed all the paged
* messages will be depaged and consumed. No stuck
* messages after restarting.
*/
@Test(timeout = 60000)
public void testClusterRestartWithConfigChanged() throws Exception {
Configuration config0 = servers[0].getConfiguration();
Configuration config1 = servers[1].getConfiguration();
configureBeforeStart(config0, config1);
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues", "queue0", null, true);
createQueue(1, "queues", "queue0", null, true);
waitForBindings(0, "queues", 1, 0, true);
waitForBindings(1, "queues", 1, 0, true);
waitForBindings(0, "queues", 1, 0, false);
waitForBindings(1, "queues", 1, 0, false);
ClientSessionFactory sf0 = sfs[0];
ClientSession session0 = sf0.createSession(false, false);
ClientProducer producer = session0.createProducer("queues");
final int numSent = 200;
for (int i = 0; i < numSent; i++) {
ClientMessage msg = createTextMessage(session0, true, 5000);
producer.send(msg);
if (i % 50 == 0) {
session0.commit();
}
}
session0.commit();
session0.close();
while (true) {
long msgCount0 = getMessageCount(servers[0], "queues");
long msgCount1 = getMessageCount(servers[1], "queues");
if (msgCount0 + msgCount1 >= numSent) {
break;
}
Thread.sleep(100);
}
Queue queue0 = servers[0].locateQueue(new SimpleString("queue0"));
assertTrue(queue0.getPageSubscription().isPaging());
closeAllSessionFactories();
stopServers(0, 1);
AddressSettings addressSettings0 = config0.getAddressesSettings().get("#");
AddressSettings addressSettings1 = config1.getAddressesSettings().get("#");
addressSettings0.setMaxSizeBytes(-1);
addressSettings1.setMaxSizeBytes(-1);
startServers(0, 1);
waitForBindings(0, "queues", 1, 0, true);
waitForBindings(1, "queues", 1, 0, true);
waitForBindings(0, "queues", 1, 0, false);
waitForBindings(1, "queues", 1, 0, false);
setupSessionFactory(0, isNetty());
addConsumer(0, 0, "queue0", null);
waitForBindings(0, "queues", 1, 1, true);
for (int i = 0; i < numSent; i++) {
ClientMessage m = consumers[0].consumer.receive(5000);
assertNotNull("failed to receive message " + i, m);
}
}
private void configureBeforeStart(Configuration... serverConfigs) {
for (Configuration config : serverConfigs) {
config.setPersistenceEnabled(true);
config.setMessageCounterEnabled(true);
config.setJournalFileSize(20971520);
config.setJournalMinFiles(20);
config.setJournalCompactPercentage(50);
Map<String, AddressSettings> addressSettingsMap0 = config.getAddressesSettings();
AddressSettings addrSettings = addressSettingsMap0.get("#");
if (addrSettings == null) {
addrSettings = new AddressSettings();
addressSettingsMap0.put("#", addrSettings);
}
addrSettings.setDeadLetterAddress(new SimpleString("jms.queue.DLQ"));
addrSettings.setExpiryAddress(new SimpleString("jms.queue.ExpiryQueue"));
addrSettings.setRedeliveryDelay(30);
addrSettings.setMaxDeliveryAttempts(5);
addrSettings.setMaxSizeBytes(1048576);
addrSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addrSettings.setPageSizeBytes(524288);
addrSettings.setMessageCounterHistoryDayLimit(10);
addrSettings.setRedistributionDelay(1000);
}
}
@Test
public void testStartStop() throws Exception {