ARTEMIS-3075 Skip temporary queues scale down

This commit is contained in:
Domenico Francesco Bruscino 2021-02-02 22:49:26 +01:00 committed by Clebert Suconic
parent 7a199d7f97
commit bcdb13365e
2 changed files with 46 additions and 3 deletions

View File

@ -124,9 +124,11 @@ public class ScaleDownHandler {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof LocalQueueBinding) {
Queue queue = ((LocalQueueBinding) binding).getQueue();
// as part of scale down we will cancel any scheduled message and pass it to theWhile we scan for the queues we will also cancel any scheduled messages and deliver them right away
queue.deliverScheduledMessages();
queues.add(queue);
if (!queue.isTemporary()) {
// as part of scale down we will cancel any scheduled message and pass it to theWhile we scan for the queues we will also cancel any scheduled messages and deliver them right away
queue.deliverScheduledMessages();
queues.add(queue);
}
}
}
}

View File

@ -253,6 +253,47 @@ public class ScaleDownDirectTest extends ClusterTestBase {
removeConsumer(0);
}
@Test
public void testTemporaryQueues() throws Exception {
final String addressName1 = "testAddress1";
final String addressName2 = "testAddress2";
final String queueName1 = "testQueue1";
final String queueName2 = "testQueue2";
final String queueName3 = "testQueue3";
ClientSessionFactory sf = sfs[0];
ClientSession session = sf.createSession(true, true);
session.createQueue(new QueueConfiguration(queueName1).setAddress(addressName1).setDurable(false).setTemporary(true));
session.createQueue(new QueueConfiguration(queueName2).setAddress(addressName2));
session.createQueue(new QueueConfiguration(queueName3).setAddress(addressName2).setDurable(false).setTemporary(true));
ClientProducer producer1 = session.createProducer(addressName1);
producer1.send(session.createMessage(true));
ClientProducer producer2 = session.createProducer(addressName2);
producer2.send(session.createMessage(true));
Wait.assertEquals(1, () -> getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
Wait.assertEquals(1, () -> getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
Wait.assertEquals(1, () -> getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
assertEquals(1, performScaledown());
sfs[0].close();
session.close();
// trigger scaleDown from node 0 to node 1
servers[0].stop();
Assert.assertNull(servers[1].getPostOffice().getBinding(new SimpleString(queueName1)));
Assert.assertEquals(1, getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
Assert.assertNull(servers[1].getPostOffice().getBinding(new SimpleString(queueName3)));
}
private void checkBody(ClientMessage message, int bufferSize) {
assertEquals(bufferSize, message.getBodySize());
byte[] body = new byte[message.getBodySize()];