ARTEMIS-857 Add JMX endpoints to view and reset groups

Expose method to return current mappings of groups to consumers
Expose methods to reset (remove) specific group mapping from groupID to Consumer
Expose methods to reset (remove) all group mappings
This commit is contained in:
Michael André Pearce 2018-04-16 17:11:44 +01:00 committed by Howard Gao
parent 7156b61408
commit af91d3ac82
9 changed files with 237 additions and 0 deletions

View File

@ -576,4 +576,28 @@ public interface QueueControl {
@Operation(desc = "Flush internal executors", impact = MBeanOperationInfo.ACTION)
void flushExecutor();
/**
* Will reset the all the groups.
* This is useful if you want a complete rebalance of the groups to consumers
*/
@Operation(desc = "Resets all groups", impact = MBeanOperationInfo.ACTION)
void resetAllGroups();
/**
* Will reset the group matching the given groupID.
* This is useful if you want the given group to be rebalanced to the consumers
*/
@Operation(desc = "Reset the specified group", impact = MBeanOperationInfo.ACTION)
void resetGroup(@Parameter(name = "groupID", desc = "ID of group to reset") String groupID);
/**
* Will return the current number of active groups.
*/
@Attribute(desc = "Get the current number of active groups")
int getGroupCount();
@Operation(desc = "List all the existent group to consumers mappings on the Queue")
String listGroupsAsJSON() throws Exception;
}

View File

@ -1189,6 +1189,70 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public void resetAllGroups() {
checkStarted();
clearIO();
try {
queue.resetAllGroups();
} finally {
blockOnIO();
}
}
@Override
public void resetGroup(String groupID) {
checkStarted();
clearIO();
try {
queue.resetGroup(SimpleString.toSimpleString(groupID));
} finally {
blockOnIO();
}
}
@Override
public int getGroupCount() {
checkStarted();
clearIO();
try {
return queue.getGroupCount();
} finally {
blockOnIO();
}
}
@Override
public String listGroupsAsJSON() throws Exception {
checkStarted();
clearIO();
try {
Map<SimpleString, Consumer> groups = queue.getGroups();
JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
for (Map.Entry<SimpleString, Consumer> group : groups.entrySet()) {
if (group.getValue() instanceof ServerConsumer) {
ServerConsumer serverConsumer = (ServerConsumer) group.getValue();
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
jsonArray.add(obj);
}
}
return jsonArray.build().toString();
} finally {
blockOnIO();
}
}
@Override
public String listConsumersAsJSON() throws Exception {
checkStarted();

View File

@ -255,6 +255,14 @@ public interface Queue extends Bindable,CriticalComponent {
Collection<Consumer> getConsumers();
Map<SimpleString, Consumer> getGroups();
void resetGroup(SimpleString groupID);
void resetAllGroups();
int getGroupCount();
boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception;
/**

View File

@ -1038,6 +1038,26 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return new HashSet<>(consumerSet);
}
@Override
public synchronized Map<SimpleString, Consumer> getGroups() {
return new HashMap<>(groups);
}
@Override
public synchronized void resetGroup(SimpleString groupId) {
groups.remove(groupId);
}
@Override
public synchronized void resetAllGroups() {
groups.clear();
}
@Override
public synchronized int getGroupCount() {
return groups.size();
}
@Override
public boolean hasMatchingConsumer(final Message message) {
for (ConsumerHolder holder : consumerList) {

View File

@ -1242,6 +1242,26 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return null;
}
@Override
public Map<SimpleString, Consumer> getGroups() {
return null;
}
@Override
public void resetGroup(SimpleString groupID) {
}
@Override
public void resetAllGroups() {
}
@Override
public int getGroupCount() {
return 0;
}
@Override
public boolean checkRedelivery(MessageReference ref,
long timeBase,

View File

@ -122,6 +122,12 @@ public abstract class ManagementTestBase extends ActiveMQTestBase {
return control.getMessageCount();
}
protected int getGroupCount(QueueControl control) throws Exception {
control.flushExecutor();
return control.getGroupCount();
}
protected long getDurableMessageCount(QueueControl control) throws Exception {
control.flushExecutor();
return control.getDurableMessageCount();

View File

@ -2510,6 +2510,48 @@ public class QueueControlTest extends ManagementTestBase {
Assert.assertEquals(new String(body), "theBody");
}
@Test
public void testResetGroups() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
QueueControl queueControl = createManagementControl(address, queue);
ClientConsumer consumer = session.createConsumer(queue);
Assert.assertEquals(1, queueControl.getConsumerCount());
consumer.setMessageHandler(new MessageHandler() {
@Override
public void onMessage(ClientMessage message) {
System.out.println(message);
}
});
session.start();
ClientProducer producer = session.createProducer(address);
producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group1"));
producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group2"));
producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group3"));
Wait.assertEquals(3, () -> getGroupCount(queueControl));
queueControl.resetGroup("group1");
Wait.assertEquals(2, () -> getGroupCount(queueControl));
producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group1"));
Wait.assertEquals(3, () -> getGroupCount(queueControl));
queueControl.resetAllGroups();
Wait.assertEquals(0, () -> getGroupCount(queueControl));
consumer.close();
session.deleteQueue(queue);
}
@Test
public void testGetScheduledCountOnRemove() throws Exception {
long delay = Integer.MAX_VALUE;

View File

@ -49,6 +49,38 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
}
@Override
public void resetAllGroups() {
try {
proxy.invokeOperation("resetAllGroups");
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
public void resetGroup(String groupID) {
try {
proxy.invokeOperation("resetGroup");
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
public int getGroupCount() {
try {
return (Integer) proxy.invokeOperation("groupCount");
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
public String listGroupsAsJSON() throws Exception {
return (String) proxy.invokeOperation("listGroupsAsJSON");
}
@Override
public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception {
return (Boolean) proxy.invokeOperation("changeMessagePriority", messageID, newPriority);

View File

@ -342,6 +342,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public int getConsumerCount() {
// no-op
return 0;
@ -358,6 +359,26 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return null;
}
@Override
public Map<SimpleString, Consumer> getGroups() {
return null;
}
@Override
public void resetGroup(SimpleString groupID) {
}
@Override
public void resetAllGroups() {
}
@Override
public int getGroupCount() {
return 0;
}
@Override
public int getDeliveringCount() {
// no-op