This commit is contained in:
Clebert Suconic 2021-02-03 10:20:42 -05:00
commit 8d1fe2baa0
2 changed files with 46 additions and 3 deletions

View File

@ -124,9 +124,11 @@ public class ScaleDownHandler {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof LocalQueueBinding) {
Queue queue = ((LocalQueueBinding) binding).getQueue();
// as part of scale down we will cancel any scheduled message and pass it to theWhile we scan for the queues we will also cancel any scheduled messages and deliver them right away
queue.deliverScheduledMessages();
queues.add(queue);
if (!queue.isTemporary()) {
// as part of scale down we will cancel any scheduled message and pass it to theWhile we scan for the queues we will also cancel any scheduled messages and deliver them right away
queue.deliverScheduledMessages();
queues.add(queue);
}
}
}
}

View File

@ -253,6 +253,47 @@ public class ScaleDownDirectTest extends ClusterTestBase {
removeConsumer(0);
}
@Test
public void testTemporaryQueues() throws Exception {
final String addressName1 = "testAddress1";
final String addressName2 = "testAddress2";
final String queueName1 = "testQueue1";
final String queueName2 = "testQueue2";
final String queueName3 = "testQueue3";
ClientSessionFactory sf = sfs[0];
ClientSession session = sf.createSession(true, true);
session.createQueue(new QueueConfiguration(queueName1).setAddress(addressName1).setDurable(false).setTemporary(true));
session.createQueue(new QueueConfiguration(queueName2).setAddress(addressName2));
session.createQueue(new QueueConfiguration(queueName3).setAddress(addressName2).setDurable(false).setTemporary(true));
ClientProducer producer1 = session.createProducer(addressName1);
producer1.send(session.createMessage(true));
ClientProducer producer2 = session.createProducer(addressName2);
producer2.send(session.createMessage(true));
Wait.assertEquals(1, () -> getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
Wait.assertEquals(1, () -> getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
Wait.assertEquals(1, () -> getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
assertEquals(1, performScaledown());
sfs[0].close();
session.close();
// trigger scaleDown from node 0 to node 1
servers[0].stop();
Assert.assertNull(servers[1].getPostOffice().getBinding(new SimpleString(queueName1)));
Assert.assertEquals(1, getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
Assert.assertNull(servers[1].getPostOffice().getBinding(new SimpleString(queueName3)));
}
private void checkBody(ClientMessage message, int bufferSize) {
assertEquals(bufferSize, message.getBodySize());
byte[] body = new byte[message.getBodySize()];