This commit is contained in:
Clebert Suconic 2020-04-08 12:53:56 -04:00
commit d13b802db3
7 changed files with 137 additions and 5 deletions

View File

@ -2300,4 +2300,28 @@ public interface AuditLogger extends BasicLogger {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601505, value = "User {0} is getting disk store usage percentage on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getDiskStoreUsagePercentage(String user, Object source, Object... args);
static void isGroupRebalance(Object source) {
LOGGER.isGroupRebalance(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601506, value = "User {0} is getting group rebalance property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void isGroupRebalance(String user, Object source, Object... args);
static void getGroupBuckets(Object source) {
LOGGER.getGroupBuckets(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601507, value = "User {0} is getting group buckets on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getGroupBuckets(String user, Object source, Object... args);
static void getGroupFirstKey(Object source) {
LOGGER.getGroupFirstKey(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601508, value = "User {0} is getting group first key on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getGroupFirstKey(String user, Object source, Object... args);
}

View File

@ -676,4 +676,21 @@ public interface QueueControl {
@Attribute(desc = "Get the ring size")
long getRingSize();
/**
* Returns whether the groups of this queue are automatically rebalanced.
*/
@Attribute(desc = "whether the groups of this queue are automatically rebalanced")
boolean isGroupRebalance();
/**
* Will return the group buckets.
*/
@Attribute(desc = "Get the group buckets")
int getGroupBuckets();
/**
* Will return the header key to notify a consumer of a group change.
*/
@Attribute(desc = "Get the header key to notify a consumer of a group change")
String getGroupFirstKey();
}

View File

@ -1687,6 +1687,53 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
@Override
public boolean isGroupRebalance() {
if (AuditLogger.isEnabled()) {
AuditLogger.isGroupRebalance(queue);
}
checkStarted();
clearIO();
try {
return queue.isGroupRebalance();
} finally {
blockOnIO();
}
}
@Override
public int getGroupBuckets() {
if (AuditLogger.isEnabled()) {
AuditLogger.getGroupBuckets(queue);
}
checkStarted();
clearIO();
try {
return queue.getMaxConsumers();
} finally {
blockOnIO();
}
}
@Override
public String getGroupFirstKey() {
if (AuditLogger.isEnabled()) {
AuditLogger.getMaxConsumers(queue);
}
checkStarted();
clearIO();
try {
SimpleString groupFirstKey = queue.getGroupFirstKey();
return groupFirstKey != null ? groupFirstKey.toString() : null;
} finally {
blockOnIO();
}
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -63,7 +63,10 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
.add("deliverDeliver", toString(q.isDirectDeliver()))
.add("exclusive", toString(queue.isExclusive()))
.add("lastValue", toString(queue.isLastValue()))
.add("scheduledCount", toString(queue.getScheduledCount()));
.add("scheduledCount", toString(queue.getScheduledCount()))
.add("groupRebalance", toString(queue.isGroupRebalance()))
.add("groupBuckets", toString(queue.getGroupBuckets()))
.add("groupFirstKey", toString(queue.getGroupFirstKey()));
return obj;
}
@ -117,6 +120,12 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
return q.isLastValue();
case "scheduledCount":
return q.getScheduledCount();
case "groupRebalance":
return queue.isGroupRebalance();
case "groupBuckets":
return queue.getGroupBuckets();
case "groupFirstKey":
return queue.getGroupFirstKey();
default:
throw new IllegalArgumentException("Unsupported field, " + fieldName);
}

View File

@ -372,9 +372,9 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertEquals(purgeOnNoConsumers, queueControl.isPurgeOnNoConsumers());
Assert.assertEquals(false, queueControl.isTemporary());
Assert.assertEquals(exclusive, queueControl.isExclusive());
// Assert.assertEquals(groupRebalance, queueControl.getGroupRebalance());
// Assert.assertEquals(groupBuckets, queueControl.getGroupBuckets());
// Assert.assertEquals(groupFirstKey, queueControl.getGroupFirstKey());
Assert.assertEquals(groupRebalance, queueControl.isGroupRebalance());
Assert.assertEquals(groupBuckets, queueControl.getGroupBuckets());
Assert.assertEquals(groupFirstKey, queueControl.getGroupFirstKey());
Assert.assertEquals(lastValue, queueControl.isLastValue());
// Assert.assertEquals(lastValueKey, queueControl.getLastValueKey());
// Assert.assertEquals(nonDestructive, queueControl.isNonDestructive());
@ -2096,7 +2096,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
// as for what we expect it's either true or false through management, we are not testing for directDeliver here, just
// if management works.
Assert.assertTrue(resultDirectDeliver.equals("true") || resultDirectDeliver.equals("false"));
Assert.assertEquals("exclusive", "false", array.getJsonObject(0).getString("exclusive"));
Assert.assertEquals("lastValue", "false", array.getJsonObject(0).getString("lastValue"));
Assert.assertEquals("scheduledCount", "0", array.getJsonObject(0).getString("scheduledCount"));
Assert.assertEquals("groupRebalance", "false", array.getJsonObject(0).getString("groupRebalance"));
Assert.assertEquals("groupBuckets", "-1", array.getJsonObject(0).getString("groupBuckets"));
Assert.assertEquals("groupFirstKey", "", array.getJsonObject(0).getString("groupFirstKey"));
}
@Test

View File

@ -122,6 +122,21 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
@Test
public void testGroupAttributes() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, RoutingType.MULTICAST, queue);
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertFalse(queueControl.isGroupRebalance());
Assert.assertEquals(-1, queueControl.getGroupBuckets());
Assert.assertEquals(null, queueControl.getGroupFirstKey());
session.deleteQueue(queue);
}
@Test
public void testRetroactiveResourceAttribute() throws Exception {
SimpleString baseAddress = RandomUtil.randomSimpleString();

View File

@ -90,6 +90,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
}
@Override
public boolean isGroupRebalance() {
return (Boolean) proxy.retrieveAttributeValue("groupRebalance");
}
@Override
public int getGroupBuckets() {
return (Integer) proxy.retrieveAttributeValue("groupBuckets", Integer.class);
}
@Override
public String getGroupFirstKey() {
return (String) proxy.retrieveAttributeValue("groupFirstKey");
}
@Override
public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception {
return (Boolean) proxy.invokeOperation("changeMessagePriority", messageID, newPriority);