diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 2578684251..2aafcb144e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -576,4 +576,28 @@ public interface QueueControl { @Operation(desc = "Flush internal executors", impact = MBeanOperationInfo.ACTION) void flushExecutor(); + /** + * Will reset the all the groups. + * This is useful if you want a complete rebalance of the groups to consumers + */ + @Operation(desc = "Resets all groups", impact = MBeanOperationInfo.ACTION) + void resetAllGroups(); + + /** + * Will reset the group matching the given groupID. + * This is useful if you want the given group to be rebalanced to the consumers + */ + @Operation(desc = "Reset the specified group", impact = MBeanOperationInfo.ACTION) + void resetGroup(@Parameter(name = "groupID", desc = "ID of group to reset") String groupID); + + /** + * Will return the current number of active groups. + */ + @Attribute(desc = "Get the current number of active groups") + int getGroupCount(); + + + @Operation(desc = "List all the existent group to consumers mappings on the Queue") + String listGroupsAsJSON() throws Exception; + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index e678ab8b8c..8963b24e31 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -1189,6 +1189,70 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + @Override + public void resetAllGroups() { + checkStarted(); + + clearIO(); + try { + queue.resetAllGroups(); + } finally { + blockOnIO(); + } + } + + @Override + public void resetGroup(String groupID) { + checkStarted(); + + clearIO(); + try { + queue.resetGroup(SimpleString.toSimpleString(groupID)); + } finally { + blockOnIO(); + } + } + + @Override + public int getGroupCount() { + checkStarted(); + + clearIO(); + try { + return queue.getGroupCount(); + } finally { + blockOnIO(); + } + } + + @Override + public String listGroupsAsJSON() throws Exception { + checkStarted(); + + clearIO(); + try { + Map groups = queue.getGroups(); + + JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder(); + + for (Map.Entry group : groups.entrySet()) { + + if (group.getValue() instanceof ServerConsumer) { + ServerConsumer serverConsumer = (ServerConsumer) group.getValue(); + + JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime()); + + jsonArray.add(obj); + } + + } + + return jsonArray.build().toString(); + } finally { + blockOnIO(); + } + } + @Override public String listConsumersAsJSON() throws Exception { checkStarted(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index a23535274f..c549be9bf3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -255,6 +255,14 @@ public interface Queue extends Bindable,CriticalComponent { Collection getConsumers(); + Map getGroups(); + + void resetGroup(SimpleString groupID); + + void resetAllGroups(); + + int getGroupCount(); + boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception; /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 957c2bbf23..bf86823c59 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1038,6 +1038,26 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return new HashSet<>(consumerSet); } + @Override + public synchronized Map getGroups() { + return new HashMap<>(groups); + } + + @Override + public synchronized void resetGroup(SimpleString groupId) { + groups.remove(groupId); + } + + @Override + public synchronized void resetAllGroups() { + groups.clear(); + } + + @Override + public synchronized int getGroupCount() { + return groups.size(); + } + @Override public boolean hasMatchingConsumer(final Message message) { for (ConsumerHolder holder : consumerList) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index fdce2d03cb..9005898a8f 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1242,6 +1242,26 @@ public class ScheduledDeliveryHandlerTest extends Assert { return null; } + @Override + public Map getGroups() { + return null; + } + + @Override + public void resetGroup(SimpleString groupID) { + + } + + @Override + public void resetAllGroups() { + + } + + @Override + public int getGroupCount() { + return 0; + } + @Override public boolean checkRedelivery(MessageReference ref, long timeBase, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java index 9967e76b34..baaaab3a78 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementTestBase.java @@ -122,6 +122,12 @@ public abstract class ManagementTestBase extends ActiveMQTestBase { return control.getMessageCount(); } + protected int getGroupCount(QueueControl control) throws Exception { + control.flushExecutor(); + return control.getGroupCount(); + } + + protected long getDurableMessageCount(QueueControl control) throws Exception { control.flushExecutor(); return control.getDurableMessageCount(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 5a49e2a229..7bef025ddf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -2510,6 +2510,48 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertEquals(new String(body), "theBody"); } + @Test + public void testResetGroups() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, RoutingType.MULTICAST, queue, null, durable); + + QueueControl queueControl = createManagementControl(address, queue); + + ClientConsumer consumer = session.createConsumer(queue); + Assert.assertEquals(1, queueControl.getConsumerCount()); + consumer.setMessageHandler(new MessageHandler() { + @Override + public void onMessage(ClientMessage message) { + System.out.println(message); + } + }); + session.start(); + + ClientProducer producer = session.createProducer(address); + producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group1")); + producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group2")); + producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group3")); + + Wait.assertEquals(3, () -> getGroupCount(queueControl)); + + queueControl.resetGroup("group1"); + + Wait.assertEquals(2, () -> getGroupCount(queueControl)); + + producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group1")); + + Wait.assertEquals(3, () -> getGroupCount(queueControl)); + + queueControl.resetAllGroups(); + + Wait.assertEquals(0, () -> getGroupCount(queueControl)); + + consumer.close(); + session.deleteQueue(queue); + } + @Test public void testGetScheduledCountOnRemove() throws Exception { long delay = Integer.MAX_VALUE; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index aafbb5b76d..3060799df6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -49,6 +49,38 @@ public class QueueControlUsingCoreTest extends QueueControlTest { } } + @Override + public void resetAllGroups() { + try { + proxy.invokeOperation("resetAllGroups"); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public void resetGroup(String groupID) { + try { + proxy.invokeOperation("resetGroup"); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public int getGroupCount() { + try { + return (Integer) proxy.invokeOperation("groupCount"); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public String listGroupsAsJSON() throws Exception { + return (String) proxy.invokeOperation("listGroupsAsJSON"); + } + @Override public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception { return (Boolean) proxy.invokeOperation("changeMessagePriority", messageID, newPriority); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index bc628f580c..5c37aec355 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -342,6 +342,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override + public int getConsumerCount() { // no-op return 0; @@ -358,6 +359,26 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { return null; } + @Override + public Map getGroups() { + return null; + } + + @Override + public void resetGroup(SimpleString groupID) { + + } + + @Override + public void resetAllGroups() { + + } + + @Override + public int getGroupCount() { + return 0; + } + @Override public int getDeliveringCount() { // no-op