From 0a4d1b38c8f2d361f57bf7da7dfa969e1692ec01 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 15 Dec 2016 11:20:25 +0100 Subject: [PATCH] ARTEMIS-878: Improved CLI commands --- .../apache/activemq/artemis/cli/Artemis.java | 12 +- .../cli/commands/address/CreateAddress.java | 3 +- ...AddRoutingType.java => UpdateAddress.java} | 25 ++-- .../cli/commands/queue/CreateQueue.java | 3 +- .../UpdateQueue.java} | 48 +++++-- .../management/ActiveMQServerControl.java | 31 ++-- .../impl/ActiveMQServerControlImpl.java | 132 ++++++++++++++---- .../core/postoffice/AddressManager.java | 10 +- .../artemis/core/postoffice/PostOffice.java | 15 +- .../core/postoffice/impl/PostOfficeImpl.java | 77 ++++++---- .../postoffice/impl/SimpleAddressManager.java | 54 +++++-- .../core/server/ActiveMQMessageBundle.java | 15 +- .../artemis/core/server/ActiveMQServer.java | 28 ++-- .../activemq/artemis/core/server/Queue.java | 4 + .../core/server/impl/ActiveMQServerImpl.java | 65 ++++++--- .../artemis/core/server/impl/QueueImpl.java | 18 ++- .../impl/ScheduledDeliveryHandlerTest.java | 12 +- .../integration/cli/AddressCommandTest.java | 72 +++------- .../integration/cli/QueueCommandTest.java | 106 ++++++++++++++ .../ActiveMQServerControlUsingCoreTest.java | 38 ++--- .../unit/core/postoffice/impl/FakeQueue.java | 12 +- .../server/impl/fakes/FakePostOffice.java | 20 +-- 22 files changed, 553 insertions(+), 247 deletions(-) rename artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/{AddRoutingType.java => UpdateAddress.java} (74%) rename artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/{address/RemoveRoutingType.java => queue/UpdateQueue.java} (58%) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java index 13e2f7cada..8edf871d1c 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java @@ -30,18 +30,18 @@ import org.apache.activemq.artemis.cli.commands.Kill; import org.apache.activemq.artemis.cli.commands.Mask; import org.apache.activemq.artemis.cli.commands.Run; import org.apache.activemq.artemis.cli.commands.Stop; -import org.apache.activemq.artemis.cli.commands.address.AddRoutingType; import org.apache.activemq.artemis.cli.commands.address.CreateAddress; import org.apache.activemq.artemis.cli.commands.address.DeleteAddress; import org.apache.activemq.artemis.cli.commands.address.HelpAddress; -import org.apache.activemq.artemis.cli.commands.address.RemoveRoutingType; import org.apache.activemq.artemis.cli.commands.address.ShowAddress; +import org.apache.activemq.artemis.cli.commands.address.UpdateAddress; import org.apache.activemq.artemis.cli.commands.queue.CreateQueue; import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue; import org.apache.activemq.artemis.cli.commands.queue.HelpQueue; import org.apache.activemq.artemis.cli.commands.messages.Browse; import org.apache.activemq.artemis.cli.commands.messages.Consumer; import org.apache.activemq.artemis.cli.commands.messages.Producer; +import org.apache.activemq.artemis.cli.commands.queue.UpdateQueue; import org.apache.activemq.artemis.cli.commands.tools.CompactJournal; import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal; import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal; @@ -134,11 +134,11 @@ public class Artemis { String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance"); Cli.CliBuilder builder = Cli.builder("artemis").withDescription("ActiveMQ Artemis Command Line").withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Consumer.class).withCommand(Browse.class).withCommand(Mask.class).withDefaultCommand(HelpAction.class); - builder.withGroup("queue").withDescription("Queue tools group (create|delete) (example ./artemis queue create)"). - withDefaultCommand(HelpQueue.class).withCommands(CreateQueue.class, DeleteQueue.class); + builder.withGroup("queue").withDescription("Queue tools group (create|delete|update) (example ./artemis queue create)"). + withDefaultCommand(HelpQueue.class).withCommands(CreateQueue.class, DeleteQueue.class, UpdateQueue.class); - builder.withGroup("address").withDescription("Address tools group (create|delete|addRoutingType|removeRoutingType|show) (example ./artemis address create)"). - withDefaultCommand(HelpAddress.class).withCommands(CreateAddress.class, DeleteAddress.class, AddRoutingType.class, RemoveRoutingType.class, ShowAddress.class); + builder.withGroup("address").withDescription("Address tools group (create|delete|update|show) (example ./artemis address create)"). + withDefaultCommand(HelpAddress.class).withCommands(CreateAddress.class, DeleteAddress.class, UpdateAddress.class, ShowAddress.class); if (instance != null) { builder.withGroup("data").withDescription("data tools group (print|exp|imp|exp|encode|decode|compact) (example ./artemis data print)"). diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java index 2795238383..ab8e69fd2e 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java @@ -50,7 +50,8 @@ public class CreateAddress extends AbstractAction { @Override public void requestSuccessful(ClientMessage reply) throws Exception { - context.out.println("Address " + getName() + " created successfully."); + final String result = ManagementHelper.getResult(reply, String.class) + " created successfully."; + context.out.println(result); } @Override diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/AddRoutingType.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/UpdateAddress.java similarity index 74% rename from artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/AddRoutingType.java rename to artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/UpdateAddress.java index fd79620f73..1a9c24718e 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/AddRoutingType.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/UpdateAddress.java @@ -24,32 +24,33 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.cli.commands.AbstractAction; import org.apache.activemq.artemis.cli.commands.ActionContext; -@Command(name = "addRoutingType", description = "add the provided routing types to an address") -public class AddRoutingType extends AbstractAction { +@Command(name = "update", description = "update an address") +public class UpdateAddress extends AbstractAction { @Option(name = "--name", description = "The name of this address", required = true) String name; - @Option(name = "--routingType", description = "The routing types to be added to this address, options are 'anycast' or 'multicast'", required = true) - String routingType; + @Option(name = "--routingTypes", description = "The routing types supported by this address, options are 'anycast' or 'multicast', enter comma separated list") + String routingTypes = null; @Override public Object execute(ActionContext context) throws Exception { super.execute(context); - addRoutingType(context); + updateAddress(context); return null; } - private void addRoutingType(final ActionContext context) throws Exception { + private void updateAddress(final ActionContext context) throws Exception { performCoreManagement(new AbstractAction.ManagementCallback() { @Override public void setUpInvocation(ClientMessage message) throws Exception { - ManagementHelper.putOperationInvocation(message, "broker", "addRoutingType", name, routingType); + ManagementHelper.putOperationInvocation(message, "broker", "updateAddress", name, routingTypes); } @Override public void requestSuccessful(ClientMessage reply) throws Exception { - context.out.println("Address " + name + " updated successfully."); + final String result = ManagementHelper.getResult(reply, String.class) + " updated successfully."; + context.out.println(result); } @Override @@ -68,11 +69,11 @@ public class AddRoutingType extends AbstractAction { this.name = name; } - public String getRoutingType() { - return routingType; + public String getRoutingTypes() { + return routingTypes; } - public void setRoutingType(String routingType) { - this.routingType = routingType; + public void setRoutingTypes(String routingTypes) { + this.routingTypes = routingTypes; } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/CreateQueue.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/CreateQueue.java index c2497c9d75..78a6d33c31 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/CreateQueue.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/CreateQueue.java @@ -75,7 +75,8 @@ public class CreateQueue extends AbstractAction { @Override public void requestSuccessful(ClientMessage reply) throws Exception { - context.out.println("Core queue " + getName() + " created successfully."); + final String result = ManagementHelper.getResult(reply, String.class) + " created successfully."; + context.out.println(result); } @Override diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/RemoveRoutingType.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/UpdateQueue.java similarity index 58% rename from artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/RemoveRoutingType.java rename to artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/UpdateQueue.java index 637de6b37d..d457771e7a 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/RemoveRoutingType.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/UpdateQueue.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.activemq.artemis.cli.commands.address; +package org.apache.activemq.artemis.cli.commands.queue; import io.airlift.airline.Command; import io.airlift.airline.Option; @@ -24,38 +24,45 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.cli.commands.AbstractAction; import org.apache.activemq.artemis.cli.commands.ActionContext; -@Command(name = "removeRoutingType", description = "remove the provided routing types from an address") -public class RemoveRoutingType extends AbstractAction { +@Command(name = "update", description = "update a core queue") +public class UpdateQueue extends AbstractAction { - @Option(name = "--name", description = "The name of the address", required = true) + @Option(name = "--name", description = "name", required = true) String name; - @Option(name = "--routingType", description = "The routing types to be removed from this address, options are 'anycast' or 'multicast'", required = true) - String routingType; + @Option(name = "--deleteOnNoConsumers", description = "whether to delete when it's last consumers disconnects)") + Boolean deleteOnNoConsumers = null; + + @Option(name = "--maxConsumers", description = "Maximum number of consumers allowed at any one time") + Integer maxConsumers = null; + + @Option(name = "--routingType", description = "The routing type supported by this queue, options are 'anycast' or 'multicast'") + String routingType = null; @Override public Object execute(ActionContext context) throws Exception { super.execute(context); - removeRoutingType(context); + createQueue(context); return null; } - private void removeRoutingType(final ActionContext context) throws Exception { - performCoreManagement(new AbstractAction.ManagementCallback() { + private void createQueue(final ActionContext context) throws Exception { + performCoreManagement(new ManagementCallback() { @Override public void setUpInvocation(ClientMessage message) throws Exception { - ManagementHelper.putOperationInvocation(message, "broker", "removeRoutingType", name, routingType); + ManagementHelper.putOperationInvocation(message, "broker", "updateQueue", name, routingType, maxConsumers, deleteOnNoConsumers); } @Override public void requestSuccessful(ClientMessage reply) throws Exception { - context.out.println("Address " + name + " updated successfully."); + final String result = ManagementHelper.getResult(reply, String.class) + " updated successfully."; + context.out.println(result); } @Override public void requestFailed(ClientMessage reply) throws Exception { String errMsg = (String) ManagementHelper.getResult(reply, String.class); - context.err.println("Failed to update address " + name + ". Reason: " + errMsg); + context.err.println("Failed to update " + name + ". Reason: " + errMsg); } }); } @@ -68,6 +75,22 @@ public class RemoveRoutingType extends AbstractAction { this.name = name; } + public Boolean getDeleteOnNoConsumers() { + return deleteOnNoConsumers; + } + + public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) { + this.deleteOnNoConsumers = deleteOnNoConsumers; + } + + public Integer getMaxConsumers() { + return maxConsumers; + } + + public void setMaxConsumers(int maxConsumers) { + this.maxConsumers = maxConsumers; + } + public String getRoutingType() { return routingType; } @@ -76,3 +99,4 @@ public class RemoveRoutingType extends AbstractAction { this.routingType = routingType; } } + diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 38bf20097d..55dcb5755e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -435,16 +435,12 @@ public interface ActiveMQServerControl { // Operations ---------------------------------------------------- @Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION) - void createAddress(@Parameter(name = "name", desc = "The name of the address") String name, + String createAddress(@Parameter(name = "name", desc = "The name of the address") String name, @Parameter(name = "routingTypes", desc = "Comma separated list of Routing Types (anycast/multicast)") String routingTypes) throws Exception; - @Operation(desc = "add the provided routing type to an address", impact = MBeanOperationInfo.ACTION) - void addRoutingType(@Parameter(name = "name", desc = "The name of the address") String name, - @Parameter(name = "routingType", desc = "The routing types to be added to this address, options are 'anycast' or 'multicast'") String routingType) throws Exception; - - @Operation(desc = "remove the provided routing type to an address", impact = MBeanOperationInfo.ACTION) - void removeRoutingType(@Parameter(name = "name", desc = "The name of the address") String name, - @Parameter(name = "routingType", desc = "The routing types to be added to this address, options are 'anycast' or 'multicast'") String routingType) throws Exception; + @Operation(desc = "update an address", impact = MBeanOperationInfo.ACTION) + String updateAddress(@Parameter(name = "name", desc = "The name of the address") String name, + @Parameter(name = "routingTypes", desc = "Comma separated list of Routing Types (anycast/multicast)") String routingTypes) throws Exception; @Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION) void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception; @@ -571,9 +567,10 @@ public interface ActiveMQServerControl { * @param maxConsumers the maximum number of consumers allowed on this queue at any one time * @param deleteOnNoConsumers delete this queue when the last consumer disconnects * @param autoCreateAddress create an address with default values should a matching address not be found + * @return a textual summary of the queue * @throws Exception */ - void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + String createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType, @Parameter(name = "name", desc = "Name of the queue") String name, @Parameter(name = "filter", desc = "Filter of the queue") String filterStr, @@ -582,6 +579,22 @@ public interface ActiveMQServerControl { @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers, @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception; + /** + * Update a queue. + * + * @param name name of the queue + * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST} + * @param maxConsumers the maximum number of consumers allowed on this queue at any one time + * @param deleteOnNoConsumers delete this queue when the last consumer disconnects + * @return a textual summary of the queue + * @throws Exception + */ + String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType, + @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers, + @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean deleteOnNoConsumers) throws Exception; + + /** * Deploy a durable queue. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 382b3e364d..c43f509218 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -562,8 +563,58 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } } + private enum AddressInfoTextFormatter { + Long { + @Override + public StringBuilder format(AddressInfo addressInfo, StringBuilder output) { + output.append("Address [name=").append(addressInfo.getName()); + output.append(", routingTypes={"); + final Set routingTypes = addressInfo.getRoutingTypes(); + if (!routingTypes.isEmpty()) { + for (RoutingType routingType : routingTypes) { + output.append(routingType).append(','); + } + // delete hanging comma + output.deleteCharAt(output.length() - 1); + } + output.append('}'); + output.append(", autoCreated=").append(addressInfo.isAutoCreated()); + output.append(']'); + return output; + } + }; + + public abstract StringBuilder format(AddressInfo addressInfo, StringBuilder output); + } + + public enum QueueTextFormatter { + Long { + @Override + StringBuilder format(Queue queue, StringBuilder output) { + output.append("Queue [name=").append(queue.getName()); + output.append(", address=").append(queue.getAddress()); + output.append(", routingType=").append(queue.getRoutingType()); + final Filter filter = queue.getFilter(); + if (filter != null) { + output.append(", filter=").append(filter.getFilterString()); + } + output.append(", durable=").append(queue.isDurable()); + final int maxConsumers = queue.getMaxConsumers(); + if (maxConsumers != Queue.MAX_CONSUMERS_UNLIMITED) { + output.append(", maxConsumers=").append(queue.getMaxConsumers()); + } + output.append(", deleteOnNoConsumers=").append(queue.isDeleteOnNoConsumers()); + output.append(", autoCreateAddress=").append(queue.isAutoCreated()); + output.append(']'); + return output; + } + }; + + abstract StringBuilder format(Queue queue, StringBuilder output); + } + @Override - public void createAddress(String name, String routingTypes) throws Exception { + public String createAddress(String name, String routingTypes) throws Exception { checkStarted(); clearIO(); @@ -572,33 +623,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active for (String routingType : toList(routingTypes)) { set.add(RoutingType.valueOf(routingType)); } - server.createAddressInfo(new AddressInfo(new SimpleString(name), set)); + final AddressInfo addressInfo = new AddressInfo(new SimpleString(name), set); + if (server.createAddressInfo(addressInfo)) { + return AddressInfoTextFormatter.Long.format(addressInfo, new StringBuilder()).toString(); + } else { + return ""; + } } finally { blockOnIO(); } } @Override - public void addRoutingType(String name, String routingTypeName) throws Exception { + public String updateAddress(String name, String routingTypes) throws Exception { checkStarted(); clearIO(); try { - final RoutingType routingType = RoutingType.valueOf(routingTypeName); - server.addRoutingType(name, routingType); - } finally { - blockOnIO(); - } - } - - @Override - public void removeRoutingType(String name, String routingTypeName) throws Exception { - checkStarted(); - - clearIO(); - try { - final RoutingType routingType = RoutingType.valueOf(routingTypeName); - server.removeRoutingType(name, routingType); + final Set routingTypeSet; + if (routingTypes == null) { + routingTypeSet = null; + } else { + routingTypeSet = new HashSet<>(); + final String[] routingTypeNames = routingTypes.split(","); + for (String routingTypeName : routingTypeNames) { + routingTypeSet.add(RoutingType.valueOf(routingTypeName)); + } + } + final AddressInfo updatedAddressInfo = server.updateAddressInfo(name, routingTypeSet); + if (updatedAddressInfo == null) { + throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(name)); + } + return AddressInfoTextFormatter.Long.format(updatedAddressInfo, new StringBuilder()).toString(); } finally { blockOnIO(); } @@ -672,14 +728,14 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } @Override - public void createQueue(String address, - String routingType, - String name, - String filterStr, - boolean durable, - int maxConsumers, - boolean deleteOnNoConsumers, - boolean autoCreateAddress) throws Exception { + public String createQueue(String address, + String routingType, + String name, + String filterStr, + boolean durable, + int maxConsumers, + boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception { checkStarted(); clearIO(); @@ -690,12 +746,32 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active filter = new SimpleString(filterStr); } - server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString(); } finally { blockOnIO(); } } + @Override + public String updateQueue(String name, + String routingType, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception { + checkStarted(); + + clearIO(); + + try { + final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, deleteOnNoConsumers); + if (queue == null) { + throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name)); + } + return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString(); + } finally { + blockOnIO(); + } + } @Override public String[] getQueueNames() { @@ -804,7 +880,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active if (addressInfo == null) { throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address)); } else { - return addressInfo.toString(); + return AddressInfoTextFormatter.Long.format(addressInfo, new StringBuilder()).toString(); } } finally { blockOnIO(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java index a5a1109373..6a492076d9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.artemis.core.postoffice; +import java.util.Collection; import java.util.Map; import java.util.Set; -import java.util.function.BiFunction; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.RoutingType; @@ -64,14 +64,14 @@ public interface AddressManager { */ boolean addAddressInfo(AddressInfo addressInfo); - AddressInfo updateAddressInfoIfPresent(SimpleString addressName, - BiFunction remappingFunction); + AddressInfo updateAddressInfo(SimpleString addressName, + Collection routingTypes) throws Exception; /** * @param addressInfo - * @return true if the address was added, false if it was updated + * @return the same provided {@code addressInfo} if the address was added, another if it was updated */ - boolean addOrUpdateAddressInfo(AddressInfo addressInfo); + AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo); AddressInfo removeAddressInfo(SimpleString address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index cb787c7e30..c81b14d2b2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -16,11 +16,11 @@ */ package org.apache.activemq.artemis.core.postoffice; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -53,20 +53,25 @@ public interface PostOffice extends ActiveMQComponent { /** * @param addressInfo - * @return true if the address was added, false if it was updated + * @return the same provided {@code addressInfo} if the address was added, another if it was updated */ - boolean addOrUpdateAddressInfo(AddressInfo addressInfo); + AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo); AddressInfo removeAddressInfo(SimpleString address) throws Exception; AddressInfo getAddressInfo(SimpleString address); - void addRoutingType(SimpleString addressName, RoutingType routingType) throws ActiveMQAddressDoesNotExistException; + AddressInfo updateAddressInfo(SimpleString addressName, Collection routingTypes) throws Exception; - void removeRoutingType(SimpleString addressName, RoutingType routingType) throws Exception; + QueueBinding updateQueue(SimpleString name, + RoutingType routingType, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; List listQueuesForAddress(SimpleString address) throws Exception; + + void addBinding(Binding binding) throws Exception; Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index a1e6a219e2..4103a82778 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.postoffice.impl; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -32,7 +33,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; @@ -440,53 +440,68 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) { + public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { synchronized (addressLock) { - boolean result = addressManager.addOrUpdateAddressInfo(addressInfo); + final AddressInfo updatedAddressInfo = addressManager.addOrUpdateAddressInfo(addressInfo); // only register address if it is newly added - if (result) { + final boolean isNew = updatedAddressInfo == addressInfo; + if (isNew) { try { managementService.registerAddress(addressInfo); } catch (Exception e) { e.printStackTrace(); } } - return result; + return updatedAddressInfo; } } @Override - public void addRoutingType(SimpleString addressName, RoutingType routingType) throws ActiveMQAddressDoesNotExistException { + public QueueBinding updateQueue(SimpleString name, + RoutingType routingType, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception { synchronized (addressLock) { - final AddressInfo updateAddressInfo = addressManager.updateAddressInfoIfPresent(addressName, (name, addressInfo) -> { - addressInfo.getRoutingTypes().add(routingType); - return addressInfo; - }); - if (updateAddressInfo == null) { - throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName); + final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name); + if (queueBinding == null) { + return null; } - } - } - - @Override - public void removeRoutingType(SimpleString addressName, RoutingType routingType) throws Exception { - synchronized (addressLock) { - if (RoutingType.MULTICAST.equals(routingType)) { - final Bindings bindings = addressManager.getBindingsForRoutingAddress(addressName); - if (bindings != null) { - final boolean existsQueueBindings = bindings.getBindings().stream().anyMatch(QueueBinding.class::isInstance); - if (existsQueueBindings) { - throw ActiveMQMessageBundle.BUNDLE.invalidMulticastRoutingTypeDelete(); - } + final Queue queue = queueBinding.getQueue(); + //TODO put the whole update logic on Queue + //validate update + if (maxConsumers != null) { + final int consumerCount = queue.getConsumerCount(); + if (consumerCount > maxConsumers) { + throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount); } } - final AddressInfo updateAddressInfo = addressManager.updateAddressInfoIfPresent(addressName, (name, addressInfo) -> { - addressInfo.getRoutingTypes().remove(routingType); - return addressInfo; - }); - if (updateAddressInfo == null) { - throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName); + if (routingType != null) { + final SimpleString address = queue.getAddress(); + final AddressInfo addressInfo = addressManager.getAddressInfo(address); + final Set addressRoutingTypes = addressInfo.getRoutingTypes(); + if (!addressRoutingTypes.contains(routingType)) { + throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes); + } } + //atomic update + if (maxConsumers != null) { + queue.setMaxConsumer(maxConsumers); + } + if (routingType != null) { + queue.setRoutingType(routingType); + } + if (deleteOnNoConsumers != null) { + queue.setDeleteOnNoConsumers(deleteOnNoConsumers); + } + return queueBinding; + } + } + + @Override + public AddressInfo updateAddressInfo(SimpleString addressName, + Collection routingTypes) throws Exception { + synchronized (addressLock) { + return addressManager.updateAddressInfo(addressName, routingTypes); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 16849e80b3..98d2c94597 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -16,12 +16,13 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; +import java.util.Collection; +import java.util.EnumSet; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.function.BiFunction; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Address; @@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.postoffice.AddressManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.BindingsFactory; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -220,26 +222,48 @@ public class SimpleAddressManager implements AddressManager { } @Override - public AddressInfo updateAddressInfoIfPresent(SimpleString addressName, - BiFunction remappingFunction) { - return addressInfoMap.computeIfPresent(addressName, remappingFunction); + public AddressInfo updateAddressInfo(SimpleString addressName, + Collection routingTypes) throws Exception { + if (routingTypes == null) { + return this.addressInfoMap.get(addressName); + } else { + return this.addressInfoMap.computeIfPresent(addressName, (name, oldAddressInfo) -> { + validateRoutingTypes(name, routingTypes); + final Set updatedRoutingTypes = EnumSet.copyOf(routingTypes); + oldAddressInfo.setRoutingTypes(updatedRoutingTypes); + return oldAddressInfo; + }); + } } - @Override - public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) { - boolean isNew = addAddressInfo(addressInfo); - - // address already exists so update it - if (!isNew) { - AddressInfo toUpdate = getAddressInfo(addressInfo.getName()); - synchronized (toUpdate) { - for (RoutingType routingType : addressInfo.getRoutingTypes()) { - toUpdate.addRoutingType(routingType); + private void validateRoutingTypes(SimpleString addressName, Collection routingTypes) { + final Bindings bindings = this.mappings.get(addressName); + if (bindings != null) { + for (Binding binding : bindings.getBindings()) { + if (binding instanceof QueueBinding) { + final QueueBinding queueBinding = (QueueBinding) binding; + final RoutingType routingType = queueBinding.getQueue().getRoutingType(); + if (!routingTypes.contains(routingType)) { + throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeDelete(routingType, addressName.toString()); + } } } } + } - return isNew; + @Override + public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { + return this.addressInfoMap.compute(addressInfo.getName(), (name, oldAddressInfo) -> { + if (oldAddressInfo != null) { + final Set routingTypes = addressInfo.getRoutingTypes(); + validateRoutingTypes(name, routingTypes); + final Set updatedRoutingTypes = EnumSet.copyOf(routingTypes); + oldAddressInfo.setRoutingTypes(updatedRoutingTypes); + return oldAddressInfo; + } else { + return addressInfo; + } + }); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 1bc1441f10..8cf3845c78 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -414,6 +414,17 @@ public interface ActiveMQMessageBundle { @Message(id = 119208, value = "Invalid routing type {0}", format = Message.Format.MESSAGE_FORMAT) IllegalArgumentException invalidRoutingType(String val); - @Message(id = 119209, value = "Can't remove MULTICAST routing type, queues exists. Please delete queues before removing this routing type.") - IllegalStateException invalidMulticastRoutingTypeDelete(); + @Message(id = 119209, value = "Can''t remove routing type {0}, queues exists for address: {1}. Please delete queues before removing this routing type.", format = Message.Format.MESSAGE_FORMAT) + IllegalStateException invalidRoutingTypeDelete(RoutingType routingType, String address); + + @Message(id = 119210, value = "Can''t update queue {0} with maxConsumers: {1}. Current consumers are {2}.", format = Message.Format.MESSAGE_FORMAT) + IllegalStateException invalidMaxConsumersUpdate(String queueName, int maxConsumers, int consumers); + + @Message(id = 119211, value = "Can''t update queue {0} with routing type: {1}, Supported routing types for address: {2} are {3}", format = Message.Format.MESSAGE_FORMAT) + IllegalStateException invalidRoutingTypeUpdate(String queueName, + RoutingType routingType, + String address, + Set supportedRoutingTypes); + + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index f90a697010..4450267b3f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -17,13 +17,13 @@ package org.apache.activemq.artemis.core.server; import javax.management.MBeanServer; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; @@ -422,6 +422,11 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean deleteOnNoConsumers, boolean autoCreateAddress) throws Exception; + Queue updateQueue(String name, + RoutingType routingType, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + /* * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will * replace any factories with the same protocol @@ -455,28 +460,11 @@ public interface ActiveMQServer extends ActiveMQComponent { void removeClientConnection(String clientId); - /** - * Add the {@code routingType} from the specified {@code address}. - * - * @param address the address name - * @param routingType the routing type to be added - * @throws ActiveMQAddressDoesNotExistException - */ - void addRoutingType(String address, RoutingType routingType) throws ActiveMQAddressDoesNotExistException; - - /** - * Remove the {@code routingType} from the specified {@code address}. - * - * @param address the address name - * @param routingType the routing type to be removed - * @throws ActiveMQAddressDoesNotExistException - * @throws IllegalStateException when a binding already exists and is requested to remove {@link org.apache.activemq.artemis.core.server.RoutingType#MULTICAST}. - */ - void removeRoutingType(String address, RoutingType routingType) throws Exception; + AddressInfo updateAddressInfo(String name, Collection routingTypes) throws Exception; boolean createAddressInfo(AddressInfo addressInfo) throws Exception; - boolean createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception; + AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception; void removeAddressInfo(SimpleString address, SecurityAuth session) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index cf044f1222..bfb2c8424f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -54,8 +54,12 @@ public interface Queue extends Bindable { boolean isDeleteOnNoConsumers(); + void setDeleteOnNoConsumers(boolean value); + int getMaxConsumers(); + void setMaxConsumer(int maxConsumers); + void addConsumer(Consumer consumer) throws Exception; void removeConsumer(Consumer consumer); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 3d8b9a9917..f186614f76 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -27,6 +27,7 @@ import java.net.URL; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -48,7 +49,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; -import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; @@ -2403,15 +2403,21 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public void addRoutingType(String address, RoutingType routingType) throws ActiveMQAddressDoesNotExistException { + public AddressInfo updateAddressInfo(String address, Collection routingTypes) throws Exception { final SimpleString addressName = new SimpleString(address); - postOffice.addRoutingType(addressName,routingType); - } - - @Override - public void removeRoutingType(String address, RoutingType routingType) throws Exception { - final SimpleString addressName = new SimpleString(address); - postOffice.removeRoutingType(addressName,routingType); + //after the postOffice call, updatedAddressInfo could change further (concurrently)! + final AddressInfo updatedAddressInfo = postOffice.updateAddressInfo(addressName, routingTypes); + if (updatedAddressInfo != null) { + //it change the address info without any lock! + final long txID = storageManager.generateID(); + try { + storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId()); + storageManager.addAddressBinding(txID, updatedAddressInfo); + } finally { + storageManager.commitBindings(txID); + } + } + return updatedAddressInfo; } @Override @@ -2430,21 +2436,21 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public boolean createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception { - boolean result = postOffice.addOrUpdateAddressInfo(addressInfo); + public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception { + final AddressInfo updatedAddressInfo = postOffice.addOrUpdateAddressInfo(addressInfo); + final boolean isNew = updatedAddressInfo == addressInfo; - long txID = storageManager.generateID(); - if (result) { + final long txID = storageManager.generateID(); + if (isNew) { storageManager.addAddressBinding(txID, addressInfo); storageManager.commitBindings(txID); } else { - AddressInfo updatedAddressInfo = getAddressInfo(addressInfo.getName()); storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId()); storageManager.addAddressBinding(txID, updatedAddressInfo); storageManager.commitBindings(txID); } - return result; + return updatedAddressInfo; } @Override @@ -2510,12 +2516,12 @@ public class ActiveMQServerImpl implements ActiveMQServer { queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressName); } - AddressInfo defaultAddressInfo = new AddressInfo(addressName); - defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType); AddressInfo info = postOffice.getAddressInfo(addressName); if (autoCreateAddress) { if (info == null || !info.getRoutingTypes().contains(routingType)) { + final AddressInfo defaultAddressInfo = new AddressInfo(addressName); + defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType); createOrUpdateAddressInfo(defaultAddressInfo.setAutoCreated(true)); } } else if (info == null) { @@ -2571,6 +2577,31 @@ public class ActiveMQServerImpl implements ActiveMQServer { return queue; } + @Override + public Queue updateQueue(String name, + RoutingType routingType, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception { + final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, deleteOnNoConsumers); + if (queueBinding != null) { + final Queue queue = queueBinding.getQueue(); + if (queue.isDurable()) { + final long txID = storageManager.generateID(); + try { + storageManager.deleteQueueBinding(txID, queueBinding.getID()); + storageManager.addQueueBinding(txID, queueBinding); + storageManager.commitBindings(txID); + } catch (Throwable throwable) { + storageManager.rollbackBindings(txID); + throw throwable; + } + } + return queue; + } else { + return null; + } + } + private void deployDiverts() throws Exception { for (DivertConfiguration config : configuration.getDivertConfigurations()) { deployDivert(config); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 4a06ce19af..3caa22c028 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -65,11 +65,11 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Consumer; -import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; @@ -240,15 +240,15 @@ public class QueueImpl implements Queue { private SlowConsumerReaperRunnable slowConsumerReaperRunnable; - private int maxConsumers; + private volatile int maxConsumers; - private boolean deleteOnNoConsumers; + private volatile boolean deleteOnNoConsumers; private final AddressInfo addressInfo; private final AtomicInteger noConsumers = new AtomicInteger(0); - private RoutingType routingType; + private volatile RoutingType routingType; /** * This is to avoid multi-thread races on calculating direct delivery, @@ -482,11 +482,21 @@ public class QueueImpl implements Queue { return deleteOnNoConsumers; } + @Override + public synchronized void setDeleteOnNoConsumers(boolean value) { + this.deleteOnNoConsumers = value; + } + @Override public int getMaxConsumers() { return maxConsumers; } + @Override + public synchronized void setMaxConsumer(int maxConsumers) { + this.maxConsumers = maxConsumers; + } + @Override public SimpleString getName() { return name; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index d4ec32ae3f..c68aeed685 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -41,10 +41,10 @@ import org.apache.activemq.artemis.core.message.BodyEncoder; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.server.Consumer; -import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; @@ -837,6 +837,16 @@ public class ScheduledDeliveryHandlerTest extends Assert { public class FakeQueueForScheduleUnitTest implements Queue { + @Override + public void setDeleteOnNoConsumers(boolean value) { + + } + + @Override + public void setMaxConsumer(int maxConsumers) { + + } + @Override public void unproposed(SimpleString groupID) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/AddressCommandTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/AddressCommandTest.java index 226cfbc28c..c44dba2c93 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/AddressCommandTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/AddressCommandTest.java @@ -18,16 +18,16 @@ package org.apache.activemq.artemis.tests.integration.cli; import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.text.MessageFormat; import java.util.EnumSet; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.cli.commands.AbstractAction; import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.cli.commands.address.AddRoutingType; import org.apache.activemq.artemis.cli.commands.address.CreateAddress; import org.apache.activemq.artemis.cli.commands.address.DeleteAddress; -import org.apache.activemq.artemis.cli.commands.address.RemoveRoutingType; import org.apache.activemq.artemis.cli.commands.address.ShowAddress; +import org.apache.activemq.artemis.cli.commands.address.UpdateAddress; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -160,16 +160,16 @@ public class AddressCommandTest extends JMSTestBase { } @Test - public void testAddRoutingType() throws Exception { + public void testUpdateAddressRoutingTypes() throws Exception { final String addressName = "address"; final SimpleString address = new SimpleString(addressName); server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); - final AddRoutingType addRoutingType = new AddRoutingType(); - addRoutingType.setName(addressName); - addRoutingType.setRoutingType(RoutingType.MULTICAST.toString()); - addRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionPassed(addRoutingType); + final UpdateAddress updateAddress = new UpdateAddress(); + updateAddress.setName(addressName); + updateAddress.setRoutingTypes(RoutingType.MULTICAST.toString() + ',' + RoutingType.ANYCAST.toString()); + updateAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionPassed(updateAddress); final AddressInfo addressInfo = server.getAddressInfo(address); assertNotNull(addressInfo); @@ -177,59 +177,29 @@ public class AddressCommandTest extends JMSTestBase { } @Test - public void testFailAddRoutingTypeAddressDoesNotExist() throws Exception { + public void testFailUpdateAddressDoesNotExist() throws Exception { final String addressName = "address"; - final AddRoutingType addRoutingType = new AddRoutingType(); - addRoutingType.setName(addressName); - addRoutingType.setRoutingType(RoutingType.MULTICAST.toString()); - addRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionFailure(addRoutingType, "Address Does Not Exist"); - final AddressInfo addressInfo = server.getAddressInfo(new SimpleString(addressName)); - assertNull(addressInfo); + final UpdateAddress updateAddress = new UpdateAddress(); + updateAddress.setName(addressName); + updateAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionFailure(updateAddress, "Address Does Not Exist"); } @Test - public void testRemoveRoutingType() throws Exception { - final String addressName = "address"; - final SimpleString address = new SimpleString(addressName); - server.createAddressInfo(new AddressInfo(address, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST))); - - final RemoveRoutingType removeRoutingType = new RemoveRoutingType(); - removeRoutingType.setName(addressName); - removeRoutingType.setRoutingType(RoutingType.MULTICAST.toString()); - removeRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionPassed(removeRoutingType); - - final AddressInfo addressInfo = server.getAddressInfo(new SimpleString(addressName)); - assertNotNull(addressInfo); - assertEquals(EnumSet.of(RoutingType.ANYCAST), addressInfo.getRoutingTypes()); - } - - @Test - public void testFailRemoveRoutingTypeAddressDoesNotExist() throws Exception { - final String addressName = "address"; - final RemoveRoutingType removeRoutingType = new RemoveRoutingType(); - removeRoutingType.setName(addressName); - removeRoutingType.setRoutingType(RoutingType.MULTICAST.toString()); - removeRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionFailure(removeRoutingType, "Address Does Not Exist"); - final AddressInfo addressInfo = server.getAddressInfo(new SimpleString(addressName)); - assertNull(addressInfo); - } - - @Test - public void testFailRemoveMulticastRoutingTypeWhenExistsQueues() throws Exception { + public void testFailUpdateAddressRoutingTypesWhenExistsQueues() throws Exception { final String addressName = "address"; final SimpleString addressSimpleString = new SimpleString(addressName); final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST)); server.createAddressInfo(addressInfo); server.createQueue(addressSimpleString, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false); - final RemoveRoutingType removeRoutingType = new RemoveRoutingType(); - removeRoutingType.setName(addressName); - removeRoutingType.setRoutingType(RoutingType.MULTICAST.toString()); - removeRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionFailure(removeRoutingType, "Can't remove MULTICAST routing type, queues exists. Please delete queues before removing this routing type."); + final UpdateAddress updateAddress = new UpdateAddress(); + updateAddress.setName(addressName); + updateAddress.setRoutingTypes(RoutingType.ANYCAST.toString()); + updateAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + + final String expectedErrorMessage = MessageFormat.format("Can''t remove routing type {0}, queues exists for address: {1}. Please delete queues before removing this routing type.", RoutingType.MULTICAST, addressName); + checkExecutionFailure(updateAddress, expectedErrorMessage); } private void checkExecutionPassed(AbstractAction command) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/QueueCommandTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/QueueCommandTest.java index b7f1bd62e9..58f0019938 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/QueueCommandTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/QueueCommandTest.java @@ -18,6 +18,9 @@ package org.apache.activemq.artemis.tests.integration.cli; import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.text.MessageFormat; +import java.util.EnumSet; +import java.util.Set; import java.util.UUID; import org.apache.activemq.artemis.api.core.SimpleString; @@ -25,7 +28,9 @@ import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.queue.CreateQueue; import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue; import org.apache.activemq.artemis.cli.commands.AbstractAction; +import org.apache.activemq.artemis.cli.commands.queue.UpdateQueue; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.JMSTestBase; @@ -224,6 +229,107 @@ public class QueueCommandTest extends JMSTestBase { assertNull(server.getAddressInfo(queueName)); } + @Test + public void testUpdateCoreQueue() throws Exception { + final String queueName = "updateQueue"; + final SimpleString queueNameString = new SimpleString(queueName); + final String addressName = "address"; + final SimpleString addressSimpleString = new SimpleString(addressName); + final int oldMaxConsumers = -1; + final RoutingType oldRoutingType = RoutingType.MULTICAST; + final boolean oldDeleteOnNoConsumers = false; + final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST)); + server.createAddressInfo(addressInfo); + server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false); + + final int newMaxConsumers = 1; + final RoutingType newRoutingType = RoutingType.ANYCAST; + final boolean newDeleteOnNoConsumers = true; + final UpdateQueue updateQueue = new UpdateQueue(); + updateQueue.setName(queueName); + updateQueue.setDeleteOnNoConsumers(newDeleteOnNoConsumers); + updateQueue.setRoutingType(newRoutingType.name()); + updateQueue.setMaxConsumers(newMaxConsumers); + updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + + checkExecutionPassed(updateQueue); + + final QueueQueryResult queueQueryResult = server.queueQuery(queueNameString); + assertEquals("maxConsumers", newMaxConsumers, queueQueryResult.getMaxConsumers()); + assertEquals("routingType", newRoutingType, queueQueryResult.getRoutingType()); + assertTrue("deleteOnNoConsumers", newDeleteOnNoConsumers == queueQueryResult.isDeleteOnNoConsumers()); + } + + @Test + public void testUpdateCoreQueueCannotChangeRoutingType() throws Exception { + final String queueName = "updateQueue"; + final SimpleString queueNameString = new SimpleString(queueName); + final String addressName = "address"; + final SimpleString addressSimpleString = new SimpleString(addressName); + final int oldMaxConsumers = 10; + final RoutingType oldRoutingType = RoutingType.MULTICAST; + final boolean oldDeleteOnNoConsumers = false; + final Set supportedRoutingTypes = EnumSet.of(oldRoutingType); + final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.copyOf(supportedRoutingTypes)); + server.createAddressInfo(addressInfo); + server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false); + + final RoutingType newRoutingType = RoutingType.ANYCAST; + final UpdateQueue updateQueue = new UpdateQueue(); + updateQueue.setName(queueName); + updateQueue.setRoutingType(newRoutingType.name()); + updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + + final String expectedErrorMessage = MessageFormat.format("Can''t update queue {0} with routing type: {1}, Supported routing types for address: {2} are {3}", queueName, newRoutingType, addressName, supportedRoutingTypes); + checkExecutionFailure(updateQueue, expectedErrorMessage); + + final QueueQueryResult queueQueryResult = server.queueQuery(queueNameString); + assertEquals("maxConsumers", oldMaxConsumers, queueQueryResult.getMaxConsumers()); + assertEquals("routingType", oldRoutingType, queueQueryResult.getRoutingType()); + assertTrue("deleteOnNoConsumers", oldDeleteOnNoConsumers == queueQueryResult.isDeleteOnNoConsumers()); + } + + @Test + public void testUpdateCoreQueueCannotLowerMaxConsumers() throws Exception { + final String queueName = "updateQueue"; + final SimpleString queueNameString = new SimpleString(queueName); + final String addressName = "address"; + final SimpleString addressSimpleString = new SimpleString(addressName); + final int oldMaxConsumers = 2; + final RoutingType oldRoutingType = RoutingType.MULTICAST; + final boolean oldDeleteOnNoConsumers = false; + final AddressInfo addressInfo = new AddressInfo(addressSimpleString, oldRoutingType); + server.createAddressInfo(addressInfo); + server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false); + + server.locateQueue(queueNameString).addConsumer(new DummyServerConsumer()); + server.locateQueue(queueNameString).addConsumer(new DummyServerConsumer()); + + final int newMaxConsumers = 1; + final UpdateQueue updateQueue = new UpdateQueue(); + updateQueue.setName(queueName); + updateQueue.setMaxConsumers(newMaxConsumers); + updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + + final String expectedErrorMessage = MessageFormat.format("Can''t update queue {0} with maxConsumers: {1}. Current consumers are {2}.", queueName, newMaxConsumers, 2); + checkExecutionFailure(updateQueue, expectedErrorMessage); + + final QueueQueryResult queueQueryResult = server.queueQuery(queueNameString); + assertEquals("maxConsumers", oldMaxConsumers, queueQueryResult.getMaxConsumers()); + } + + @Test + public void testUpdateCoreQueueDoesNotExist() throws Exception { + SimpleString queueName = new SimpleString("updateQueue"); + + UpdateQueue updateQueue = new UpdateQueue(); + updateQueue.setName(queueName.toString()); + updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionFailure(updateQueue, "AMQ119017: Queue " + queueName + " does not exist"); + + assertFalse(server.queueQuery(queueName).isExists()); + } + private void checkExecutionPassed(AbstractAction command) throws Exception { String fullMessage = output.toString(); System.out.println("output: " + fullMessage); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 58bb3fdcf8..f74b090583 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -60,13 +60,8 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes return new ActiveMQServerControl() { @Override - public void addRoutingType(String name, String routingType) throws Exception { - proxy.invokeOperation("addRoutingType", name, routingType); - } - - @Override - public void removeRoutingType(String name, String routingType) throws Exception { - proxy.invokeOperation("removeRoutingType", name, routingType); + public String updateAddress(String name, String routingTypes) throws Exception { + return (String) proxy.invokeOperation("updateAddress", name, routingTypes); } @Override @@ -112,17 +107,24 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes } @Override - public void createQueue(String address, - String routingType, - String name, - String filterStr, - boolean durable, - int maxConsumers, - boolean deleteOnNoConsumers, - boolean autoCreateAddress) throws Exception { - proxy.invokeOperation("createQueue", address, routingType, name, filterStr, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + public String createQueue(String address, + String routingType, + String name, + String filterStr, + boolean durable, + int maxConsumers, + boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception { + return (String) proxy.invokeOperation("createQueue", address, routingType, name, filterStr, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } + @Override + public String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType, + @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers, + @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean deleteOnNoConsumers) throws Exception { + return (String) proxy.invokeOperation("updateQueue", name, routingType, maxConsumers, deleteOnNoConsumers); + } @Override public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception { @@ -591,8 +593,8 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes } @Override - public void createAddress(String name, String routingTypes) throws Exception { - proxy.invokeOperation("createAddress", name, routingTypes); + public String createAddress(String name, String routingTypes) throws Exception { + return (String) proxy.invokeOperation("createAddress", name, routingTypes); } @Override diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 0a72178075..bc7d61a8ae 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -26,10 +26,10 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.server.Consumer; -import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -38,6 +38,16 @@ import org.apache.activemq.artemis.utils.ReferenceCounter; public class FakeQueue implements Queue { + @Override + public void setDeleteOnNoConsumers(boolean value) { + + } + + @Override + public void setMaxConsumer(int maxConsumers) { + + } + @Override public boolean isInternalQueue() { // no-op diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index dc5fedfce3..39271c4fc9 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -16,11 +16,11 @@ */ package org.apache.activemq.artemis.tests.unit.core.server.impl.fakes; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; import org.apache.activemq.artemis.core.server.MessageReference; @@ -42,14 +43,17 @@ import org.apache.activemq.artemis.core.transaction.Transaction; public class FakePostOffice implements PostOffice { @Override - public void addRoutingType(SimpleString addressName, - RoutingType routingType) throws ActiveMQAddressDoesNotExistException { - + public QueueBinding updateQueue(SimpleString name, + RoutingType routingType, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception { + return null; } @Override - public void removeRoutingType(SimpleString addressName, RoutingType routingType) throws Exception { - + public AddressInfo updateAddressInfo(SimpleString addressName, + Collection routingTypes) throws Exception { + return null; } @Override @@ -90,8 +94,8 @@ public class FakePostOffice implements PostOffice { } @Override - public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) { - return false; + public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { + return null; }