This closes #932
This commit is contained in:
commit
be38f4dd45
|
@ -37,7 +37,7 @@ public class CreateQueue extends QueueAbstract {
|
|||
@Override
|
||||
public void setUpInvocation(ClientMessage message) throws Exception {
|
||||
String address = getAddress();
|
||||
ManagementHelper.putOperationInvocation(message, "broker", "createQueue", address, getRoutingType(), getName(), getFilter(), isDurable(), getMaxConsumers(), treatNoConsumers(true), isAutoCreateAddress());
|
||||
ManagementHelper.putOperationInvocation(message, "broker", "createQueue", address, getRoutingType(), getName(), getFilter(), isDurable(), getMaxConsumers(-1), treatNoConsumers(true), isAutoCreateAddress());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -41,7 +41,7 @@ public class QueueAbstract extends AbstractAction {
|
|||
private boolean keepOnNoConsumers = false;
|
||||
|
||||
@Option(name = "--max-consumers", description = "Maximum number of consumers allowed on this queue at any one time (default no limit)")
|
||||
private int maxConsumers = -1;
|
||||
private Integer maxConsumers;
|
||||
|
||||
@Option(name = "--auto-create-ddress", description = "Auto create the address (if it doesn't exist) with default values")
|
||||
private Boolean autoCreateAddress = false;
|
||||
|
@ -89,7 +89,10 @@ public class QueueAbstract extends AbstractAction {
|
|||
return this;
|
||||
}
|
||||
|
||||
public int getMaxConsumers() {
|
||||
public Integer getMaxConsumers(Integer defaultValue) {
|
||||
if (maxConsumers == null) {
|
||||
return defaultValue;
|
||||
}
|
||||
return maxConsumers;
|
||||
}
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ public class UpdateQueue extends QueueAbstract {
|
|||
performCoreManagement(new ManagementCallback<ClientMessage>() {
|
||||
@Override
|
||||
public void setUpInvocation(ClientMessage message) throws Exception {
|
||||
ManagementHelper.putOperationInvocation(message, "broker", "updateQueue", getName(), getRoutingType(), getMaxConsumers(), isDeleteOnNoConsumers());
|
||||
ManagementHelper.putOperationInvocation(message, "broker", "updateQueue", getName(), getRoutingType(), getMaxConsumers(null), isDeleteOnNoConsumers());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -469,7 +469,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
final Queue queue = queueBinding.getQueue();
|
||||
//TODO put the whole update logic on Queue
|
||||
//validate update
|
||||
if (maxConsumers != null) {
|
||||
if (maxConsumers != null && maxConsumers.intValue() != Queue.MAX_CONSUMERS_UNLIMITED) {
|
||||
final int consumerCount = queue.getConsumerCount();
|
||||
if (consumerCount > maxConsumers) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount);
|
||||
|
|
|
@ -59,7 +59,7 @@ public class QueueCommandTest extends JMSTestBase {
|
|||
command.setMulticast(true);
|
||||
command.setAnycast(false);
|
||||
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionFailure(command, "AMQ119203: Address Does Not Exist:");
|
||||
checkExecutionFailure(command, "AMQ119203");
|
||||
assertFalse(server.queueQuery(new SimpleString(queueName)).isExists());
|
||||
}
|
||||
|
||||
|
@ -137,7 +137,7 @@ public class QueueCommandTest extends JMSTestBase {
|
|||
command.setAnycast(false);
|
||||
command.execute(new ActionContext());
|
||||
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionFailure(command, "AMQ119019: Queue already exists " + queueName);
|
||||
checkExecutionFailure(command, "AMQ119019");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -167,7 +167,7 @@ public class QueueCommandTest extends JMSTestBase {
|
|||
DeleteQueue delete = new DeleteQueue();
|
||||
delete.setName(queueName.toString());
|
||||
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionFailure(delete, "AMQ119017: Queue " + queueName + " does not exist");
|
||||
checkExecutionFailure(delete, "AMQ119017");
|
||||
|
||||
assertFalse(server.queueQuery(queueName).isExists());
|
||||
}
|
||||
|
@ -189,7 +189,7 @@ public class QueueCommandTest extends JMSTestBase {
|
|||
DeleteQueue delete = new DeleteQueue();
|
||||
delete.setName(queueName.toString());
|
||||
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionFailure(delete, "AMQ119025: Cannot delete queue " + queueName + " on binding deleteQueue");
|
||||
checkExecutionFailure(delete, "AMQ119025");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -257,7 +257,8 @@ public class QueueCommandTest extends JMSTestBase {
|
|||
final UpdateQueue updateQueue = new UpdateQueue();
|
||||
updateQueue.setName(queueName);
|
||||
updateQueue.setDeleteOnNoConsumers(newDeleteOnNoConsumers);
|
||||
updateQueue.setRoutingType(newRoutingType.name());
|
||||
updateQueue.setAnycast(true);
|
||||
updateQueue.setMulticast(false);
|
||||
updateQueue.setMaxConsumers(newMaxConsumers);
|
||||
updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
|
||||
|
@ -286,11 +287,12 @@ public class QueueCommandTest extends JMSTestBase {
|
|||
final RoutingType newRoutingType = RoutingType.ANYCAST;
|
||||
final UpdateQueue updateQueue = new UpdateQueue();
|
||||
updateQueue.setName(queueName);
|
||||
updateQueue.setRoutingType(newRoutingType.name());
|
||||
updateQueue.setAnycast(true);
|
||||
updateQueue.setMulticast(false);
|
||||
updateQueue.setMaxConsumers(-1);
|
||||
updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
|
||||
final String expectedErrorMessage = MessageFormat.format("Can''t update queue {0} with routing type: {1}, Supported routing types for address: {2} are {3}", queueName, newRoutingType, addressName, supportedRoutingTypes);
|
||||
checkExecutionFailure(updateQueue, expectedErrorMessage);
|
||||
checkExecutionFailure(updateQueue, "AMQ119211");
|
||||
|
||||
final QueueQueryResult queueQueryResult = server.queueQuery(queueNameString);
|
||||
assertEquals("maxConsumers", oldMaxConsumers, queueQueryResult.getMaxConsumers());
|
||||
|
@ -320,8 +322,7 @@ public class QueueCommandTest extends JMSTestBase {
|
|||
updateQueue.setMaxConsumers(newMaxConsumers);
|
||||
updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
|
||||
final String expectedErrorMessage = MessageFormat.format("Can''t update queue {0} with maxConsumers: {1}. Current consumers are {2}.", queueName, newMaxConsumers, 2);
|
||||
checkExecutionFailure(updateQueue, expectedErrorMessage);
|
||||
checkExecutionFailure(updateQueue, "AMQ119210");
|
||||
|
||||
final QueueQueryResult queueQueryResult = server.queueQuery(queueNameString);
|
||||
assertEquals("maxConsumers", oldMaxConsumers, queueQueryResult.getMaxConsumers());
|
||||
|
|
Loading…
Reference in New Issue