This commit is contained in:
Howard Gao 2018-04-17 16:36:19 +08:00
commit 1118bdb65e
9 changed files with 237 additions and 0 deletions

View File

@ -576,4 +576,28 @@ public interface QueueControl {
@Operation(desc = "Flush internal executors", impact = MBeanOperationInfo.ACTION)
void flushExecutor();
/**
* Will reset the all the groups.
* This is useful if you want a complete rebalance of the groups to consumers
*/
@Operation(desc = "Resets all groups", impact = MBeanOperationInfo.ACTION)
void resetAllGroups();
/**
* Will reset the group matching the given groupID.
* This is useful if you want the given group to be rebalanced to the consumers
*/
@Operation(desc = "Reset the specified group", impact = MBeanOperationInfo.ACTION)
void resetGroup(@Parameter(name = "groupID", desc = "ID of group to reset") String groupID);
/**
* Will return the current number of active groups.
*/
@Attribute(desc = "Get the current number of active groups")
int getGroupCount();
@Operation(desc = "List all the existent group to consumers mappings on the Queue")
String listGroupsAsJSON() throws Exception;
}

View File

@ -1189,6 +1189,70 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public void resetAllGroups() {
checkStarted();
clearIO();
try {
queue.resetAllGroups();
} finally {
blockOnIO();
}
}
@Override
public void resetGroup(String groupID) {
checkStarted();
clearIO();
try {
queue.resetGroup(SimpleString.toSimpleString(groupID));
} finally {
blockOnIO();
}
}
@Override
public int getGroupCount() {
checkStarted();
clearIO();
try {
return queue.getGroupCount();
} finally {
blockOnIO();
}
}
@Override
public String listGroupsAsJSON() throws Exception {
checkStarted();
clearIO();
try {
Map<SimpleString, Consumer> groups = queue.getGroups();
JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
for (Map.Entry<SimpleString, Consumer> group : groups.entrySet()) {
if (group.getValue() instanceof ServerConsumer) {
ServerConsumer serverConsumer = (ServerConsumer) group.getValue();
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("groupID", group.getKey().toString()).add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
jsonArray.add(obj);
}
}
return jsonArray.build().toString();
} finally {
blockOnIO();
}
}
@Override
public String listConsumersAsJSON() throws Exception {
checkStarted();

View File

@ -255,6 +255,14 @@ public interface Queue extends Bindable,CriticalComponent {
Collection<Consumer> getConsumers();
Map<SimpleString, Consumer> getGroups();
void resetGroup(SimpleString groupID);
void resetAllGroups();
int getGroupCount();
boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception;
/**

View File

@ -1038,6 +1038,26 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return new HashSet<>(consumerSet);
}
@Override
public synchronized Map<SimpleString, Consumer> getGroups() {
return new HashMap<>(groups);
}
@Override
public synchronized void resetGroup(SimpleString groupId) {
groups.remove(groupId);
}
@Override
public synchronized void resetAllGroups() {
groups.clear();
}
@Override
public synchronized int getGroupCount() {
return groups.size();
}
@Override
public boolean hasMatchingConsumer(final Message message) {
for (ConsumerHolder holder : consumerList) {

View File

@ -1242,6 +1242,26 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return null;
}
@Override
public Map<SimpleString, Consumer> getGroups() {
return null;
}
@Override
public void resetGroup(SimpleString groupID) {
}
@Override
public void resetAllGroups() {
}
@Override
public int getGroupCount() {
return 0;
}
@Override
public boolean checkRedelivery(MessageReference ref,
long timeBase,

View File

@ -122,6 +122,12 @@ public abstract class ManagementTestBase extends ActiveMQTestBase {
return control.getMessageCount();
}
protected int getGroupCount(QueueControl control) throws Exception {
control.flushExecutor();
return control.getGroupCount();
}
protected long getDurableMessageCount(QueueControl control) throws Exception {
control.flushExecutor();
return control.getDurableMessageCount();

View File

@ -2510,6 +2510,48 @@ public class QueueControlTest extends ManagementTestBase {
Assert.assertEquals(new String(body), "theBody");
}
@Test
public void testResetGroups() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
QueueControl queueControl = createManagementControl(address, queue);
ClientConsumer consumer = session.createConsumer(queue);
Assert.assertEquals(1, queueControl.getConsumerCount());
consumer.setMessageHandler(new MessageHandler() {
@Override
public void onMessage(ClientMessage message) {
System.out.println(message);
}
});
session.start();
ClientProducer producer = session.createProducer(address);
producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group1"));
producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group2"));
producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group3"));
Wait.assertEquals(3, () -> getGroupCount(queueControl));
queueControl.resetGroup("group1");
Wait.assertEquals(2, () -> getGroupCount(queueControl));
producer.send(session.createMessage(durable).putStringProperty(Message.HDR_GROUP_ID, "group1"));
Wait.assertEquals(3, () -> getGroupCount(queueControl));
queueControl.resetAllGroups();
Wait.assertEquals(0, () -> getGroupCount(queueControl));
consumer.close();
session.deleteQueue(queue);
}
@Test
public void testGetScheduledCountOnRemove() throws Exception {
long delay = Integer.MAX_VALUE;

View File

@ -49,6 +49,38 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
}
@Override
public void resetAllGroups() {
try {
proxy.invokeOperation("resetAllGroups");
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
public void resetGroup(String groupID) {
try {
proxy.invokeOperation("resetGroup");
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
public int getGroupCount() {
try {
return (Integer) proxy.invokeOperation("groupCount");
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
public String listGroupsAsJSON() throws Exception {
return (String) proxy.invokeOperation("listGroupsAsJSON");
}
@Override
public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception {
return (Boolean) proxy.invokeOperation("changeMessagePriority", messageID, newPriority);

View File

@ -342,6 +342,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public int getConsumerCount() {
// no-op
return 0;
@ -358,6 +359,26 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return null;
}
@Override
public Map<SimpleString, Consumer> getGroups() {
return null;
}
@Override
public void resetGroup(SimpleString groupID) {
}
@Override
public void resetAllGroups() {
}
@Override
public int getGroupCount() {
return 0;
}
@Override
public int getDeliveringCount() {
// no-op