This closes #510

This commit is contained in:
Clebert Suconic 2016-05-06 11:48:52 -04:00
commit 7846521603
5 changed files with 20 additions and 12 deletions

View File

@ -46,8 +46,10 @@ public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
logger.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + isAutoDeleteJmsQueues);
}
if (server.getJMSQueueDeleter() != null) {
server.getJMSQueueDeleter().delete(queueName);
}
}
else if (logger.isDebugEnabled()) {
logger.debug("NOT deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + isAutoDeleteJmsQueues);
}

View File

@ -120,7 +120,7 @@ public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase
connection = (ActiveMQConnection) factory.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
queue = session.createQueue("jms.queue.Queue");
queue = session.createQueue("Queue");
producer = session.createProducer(queue);
}

View File

@ -65,10 +65,10 @@ public class AutoCreateJmsQueueTest extends JMSTestBase {
Assert.assertNotNull(m);
}
connection.close();
// make sure the JMX control was created for the JMS queue
assertNotNull(server.getManagementService().getResource("jms.queue.test"));
connection.close();
}
@Test

View File

@ -75,6 +75,9 @@ public class AutoDeleteJmsQueueTest extends JMSTestBase {
// ensure the queue was removed
Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("jms.queue.test")));
// make sure the JMX control was removed for the JMS queue
assertNull(server.getManagementService().getResource("jms.queue.test"));
}
@Test

View File

@ -76,6 +76,8 @@ public class ScaleDownTest extends ClusterTestBase {
setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
haPolicyConfiguration0.getScaleDownConfiguration().getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
haPolicyConfiguration1.getScaleDownConfiguration().getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
servers[0].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
servers[1].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@ -156,10 +158,6 @@ public class ScaleDownTest extends ClusterTestBase {
createQueue(0, addressName2, queueName2, null, false);
createQueue(1, addressName2, queueName2, null, false);
// add consumers to node 1 to force any messages we send into the sf queue
addConsumer(0, 1, queueName1, null);
addConsumer(1, 1, queueName2, null);
// find and pause the sf queue so no messages actually move from node 0 to node 1
String sfQueueName = null;
for (Map.Entry<SimpleString, Binding> entry : servers[0].getPostOffice().getAllBindings().entrySet()) {
@ -175,17 +173,22 @@ public class ScaleDownTest extends ClusterTestBase {
assertNotNull(sfQueueName);
// send messages to node 0 that will get stuck in the paused sf queue going to node 1
// send messages to node 0
send(0, addressName1, TEST_SIZE, false, null);
send(0, addressName2, TEST_SIZE, false, null);
removeConsumer(0);
removeConsumer(1);
// at this point on node 0 there should be 0 messages in testQueue and TEST_SIZE messages in the sfQueue
// add consumers to node 1 to force messages messages to redistribute to node 2 through the paused sf queue
addConsumer(0, 1, queueName1, null);
addConsumer(1, 1, queueName2, null);
// at this point on node 0 there should be 0 messages in test queues and TEST_SIZE * 2 messages in the sf queue
Assert.assertEquals(0, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
Assert.assertEquals(0, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
Assert.assertEquals(TEST_SIZE * 2, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(sfQueueName))).getQueue()));
removeConsumer(0);
removeConsumer(1);
// trigger scaleDown from node 0 to node 1
servers[0].stop();