From bcdb13365e06f4b165b2246172cc5700ce7c34e7 Mon Sep 17 00:00:00 2001 From: Domenico Francesco Bruscino Date: Tue, 2 Feb 2021 22:49:26 +0100 Subject: [PATCH] ARTEMIS-3075 Skip temporary queues scale down --- .../core/server/impl/ScaleDownHandler.java | 8 ++-- .../server/ScaleDownDirectTest.java | 41 +++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java index 25037c5d1e..0e75305bb5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java @@ -124,9 +124,11 @@ public class ScaleDownHandler { for (Binding binding : bindings.getBindings()) { if (binding instanceof LocalQueueBinding) { Queue queue = ((LocalQueueBinding) binding).getQueue(); - // as part of scale down we will cancel any scheduled message and pass it to theWhile we scan for the queues we will also cancel any scheduled messages and deliver them right away - queue.deliverScheduledMessages(); - queues.add(queue); + if (!queue.isTemporary()) { + // as part of scale down we will cancel any scheduled message and pass it to theWhile we scan for the queues we will also cancel any scheduled messages and deliver them right away + queue.deliverScheduledMessages(); + queues.add(queue); + } } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java index 473a8036d4..b9f2685830 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java @@ -253,6 +253,47 @@ public class ScaleDownDirectTest extends ClusterTestBase { removeConsumer(0); } + @Test + public void testTemporaryQueues() throws Exception { + final String addressName1 = "testAddress1"; + final String addressName2 = "testAddress2"; + final String queueName1 = "testQueue1"; + final String queueName2 = "testQueue2"; + final String queueName3 = "testQueue3"; + + ClientSessionFactory sf = sfs[0]; + + ClientSession session = sf.createSession(true, true); + + session.createQueue(new QueueConfiguration(queueName1).setAddress(addressName1).setDurable(false).setTemporary(true)); + + session.createQueue(new QueueConfiguration(queueName2).setAddress(addressName2)); + session.createQueue(new QueueConfiguration(queueName3).setAddress(addressName2).setDurable(false).setTemporary(true)); + + ClientProducer producer1 = session.createProducer(addressName1); + producer1.send(session.createMessage(true)); + + ClientProducer producer2 = session.createProducer(addressName2); + producer2.send(session.createMessage(true)); + + Wait.assertEquals(1, () -> getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue())); + Wait.assertEquals(1, () -> getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue())); + Wait.assertEquals(1, () -> getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue())); + + assertEquals(1, performScaledown()); + + sfs[0].close(); + + session.close(); + + // trigger scaleDown from node 0 to node 1 + servers[0].stop(); + + Assert.assertNull(servers[1].getPostOffice().getBinding(new SimpleString(queueName1))); + Assert.assertEquals(1, getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue())); + Assert.assertNull(servers[1].getPostOffice().getBinding(new SimpleString(queueName3))); + } + private void checkBody(ClientMessage message, int bufferSize) { assertEquals(bufferSize, message.getBodySize()); byte[] body = new byte[message.getBodySize()];