diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java index 17c4457010..94779fc6f2 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java @@ -30,9 +30,11 @@ import org.apache.activemq.artemis.cli.commands.Kill; import org.apache.activemq.artemis.cli.commands.Mask; import org.apache.activemq.artemis.cli.commands.Run; import org.apache.activemq.artemis.cli.commands.Stop; -import org.apache.activemq.artemis.cli.commands.destination.CreateDestination; -import org.apache.activemq.artemis.cli.commands.destination.DeleteDestination; -import org.apache.activemq.artemis.cli.commands.destination.HelpDestination; +import org.apache.activemq.artemis.cli.commands.address.CreateAddress; +import org.apache.activemq.artemis.cli.commands.address.DeleteAddress; +import org.apache.activemq.artemis.cli.commands.queue.CreateQueue; +import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue; +import org.apache.activemq.artemis.cli.commands.queue.HelpQueue; import org.apache.activemq.artemis.cli.commands.messages.Browse; import org.apache.activemq.artemis.cli.commands.messages.Consumer; import org.apache.activemq.artemis.cli.commands.messages.Producer; @@ -128,8 +130,11 @@ public class Artemis { String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance"); Cli.CliBuilder builder = Cli.builder("artemis").withDescription("ActiveMQ Artemis Command Line").withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Consumer.class).withCommand(Browse.class).withCommand(Mask.class).withDefaultCommand(HelpAction.class); - builder.withGroup("destination").withDescription("Destination tools group (create|delete) (example ./artemis destination create)"). - withDefaultCommand(HelpDestination.class).withCommands(CreateDestination.class, DeleteDestination.class); + builder.withGroup("queue").withDescription("Queue tools group (create|delete) (example ./artemis queue create)"). + withDefaultCommand(HelpQueue.class).withCommands(CreateQueue.class, DeleteQueue.class); + + builder.withGroup("address").withDescription("Queue tools group (create|delete) (example ./artemis queue create)"). + withDefaultCommand(HelpQueue.class).withCommands(CreateAddress.class, DeleteAddress.class); if (instance != null) { builder.withGroup("data").withDescription("data tools group (print|exp|imp|exp|encode|decode|compact) (example ./artemis data print)"). diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/DestinationAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java similarity index 53% rename from artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/DestinationAction.java rename to artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java index 55353d9e30..b4dbba8295 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/DestinationAction.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java @@ -14,12 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.cli.commands.destination; - -import javax.jms.Message; -import javax.jms.Queue; -import javax.jms.QueueRequestor; -import javax.jms.Session; +package org.apache.activemq.artemis.cli.commands; import io.airlift.airline.Option; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -29,51 +24,10 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.management.ManagementHelper; -import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; -import org.apache.activemq.artemis.api.jms.management.JMSManagementHelper; import org.apache.activemq.artemis.cli.commands.messages.ConnectionAbstract; -import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; -import org.apache.activemq.artemis.jms.client.ActiveMQSession; -public abstract class DestinationAction extends ConnectionAbstract { - - public static final String JMS_QUEUE = "jms-queue"; - public static final String JMS_TOPIC = "topic"; - public static final String CORE_QUEUE = "core-queue"; - - @Option(name = "--type", description = "type of destination to be created (one of jms-queue, topic and core-queue, default jms-queue") - String destType = JMS_QUEUE; - - @Option(name = "--name", description = "destination name") - String name; - - public void performJmsManagement(ManagementCallback cb) throws Exception { - - try (ActiveMQConnectionFactory factory = createConnectionFactory(); - ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); - ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { - - Queue managementQueue = ActiveMQJMSClient.createQueue("activemq.management"); - QueueRequestor requestor = new QueueRequestor(session, managementQueue); - - connection.start(); - - Message message = session.createMessage(); - - cb.setUpInvocation(message); - - Message reply = requestor.request(message); - - boolean result = JMSManagementHelper.hasOperationSucceeded(reply); - - if (result) { - cb.requestSuccessful(reply); - } else { - cb.requestFailed(reply); - } - } - } +public abstract class AbstractAction extends ConnectionAbstract { public void performCoreManagement(ManagementCallback cb) throws Exception { @@ -97,26 +51,6 @@ public abstract class DestinationAction extends ConnectionAbstract { } } - public void setName(String name) { - this.name = name; - } - - public String getName() { - if (name == null) { - name = input("--name", "Please provide the destination name:", ""); - } - - return name; - } - - public String getDestType() { - return destType; - } - - public void setDestType(String destType) { - this.destType = destType; - } - public interface ManagementCallback { void setUpInvocation(T message) throws Exception; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java new file mode 100644 index 0000000000..6c92dc64d0 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.cli.commands.address; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.cli.commands.AbstractAction; +import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; + +@Command(name = "create", description = "create an address") +public class CreateAddress extends AbstractAction { + + @Option(name = "--name", description = "The name of this address") + String name; + + @Option(name = "--routingType", description = "The routing type of the address, options are 'anycast' or 'multicast', defaults to 1 = 'multicast'") + String routingType = "multicast"; + + @Option(name = "--defaultMaxConsumers", description = "Sets the default max consumers for any queues created under this address, default = -1 (no limit)") + int defaultMaxConsumers = -1; + + @Option(name = "--defaultDeleteOnNoConsumers", description = "Sets the default delete on no consumers for any queues created under this address, default = false") + boolean defaultDeleteOnNoConsumers = false; + + @Override + public Object execute(ActionContext context) throws Exception { + super.execute(context); + createAddress(context); + return null; + } + + private void createAddress(final ActionContext context) throws Exception { + performCoreManagement(new ManagementCallback() { + @Override + public void setUpInvocation(ClientMessage message) throws Exception { + ManagementHelper.putOperationInvocation(message, "broker", "createAddress", getName(), routingType, defaultDeleteOnNoConsumers, defaultMaxConsumers); + } + + @Override + public void requestSuccessful(ClientMessage reply) throws Exception { + context.out.println("Address " + getName() + " created successfully."); + } + + @Override + public void requestFailed(ClientMessage reply) throws Exception { + String errMsg = (String) ManagementHelper.getResult(reply, String.class); + context.err.println("Failed to create address " + getName() + ". Reason: " + errMsg); + } + }); + } + + public void setName(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public String getRoutingType() { + return routingType; + } + + public void setRoutingType(String routingType) { + this.routingType = routingType; + } + + public int getDefaultMaxConsumers() { + return defaultMaxConsumers; + } + + public void setDefaultMaxConsumers(int defaultMaxConsumers) { + this.defaultMaxConsumers = defaultMaxConsumers; + } + + public boolean getDefaultDeleteOnNoConsumers() { + return defaultDeleteOnNoConsumers; + } + + public void setDefaultDeleteOnNoConsumers(boolean defaultDeleteOnNoConsumers) { + this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers; + } +} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/DeleteAddress.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/DeleteAddress.java new file mode 100644 index 0000000000..36c922451b --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/DeleteAddress.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.cli.commands.address; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.cli.commands.AbstractAction; +import org.apache.activemq.artemis.cli.commands.ActionContext; + +@Command(name = "delete", description = "delete a queue") +public class DeleteAddress extends AbstractAction { + + @Option(name = "--name", description = "The name of this address") + String name; + + @Override + public Object execute(ActionContext context) throws Exception { + super.execute(context); + deleteAddress(context); + return null; + } + + private void deleteAddress(final ActionContext context) throws Exception { + performCoreManagement(new ManagementCallback() { + @Override + public void setUpInvocation(ClientMessage message) throws Exception { + ManagementHelper.putOperationInvocation(message, "broker", "deleteAddress", getName()); + } + + @Override + public void requestSuccessful(ClientMessage reply) throws Exception { + context.out.println("Address " + getName() + " deleted successfully."); + } + + @Override + public void requestFailed(ClientMessage reply) throws Exception { + String errMsg = (String) ManagementHelper.getResult(reply, String.class); + context.err.println("Failed to delete address " + getName() + ". Reason: " + errMsg); + } + }); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/HelpDestination.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/HelpAddress.java similarity index 90% rename from artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/HelpDestination.java rename to artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/HelpAddress.java index 3455520f4a..c086c01abf 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/HelpDestination.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/HelpAddress.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.activemq.artemis.cli.commands.destination; +package org.apache.activemq.artemis.cli.commands.address; import java.io.File; import java.util.ArrayList; @@ -25,7 +25,7 @@ import io.airlift.airline.Help; import org.apache.activemq.artemis.cli.commands.Action; import org.apache.activemq.artemis.cli.commands.ActionContext; -public class HelpDestination extends Help implements Action { +public class HelpAddress extends Help implements Action { @Override public boolean isVerbose() { @@ -49,7 +49,7 @@ public class HelpDestination extends Help implements Action { @Override public Object execute(ActionContext context) throws Exception { List commands = new ArrayList<>(1); - commands.add("destination"); + commands.add("queue"); help(global, commands); return null; } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/ShowAddress.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/ShowAddress.java new file mode 100644 index 0000000000..34331bbeae --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/ShowAddress.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.cli.commands.address; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.cli.commands.AbstractAction; +import org.apache.activemq.artemis.cli.commands.ActionContext; + +@Command(name = "show", description = "delete a queue") +public class ShowAddress extends AbstractAction { + + @Option(name = "--name", description = "The name of this address") + String name; + + @Option(name = "--bindings", description = "Shows the bindings for this address") + boolean bindings; + + @Override + public Object execute(ActionContext context) throws Exception { + super.execute(context); + showAddress(context); + return null; + } + + private void showAddress(final ActionContext context) throws Exception { + performCoreManagement(new ManagementCallback() { + @Override + public void setUpInvocation(ClientMessage message) throws Exception { + if (bindings) { + ManagementHelper.putOperationInvocation(message, "broker", "listBindingsForAddress", getName()); + } + else { + ManagementHelper.putOperationInvocation(message, "broker", "getAddressInfo", getName()); + } + } + + @Override + public void requestSuccessful(ClientMessage reply) throws Exception { + String result = (String) ManagementHelper.getResult(reply, String.class); + context.out.println(result); + } + + @Override + public void requestFailed(ClientMessage reply) throws Exception { + String errMsg = (String) ManagementHelper.getResult(reply, String.class); + context.err.println("Failed to show address " + getName() + ". Reason: " + errMsg); + } + }); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public boolean isBindings() { + return bindings; + } + + public void setBindings(boolean bindings) { + this.bindings = bindings; + } +} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/CreateDestination.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/CreateDestination.java deleted file mode 100644 index 4cbaaa66be..0000000000 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/CreateDestination.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.cli.commands.destination; - -import javax.jms.Message; - -import io.airlift.airline.Command; -import io.airlift.airline.Option; -import org.apache.activemq.artemis.api.core.client.ClientMessage; -import org.apache.activemq.artemis.api.core.management.ManagementHelper; -import org.apache.activemq.artemis.api.jms.management.JMSManagementHelper; -import org.apache.activemq.artemis.cli.commands.ActionContext; - -@Command(name = "create", description = "create a queue or topic") -public class CreateDestination extends DestinationAction { - - @Option(name = "--filter", description = "queue's filter string (default null)") - String filter = null; - - @Option(name = "--address", description = "address of the core queue (default queue's name)") - String address; - - @Option(name = "--durable", description = "whether the queue is durable or not (default false)") - boolean durable = false; - - @Option(name = "--bindings", description = "comma separated jndi binding names (default null)") - String bindings = null; - - @Override - public Object execute(ActionContext context) throws Exception { - super.execute(context); - - if (JMS_QUEUE.equals(destType)) { - createJmsQueue(context); - } else if (CORE_QUEUE.equals(destType)) { - createCoreQueue(context); - } else if (JMS_TOPIC.equals(destType)) { - createJmsTopic(context); - } else { - throw new IllegalArgumentException("--type can only be one of " + JMS_QUEUE + ", " + JMS_TOPIC + " and " + CORE_QUEUE); - } - return null; - } - - private void createJmsTopic(final ActionContext context) throws Exception { - performJmsManagement(new ManagementCallback() { - @Override - public void setUpInvocation(Message message) throws Exception { - JMSManagementHelper.putOperationInvocation(message, "jms.server", "createTopic", getName(), bindings); - } - - @Override - public void requestSuccessful(Message reply) throws Exception { - boolean result = (boolean) JMSManagementHelper.getResult(reply, Boolean.class); - if (result) { - context.out.println("Topic " + getName() + " created successfully."); - } else { - context.err.println("Failed to create topic " + getName() + "."); - } - } - - @Override - public void requestFailed(Message reply) throws Exception { - String errorMsg = (String) JMSManagementHelper.getResult(reply, String.class); - context.err.println("Failed to create topic " + getName() + ". Reason: " + errorMsg); - } - }); - } - - public String getAddress() { - if (address == null || "".equals(address.trim())) { - address = getName(); - } - return address.trim(); - } - - private void createCoreQueue(final ActionContext context) throws Exception { - performCoreManagement(new ManagementCallback() { - @Override - public void setUpInvocation(ClientMessage message) throws Exception { - String address = getAddress(); - ManagementHelper.putOperationInvocation(message, "core.server", "createQueue", address, getName(), filter, durable); - } - - @Override - public void requestSuccessful(ClientMessage reply) throws Exception { - context.out.println("Core queue " + getName() + " created successfully."); - } - - @Override - public void requestFailed(ClientMessage reply) throws Exception { - String errMsg = (String) ManagementHelper.getResult(reply, String.class); - context.err.println("Failed to create queue " + getName() + ". Reason: " + errMsg); - } - }); - } - - private void createJmsQueue(final ActionContext context) throws Exception { - - performJmsManagement(new ManagementCallback() { - - @Override - public void setUpInvocation(Message message) throws Exception { - JMSManagementHelper.putOperationInvocation(message, "jms.server", "createQueue", getName(), bindings, filter, durable); - } - - @Override - public void requestSuccessful(Message reply) throws Exception { - boolean result = (boolean) JMSManagementHelper.getResult(reply, Boolean.class); - if (result) { - context.out.println("Jms queue " + getName() + " created successfully."); - } else { - context.err.println("Failed to create jms queue " + getName() + "."); - } - } - - @Override - public void requestFailed(Message reply) throws Exception { - String errorMsg = (String) JMSManagementHelper.getResult(reply, String.class); - context.err.println("Failed to create jms queue " + getName() + ". Reason: " + errorMsg); - } - }); - } - - public void setFilter(String filter) { - this.filter = filter; - } - - public void setBindings(String bindings) { - this.bindings = bindings; - } -} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/DeleteDestination.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/DeleteDestination.java deleted file mode 100644 index 93dbf5e8b5..0000000000 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/destination/DeleteDestination.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.cli.commands.destination; - -import javax.jms.Message; - -import io.airlift.airline.Command; -import io.airlift.airline.Option; -import org.apache.activemq.artemis.api.core.client.ClientMessage; -import org.apache.activemq.artemis.api.core.management.ManagementHelper; -import org.apache.activemq.artemis.api.jms.management.JMSManagementHelper; -import org.apache.activemq.artemis.cli.commands.ActionContext; - -@Command(name = "delete", description = "delete a queue or topic") -public class DeleteDestination extends DestinationAction { - - @Option(name = "--removeConsumers", description = "whether deleting destination with consumers or not (default false)") - boolean removeConsumers = false; - - @Override - public Object execute(ActionContext context) throws Exception { - super.execute(context); - - if (JMS_QUEUE.equals(destType)) { - deleteJmsQueue(context); - } else if (CORE_QUEUE.equals(destType)) { - deleteCoreQueue(context); - } else if (JMS_TOPIC.equals(destType)) { - deleteJmsTopic(context); - } else { - throw new IllegalArgumentException("--type can only be one of " + JMS_QUEUE + ", " + JMS_TOPIC + " and " + CORE_QUEUE); - } - return null; - } - - private void deleteJmsTopic(final ActionContext context) throws Exception { - performJmsManagement(new ManagementCallback() { - @Override - public void setUpInvocation(Message message) throws Exception { - JMSManagementHelper.putOperationInvocation(message, "jms.server", "destroyTopic", getName(), removeConsumers); - } - - @Override - public void requestSuccessful(Message reply) throws Exception { - boolean result = (boolean) JMSManagementHelper.getResult(reply, Boolean.class); - if (result) { - context.out.println("Topic " + getName() + " deleted successfully."); - } else { - context.err.println("Failed to delete topic " + getName()); - } - } - - @Override - public void requestFailed(Message reply) throws Exception { - String errorMsg = (String) JMSManagementHelper.getResult(reply, String.class); - context.err.println("Failed to delete topic " + getName() + ". Reason: " + errorMsg); - } - }); - } - - private void deleteJmsQueue(final ActionContext context) throws Exception { - performJmsManagement(new ManagementCallback() { - @Override - public void setUpInvocation(Message message) throws Exception { - JMSManagementHelper.putOperationInvocation(message, "jms.server", "destroyQueue", getName(), removeConsumers); - } - - @Override - public void requestSuccessful(Message reply) throws Exception { - boolean result = (boolean) JMSManagementHelper.getResult(reply, Boolean.class); - if (result) { - context.out.println("Jms queue " + getName() + " deleted successfully."); - } else { - context.err.println("Failed to delete queue " + getName()); - } - } - - @Override - public void requestFailed(Message reply) throws Exception { - String errorMsg = (String) JMSManagementHelper.getResult(reply, String.class); - context.err.println("Failed to create " + getName() + " with reason: " + errorMsg); - } - }); - } - - private void deleteCoreQueue(final ActionContext context) throws Exception { - performCoreManagement(new ManagementCallback() { - @Override - public void setUpInvocation(ClientMessage message) throws Exception { - ManagementHelper.putOperationInvocation(message, "core.server", "destroyQueue", getName()); - } - - @Override - public void requestSuccessful(ClientMessage reply) throws Exception { - context.out.println("Queue " + getName() + " deleted successfully."); - } - - @Override - public void requestFailed(ClientMessage reply) throws Exception { - String errMsg = (String) ManagementHelper.getResult(reply, String.class); - context.err.println("Failed to delete queue " + getName() + ". Reason: " + errMsg); - } - }); - } - -} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/CreateQueue.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/CreateQueue.java new file mode 100644 index 0000000000..76cea6e947 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/CreateQueue.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.cli.commands.queue; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.commands.AbstractAction; + +@Command(name = "create", description = "create a queue or topic") +public class CreateQueue extends AbstractAction { + + @Option(name = "--name", description = "queue name") + String name; + + @Option(name = "--filter", description = "queue's filter string (default null)") + String filter = null; + + @Option(name = "--address", description = "address of the queue (default queue's name)") + String address; + + @Option(name = "--durable", description = "whether the queue is durable or not (default false)") + boolean durable = false; + + @Option(name = "--deleteOnNoConsumers", description = "whether to delete this queue when it's last consumers disconnects)") + boolean deleteOnNoConsumers = false; + + @Option(name = "--maxConsumers", description = "Maximum number of consumers allowed on this queue at any one time (default no limit)") + int maxConsumers = -1; + + @Option(name = "--autoCreateAddress", description = "Auto create the address (if it doesn't exist) with default values") + boolean autoCreateAddress = false; + + @Override + public Object execute(ActionContext context) throws Exception { + super.execute(context); + createQueue(context); + return null; + } + + public String getAddress() { + if (address == null || "".equals(address.trim())) { + address = getName(); + } + return address.trim(); + } + + private void createQueue(final ActionContext context) throws Exception { + performCoreManagement(new ManagementCallback() { + @Override + public void setUpInvocation(ClientMessage message) throws Exception { + String address = getAddress(); + ManagementHelper.putOperationInvocation(message, "broker", "createQueue", address, getName(), filter, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + } + + @Override + public void requestSuccessful(ClientMessage reply) throws Exception { + context.out.println("Core queue " + getName() + " created successfully."); + } + + @Override + public void requestFailed(ClientMessage reply) throws Exception { + String errMsg = (String) ManagementHelper.getResult(reply, String.class); + context.err.println("Failed to create queue " + getName() + ". Reason: " + errMsg); + } + }); + } + + public void setFilter(String filter) { + this.filter = filter; + } + + public void setAutoCreateAddress(boolean autoCreateAddress) { + this.autoCreateAddress = autoCreateAddress; + } + + public void setMaxConsumers(int maxConsumers) { + this.maxConsumers = maxConsumers; + } + + public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) { + this.deleteOnNoConsumers = deleteOnNoConsumers; + } + + public void setAddress(String address) { + this.address = address; + } + + public void setName(String name) { + this.name = name; + } + + public String getName() { + if (name == null) { + name = input("--name", "Please provide the destination name:", ""); + } + + return name; + } +} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/DeleteQueue.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/DeleteQueue.java new file mode 100644 index 0000000000..19d2e9939f --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/DeleteQueue.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.cli.commands.queue; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.commands.AbstractAction; + +@Command(name = "delete", description = "delete a queue") +public class DeleteQueue extends AbstractAction { + + @Option(name = "--name", description = "queue name") + String name; + + @Option(name = "--removeConsumers", description = "whether deleting destination with consumers or not (default false)") + boolean removeConsumers = false; + + @Option(name = "--autoDeleteAddress", description = "delete the address if this it's last last queue") + boolean autoDeleteAddress = false; + + @Override + public Object execute(ActionContext context) throws Exception { + super.execute(context); + deleteQueue(context); + return null; + } + + private void deleteQueue(final ActionContext context) throws Exception { + performCoreManagement(new ManagementCallback() { + @Override + public void setUpInvocation(ClientMessage message) throws Exception { + ManagementHelper.putOperationInvocation(message, "broker", "destroyQueue", getName(), removeConsumers, autoDeleteAddress); + } + + @Override + public void requestSuccessful(ClientMessage reply) throws Exception { + context.out.println("Queue " + getName() + " deleted successfully."); + } + + @Override + public void requestFailed(ClientMessage reply) throws Exception { + String errMsg = (String) ManagementHelper.getResult(reply, String.class); + context.err.println("Failed to delete queue " + getName() + ". Reason: " + errMsg); + } + }); + } + + public void setRemoveConsumers(boolean removeConsumers) { + this.removeConsumers = removeConsumers; + } + + public void setAutoDeleteAddress(boolean autoDeleteAddress) { + this.autoDeleteAddress = autoDeleteAddress; + } + + public void setName(String name) { + this.name = name; + } + + public String getName() { + if (name == null) { + name = input("--name", "Please provide the destination name:", ""); + } + + return name; + } +} diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/HelpQueue.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/HelpQueue.java new file mode 100644 index 0000000000..687e0f4421 --- /dev/null +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/HelpQueue.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.cli.commands.queue; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import io.airlift.airline.Help; +import org.apache.activemq.artemis.cli.commands.Action; +import org.apache.activemq.artemis.cli.commands.ActionContext; + +public class HelpQueue extends Help implements Action { + + @Override + public boolean isVerbose() { + return false; + } + + @Override + public void setHomeValues(File brokerHome, File brokerInstance) { + } + + @Override + public String getBrokerInstance() { + return null; + } + + @Override + public String getBrokerHome() { + return null; + } + + @Override + public Object execute(ActionContext context) throws Exception { + List commands = new ArrayList<>(1); + commands.add("queue"); + help(global, commands); + return null; + } +} diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java index eb3d48ad9b..3c03ab2cd3 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java @@ -538,13 +538,16 @@ public class ArtemisTest { // This is usually set when run from the command line via artemis.profile - Run.setEmbedded(true); + Run.setEmbedded(false); Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--queues", queues, "--topics", topics, "--no-autotune", "--require-login"); System.setProperty("artemis.instance", instanceFolder.getAbsolutePath()); // Some exceptions may happen on the initialization, but they should be ok on start the basic core protocol Artemis.internalExecute("run"); + Artemis.main("queue", "create", "--name", "q1", "--address", "q1", "--user", "admin", "--password", "admin"); + Artemis.main("queue", "create", "--name", "t2", "--address", "t2", "--user", "admin", "--password", "admin"); + try { try (ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616"); ClientSessionFactory factory = locator.createSessionFactory(); diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressDoesNotExistException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressDoesNotExistException.java new file mode 100644 index 0000000000..46a82b59fb --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQAddressDoesNotExistException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +/** + * An operation failed because an address exists on the server. + */ +public final class ActiveMQAddressDoesNotExistException extends ActiveMQException { + + public ActiveMQAddressDoesNotExistException() { + super(ActiveMQExceptionType.ADDRESS_EXISTS); + } + + public ActiveMQAddressDoesNotExistException(String msg) { + super(ActiveMQExceptionType.ADDRESS_EXISTS, msg); + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java new file mode 100644 index 0000000000..9c8030649c --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQDeleteAddressException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +/** + * An operation failed because an address exists on the server. + */ +public final class ActiveMQDeleteAddressException extends ActiveMQException { + + public ActiveMQDeleteAddressException() { + super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR); + } + + public ActiveMQDeleteAddressException(String msg) { + super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR, msg); + } +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java index 6404c74c96..16e2b411b8 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java @@ -76,5 +76,4 @@ public class ActiveMQException extends Exception { public String toString() { return this.getClass().getSimpleName() + "[errorType=" + type + " message=" + getMessage() + "]"; } - } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java index 785dac375b..64518ecb8b 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java @@ -106,6 +106,12 @@ public enum ActiveMQExceptionType { return new ActiveMQSecurityException(msg); } }, + ADDRESS_DOES_NOT_EXIST(106) { + @Override + public ActiveMQException createException(String msg) { + return new ActiveMQAddressDoesNotExistException(msg); + } + }, ADDRESS_EXISTS(107) { @Override public ActiveMQException createException(String msg) { @@ -231,6 +237,12 @@ public enum ActiveMQExceptionType { public ActiveMQException createException(String msg) { return new ActiveMQInvalidQueueConfiguration(msg); } + }, + DELETE_ADDRESS_ERROR(217) { + @Override + public ActiveMQException createException(String msg) { + return new ActiveMQDeleteAddressException(msg); + } }; private static final Map TYPE_MAP; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 7772459abd..0654dbf0fd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -19,6 +19,8 @@ package org.apache.activemq.artemis.api.core.management; import javax.management.MBeanOperationInfo; import java.util.Map; +import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; + /** * An ActiveMQServerControl is used to manage ActiveMQ Artemis servers. */ @@ -438,6 +440,13 @@ public interface ActiveMQServerControl { @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers, @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception; + + @Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION) + void createAddress(@Parameter(name = "name", desc = "The name of the address") String name, + @Parameter(name = "routingType", desc = "The routing type for the address either 'MULTICAST' or 'ANYCAST'") String routingType, + @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers, + @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception; + @Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION) void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception; @@ -455,6 +464,7 @@ public interface ActiveMQServerControl { void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, @Parameter(name = "name", desc = "Name of the queue") String name) throws Exception; + /** * Create a queue. *
@@ -489,6 +499,25 @@ public interface ActiveMQServerControl { @Parameter(name = "name", desc = "Name of the queue") String name, @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception; + /** + * Create a queue. + *
+ * If {@code address} is {@code null} it will be defaulted to {@code name}. + *
+ * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. + * + * @param address address to bind the queue to + * @param name name of the queue + * @param durable whether the queue is durable + */ + @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION) + void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "filter", desc = "Filter of the queue") String filter, + @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, + @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers, + @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers, + @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception; /** * Deploy a durable queue. *
@@ -536,6 +565,14 @@ public interface ActiveMQServerControl { void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name, @Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers) throws Exception; + /** + * Destroys the queue corresponding to the specified name and delete it's address if there are no other queues + */ + @Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION) + void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name, + @Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers, boolean autoDeleteAddress) throws Exception; + + /** * Enables message counters for this server. */ @@ -901,5 +938,10 @@ public interface ActiveMQServerControl { @Operation(desc = "List the Network Topology", impact = MBeanOperationInfo.INFO) String listNetworkTopology() throws Exception; + + String getAddressInfo(String address) throws ActiveMQAddressDoesNotExistException; + + @Operation(desc = "Get a list of bindings associated with an address", impact = MBeanOperationInfo.INFO) + String[] listBindingsForAddress(String address) throws Exception; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index a1831870a9..c38b2cf2d7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -42,6 +42,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; @@ -49,6 +50,7 @@ import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.api.core.management.BridgeControl; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.DivertControl; +import org.apache.activemq.artemis.api.core.management.Parameter; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; @@ -62,8 +64,10 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.security.CheckType; @@ -71,6 +75,7 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.JournalType; @@ -562,12 +567,21 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active clearIO(); try { - server.createOrUpdateAddressInfo(new AddressInfo(new SimpleString(name), AddressInfo.RoutingType.getType((byte)routingType), defaultDeleteOnNoConsumers, defaultMaxConsumers)); + server.createAddressInfo(new AddressInfo(new SimpleString(name), AddressInfo.RoutingType.getType((byte) routingType), defaultDeleteOnNoConsumers, defaultMaxConsumers)); } finally { blockOnIO(); } } + @Override + public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name, + @Parameter(name = "routingType", desc = "The routing type for the address either 'MULTICAST' or 'ANYCAST'") String routingType, + @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers, + @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception { + AddressInfo.RoutingType rt = AddressInfo.RoutingType.valueOf(routingType.toUpperCase()); + createAddress(name, rt.ordinal(), defaultDeleteOnNoConsumers, defaultMaxConsumers); + } + @Override public void deleteAddress(String name) throws Exception { checkStarted(); @@ -632,6 +646,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } } + @Override + public void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "filter", desc = "Filter of the queue") String filterStr, + @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, + @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers, + @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers, + @Parameter(name = "autoCreateAddress", desc = "Create an address with default values if one does not exist") boolean autoCreateAddress) throws Exception { + checkStarted(); + + clearIO(); + + SimpleString filter = filterStr == null ? null : new SimpleString(filterStr); + try { + if (filterStr != null && !filterStr.trim().equals("")) { + filter = new SimpleString(filterStr); + } + + server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + } finally { + blockOnIO(); + } + } + @Override public void createQueue(final String address, final String name, @@ -727,23 +765,51 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } @Override - public void destroyQueue(final String name, final boolean removeConsumers) throws Exception { + public void destroyQueue(final String name, final boolean removeConsumers, final boolean autoDeleteAddress) throws Exception { checkStarted(); clearIO(); try { SimpleString queueName = new SimpleString(name); - server.destroyQueue(queueName, null, !removeConsumers, removeConsumers); + server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, autoDeleteAddress); } finally { blockOnIO(); } } + @Override + public void destroyQueue(final String name, final boolean removeConsumers) throws Exception { + destroyQueue(name, removeConsumers, false); + } + @Override public void destroyQueue(final String name) throws Exception { destroyQueue(name, false); } + @Override + public String getAddressInfo(String address) throws ActiveMQAddressDoesNotExistException { + AddressInfo addressInfo = server.getAddressInfo(SimpleString.toSimpleString(address)); + if (addressInfo == null) { + throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address)); + } + else { + return addressInfo.toString(); + } + } + + @Override + public String[] listBindingsForAddress(String address) throws Exception { + Bindings bindings = server.getPostOffice().getBindingsForAddress(new SimpleString(address)); + List result = new ArrayList<>(bindings.getBindings().size()); + + int i = 0; + for (Binding binding : bindings.getBindings()) { + + } + return (String[]) result.toArray(); + } + @Override public int getConnectionCount() { checkStarted(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index f1225c15c3..48ec7dbab8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.postoffice; +import java.util.List; import java.util.Map; import java.util.Set; @@ -50,6 +51,8 @@ public interface PostOffice extends ActiveMQComponent { AddressInfo getAddressInfo(SimpleString address); + List listQueuesForAddress(SimpleString address) throws Exception; + void addBinding(Binding binding) throws Exception; Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 135597f7a7..30643639a6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.BindingsFactory; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueInfo; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; @@ -131,6 +132,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding private final ActiveMQServer server; + private Object addressLock = new Object(); + public PostOfficeImpl(final ActiveMQServer server, final StorageManager storageManager, final PagingManager pagingManager, @@ -420,39 +423,61 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding @Override public AddressInfo addAddressInfo(AddressInfo addressInfo) { - try { - managementService.registerAddress(addressInfo); - } catch (Exception e) { - e.printStackTrace(); + synchronized (addressLock) { + try { + managementService.registerAddress(addressInfo); + } catch (Exception e) { + e.printStackTrace(); + } + return addressManager.addAddressInfo(addressInfo); } - return addressManager.addAddressInfo(addressInfo); } @Override public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { - try { - managementService.registerAddress(addressInfo); - } catch (Exception e) { - e.printStackTrace(); + synchronized (addressLock) { + try { + managementService.registerAddress(addressInfo); + } catch (Exception e) { + e.printStackTrace(); + } + return addressManager.addOrUpdateAddressInfo(addressInfo); } - return addressManager.addOrUpdateAddressInfo(addressInfo); } @Override - public AddressInfo removeAddressInfo(SimpleString address) { - try { - getServer().getManagementService().unregisterAddress(address); - } catch (Exception e) { - e.printStackTrace(); + public AddressInfo removeAddressInfo(SimpleString address) throws Exception { + synchronized (addressLock) { + Bindings bindingsForAddress = getBindingsForAddress(address); + if (bindingsForAddress.getBindings().size() > 0) { + throw new IllegalStateException("Address has bindings"); + } + managementService.unregisterAddress(address); + return addressManager.removeAddressInfo(address); } - return addressManager.removeAddressInfo(address); } @Override public AddressInfo getAddressInfo(SimpleString addressName) { - return addressManager.getAddressInfo(addressName); + synchronized (addressLock) { + return addressManager.getAddressInfo(addressName); + } } + @Override + public List listQueuesForAddress(SimpleString address) throws Exception { + Bindings bindingsForAddress = getBindingsForAddress(address); + List queues = new ArrayList<>(); + for (Binding b : bindingsForAddress.getBindings()) { + if (b instanceof QueueBinding) { + Queue q = ((QueueBinding) b).getQueue(); + queues.add(q); + } + } + return queues; + } + + // TODO - needs to be synchronized to prevent happening concurrently with activate() // (and possible removeBinding and other methods) // Otherwise can have situation where createQueue comes in before failover, then failover occurs diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 6d8cf309aa..5d39df0843 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -18,9 +18,12 @@ package org.apache.activemq.artemis.core.server; import java.io.File; +import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; +import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException; +import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -390,4 +393,13 @@ public interface ActiveMQMessageBundle { @Message(id = 119202, value = "Invalid Queue Configuration for Queue {0}, Address {1}. Expected {2} to be {3} but was {4}", format = Message.Format.MESSAGE_FORMAT) ActiveMQInvalidQueueConfiguration invalidQueueConfiguration(SimpleString address, SimpleString queueName, String queuePropertyName, Object expectedValue, Object actualValue); + + @Message(id = 119203, value = "Address Does Not Exist: {0}", format = Message.Format.MESSAGE_FORMAT) + ActiveMQAddressDoesNotExistException addressDoesNotExist(SimpleString address); + + @Message(id = 119204, value = "Address already exists: {0}", format = Message.Format.MESSAGE_FORMAT) + ActiveMQAddressExistsException addressAlreadyExists(SimpleString address); + + @Message(id = 119205, value = "Address {0} has bindings", format = Message.Format.MESSAGE_FORMAT) + ActiveMQDeleteAddressException addressHasBindings(SimpleString address); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 84f554d7ee..833f8ce2bb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -22,6 +22,7 @@ import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Queues; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; @@ -122,6 +123,12 @@ public interface ActiveMQServer extends ActiveMQComponent { */ ActiveMQServerControlImpl getActiveMQServerControl(); + void destroyQueue(SimpleString queueName, + SecurityAuth session, + boolean checkConsumerCount, + boolean removeConsumers, + boolean autoDeleteAddress) throws Exception; + void registerActivateCallback(ActivateCallback callback); void unregisterActivateCallback(ActivateCallback callback); @@ -289,7 +296,8 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean durable, boolean temporary, Integer maxConsumers, - Boolean deleteOnNoConsumers) throws Exception; + Boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception; Queue createQueue(SimpleString address, SimpleString queueName, @@ -305,7 +313,8 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean durable, boolean temporary, Integer maxConsumers, - Boolean deleteOnNoConsumers) throws Exception; + Boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception; Queue createQueue(SimpleString address, SimpleString queueName, @@ -323,7 +332,8 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean temporary, boolean autoCreated, Integer maxConsumers, - Boolean deleteOnNoConsumers) throws Exception; + Boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception; Queue deployQueue(SimpleString address, SimpleString queueName, @@ -351,7 +361,8 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean temporary, boolean autoCreated, Integer maxConsumers, - Boolean deleteOnNoConsumers) throws Exception; + Boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception; void destroyQueue(SimpleString queueName) throws Exception; @@ -414,7 +425,8 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean transientQueue, boolean autoCreated, Integer maxConsumers, - Boolean deleteOnNoConsumers) throws Exception; + Boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception; /* * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will @@ -451,6 +463,8 @@ public interface ActiveMQServer extends ActiveMQComponent { AddressInfo putAddressInfoIfAbsent(AddressInfo addressInfo) throws Exception; + void createAddressInfo(AddressInfo addressInfo) throws Exception; + AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception; void removeAddressInfo(SimpleString address) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 7e219c688e..564aabd83c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -785,7 +785,7 @@ public interface ActiveMQServerLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void noQueueIdDefined(ServerMessage message, ServerMessage messageCopy, SimpleString idsHeaderName); - @LogMessage(level = Logger.Level.WARN) + @LogMessage(level = Logger.Level.TRACE) @Message(id = 222111, value = "exception while invoking {0} on {1}", format = Message.Format.MESSAGE_FORMAT) void managementOperationError(@Cause Exception e, String op, String resourceName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 7aa802b54d..ea72beaaa1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; @@ -1476,8 +1477,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean durable, final boolean temporary, final Integer maxConsumers, - final Boolean deleteOnNoConsumers) throws Exception { - return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers); + final Boolean deleteOnNoConsumers, + final boolean autoCreateAddress) throws Exception { + return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } @Override @@ -1498,8 +1500,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { boolean durable, boolean temporary, Integer maxConsumers, - Boolean deleteOnNoConsumers) throws Exception { - return createQueue(address, queueName, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers); + Boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception { + return createQueue(address, queueName, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } @Override @@ -1522,8 +1525,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { boolean temporary, boolean autoCreated, Integer maxConsumers, - Boolean deleteOnNoConsumers) throws Exception { - return createQueue(address, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers); + Boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception { + return createQueue(address, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } @Override @@ -1585,7 +1589,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean durable, final boolean temporary, final boolean autoCreated) throws Exception { - return deployQueue(address, queueName, filterString, durable, temporary, autoCreated, null, null); + return deployQueue(address, queueName, filterString, durable, temporary, autoCreated, null, null, true); } @Override @@ -1596,12 +1600,13 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean temporary, final boolean autoCreated, final Integer maxConsumers, - final Boolean deleteOnNoConsumers) throws Exception { + final Boolean deleteOnNoConsumers, + final boolean autoCreateAddress) throws Exception { // TODO: fix logging here as this could be for a topic or queue ActiveMQServerLogger.LOGGER.deployQueue(queueName); - return createQueue(address, queueName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers); + return createQueue(address, queueName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } @Override @@ -1621,7 +1626,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { public void destroyQueue(final SimpleString queueName, final SecurityAuth session, final boolean checkConsumerCount) throws Exception { - destroyQueue(queueName, session, checkConsumerCount, false); + destroyQueue(queueName, session, checkConsumerCount, false, true); } @Override @@ -1629,6 +1634,15 @@ public class ActiveMQServerImpl implements ActiveMQServer { final SecurityAuth session, final boolean checkConsumerCount, final boolean removeConsumers) throws Exception { + destroyQueue(queueName, session, checkConsumerCount, removeConsumers, true); + } + + @Override + public void destroyQueue(final SimpleString queueName, + final SecurityAuth session, + final boolean checkConsumerCount, + final boolean removeConsumers, + final boolean autoDeleteAddress) throws Exception { if (postOffice == null) { return; } @@ -1662,6 +1676,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { queue.deleteQueue(removeConsumers); + if (autoDeleteAddress && postOffice != null) { + try { + postOffice.removeAddressInfo(address); + } catch (ActiveMQDeleteAddressException e) { + // Could be thrown if the address has bindings or is not deletable. + } + } + callPostQueueDeletionCallbacks(address, queueName); } @@ -2101,15 +2123,15 @@ public class ActiveMQServerImpl implements ActiveMQServer { // Deploy any predefined queues deployQueuesFromConfiguration(); - registerPostQueueDeletionCallback(new PostQueueDeletionCallback() { - // TODO delete auto-created addresses when queueCount == 0 - @Override - public void callback(SimpleString address, SimpleString queueName) throws Exception { - if (getAddressInfo(address).isAutoCreated() && postOffice.getBindingsForAddress(address).getBindings().size() == 0) { - removeAddressInfo(address); - } - } - }); + // registerPostQueueDeletionCallback(new PostQueueDeletionCallback() { + // // TODO delete auto-created addresses when queueCount == 0 + // @Override + // public void callback(SimpleString address, SimpleString queueName) throws Exception { + // if (getAddressInfo(address).isAutoCreated()) { + // removeAddressInfo(address); + // } + // } + // }); // We need to call this here, this gives any dependent server a chance to deploy its own addresses // this needs to be done before clustering is fully activated @@ -2200,7 +2222,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { private void deployQueuesFromListCoreQueueConfiguration(List queues) throws Exception { for (CoreQueueConfiguration config : queues) { - deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers()); + deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers(), true); } } @@ -2315,6 +2337,13 @@ public class ActiveMQServerImpl implements ActiveMQServer { return result; } + @Override + public void createAddressInfo(AddressInfo addressInfo) throws Exception { + if (putAddressInfoIfAbsent(addressInfo) != null) { + throw ActiveMQMessageBundle.BUNDLE.addressAlreadyExists(addressInfo.getName()); + } + } + @Override public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception { AddressInfo result = postOffice.addOrUpdateAddressInfo(addressInfo); @@ -2329,12 +2358,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public void removeAddressInfo(SimpleString address) throws Exception { - postOffice.removeAddressInfo(address); + if (postOffice.removeAddressInfo(address) == null) { + throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address); + }; // TODO: is this the right way to do this? -// long txID = storageManager.generateID(); -// storageManager.deleteAddressBinding(txID, getAddressInfo(address).getID()); -// storageManager.commitBindings(txID); + // long txID = storageManager.generateID(); + // storageManager.deleteAddressBinding(txID, getAddressInfo(address).getID()); + // storageManager.commitBindings(txID); } @@ -2357,17 +2388,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean ignoreIfExists, final boolean transientQueue, final boolean autoCreated) throws Exception { - return createQueue(addressName, - queueName, - filterString, - user, - durable, - temporary, - ignoreIfExists, - transientQueue, - autoCreated, - null, - null); + return createQueue(addressName, queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, null, null, true); } @Override @@ -2381,7 +2402,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean transientQueue, final boolean autoCreated, final Integer maxConsumers, - final Boolean deleteOnNoConsumers) throws Exception { + final Boolean deleteOnNoConsumers, + final boolean autoCreateAddress) throws Exception { + final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); if (binding != null) { if (ignoreIfExists) { @@ -2404,34 +2427,26 @@ public class ActiveMQServerImpl implements ActiveMQServer { } AddressInfo defaultAddressInfo = new AddressInfo(addressName); - // FIXME This boils down to a putIfAbsent (avoids race). This should be reflected in the API. AddressInfo info = postOffice.getAddressInfo(addressName); if (info == null) { - info = defaultAddressInfo; + if (autoCreateAddress) { + info = defaultAddressInfo; + } else { + throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName); + } } final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers; final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxQueueConsumers() : maxConsumers; - final QueueConfig queueConfig = queueConfigBuilder - .filter(filter) - .pagingManager(pagingManager) - .user(user) - .durable(durable) - .temporary(temporary) - .autoCreated(autoCreated) - .deleteOnNoConsumers(isDeleteOnNoConsumers) - .maxConsumers(noMaxConsumers) - .build(); + final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).deleteOnNoConsumers(isDeleteOnNoConsumers).maxConsumers(noMaxConsumers).build(); final Queue queue = queueFactory.createQueueWith(queueConfig); boolean addressAlreadyExists = true; if (postOffice.getAddressInfo(queue.getAddress()) == null) { - postOffice.addAddressInfo(new AddressInfo(queue.getAddress()) - .setRoutingType(AddressInfo.RoutingType.MULTICAST) - .setDefaultMaxQueueConsumers(maxConsumers == null ? ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : maxConsumers)); + postOffice.addAddressInfo(new AddressInfo(queue.getAddress()).setRoutingType(AddressInfo.RoutingType.MULTICAST).setDefaultMaxQueueConsumers(maxConsumers == null ? ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : maxConsumers)); addressAlreadyExists = false; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 6ad40fa5fc..a92e8d332b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -31,6 +31,8 @@ public class AddressInfo { private boolean autoCreated = false; + private boolean deletable = false; + public AddressInfo(SimpleString name) { this.name = name; } @@ -85,7 +87,7 @@ public class AddressInfo { @Override public String toString() { StringBuffer buff = new StringBuffer(); - buff.append("AddressInfo [name=" + name); + buff.append("Address [name=" + name); buff.append(", routingType=" + routingType); buff.append(", defaultMaxQueueConsumers=" + defaultMaxQueueConsumers); buff.append(", defaultDeleteOnNoConsumers=" + defaultDeleteOnNoConsumers); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 7c614aea78..f6da245ed7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1421,7 +1421,8 @@ public class QueueImpl implements Queue { @Override public void deleteQueue(boolean removeConsumers) throws Exception { synchronized (this) { - if (this.queueDestroyed) return; + if (this.queueDestroyed) + return; this.queueDestroyed = true; } @@ -1454,7 +1455,6 @@ public class QueueImpl implements Queue { tx.rollback(); throw e; } - } @Override @@ -1799,7 +1799,7 @@ public class QueueImpl implements Queue { } @Override - public synchronized void pause(boolean persist) { + public synchronized void pause(boolean persist) { try { this.flushDeliveriesInTransit(); if (persist && isDurable()) { @@ -1887,6 +1887,7 @@ public class QueueImpl implements Queue { return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this)); } + private synchronized void internalAddTail(final MessageReference ref) { refAdded(ref); messageReferences.addTail(ref, getPriority(ref)); @@ -2960,8 +2961,6 @@ public class QueueImpl implements Queue { return false; } - - @Override public MessageReference next() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index b3fc5acc1e..88ab0c014c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -510,7 +510,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { server.checkQueueCreationLimit(getUsername()); - Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers); + Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, maxConsumers, deleteOnNoConsumers, true); if (temporary) { // Temporary queue in core simply means the queue will be deleted if diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 242cbc7adc..4cfa57afcd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -739,7 +739,6 @@ public class ManagementServiceImpl implements ManagementService { } Object result = method.invoke(resource, params); - return result; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java index b6e4de7414..827e1b3a6f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java @@ -229,7 +229,7 @@ public class AddressingTest extends ActiveMQTestBase { SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); // For each address, create 2 Queues with the same address, assert both queues receive message boolean deleteOnNoConsumers = true; - Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); + Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers, false); ClientSession session = sessionFactory.createSession(); session.start(); @@ -246,7 +246,7 @@ public class AddressingTest extends ActiveMQTestBase { SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); // For each address, create 2 Queues with the same address, assert both queues receive message boolean deleteOnNoConsumers = false; - Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); + Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers, false); ClientSession session = sessionFactory.createSession(); session.start(); @@ -263,7 +263,7 @@ public class AddressingTest extends ActiveMQTestBase { SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); // For each address, create 2 Queues with the same address, assert both queues receive message boolean deleteOnNoConsumers = false; - Queue q1 = server.createQueue(address, queueName, null, true, false, 0, deleteOnNoConsumers); + Queue q1 = server.createQueue(address, queueName, null, true, false, 0, deleteOnNoConsumers, false); Exception expectedException = null; String expectedMessage = "Maximum Consumer Limit Reached on Queue"; @@ -290,7 +290,7 @@ public class AddressingTest extends ActiveMQTestBase { SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); // For each address, create 2 Queues with the same address, assert both queues receive message boolean deleteOnNoConsumers = false; - Queue q1 = server.createQueue(address, queueName, null, true, false, -1, deleteOnNoConsumers); + Queue q1 = server.createQueue(address, queueName, null, true, false, -1, deleteOnNoConsumers, false); ClientSession session = sessionFactory.createSession(); session.start(); @@ -310,7 +310,7 @@ public class AddressingTest extends ActiveMQTestBase { boolean deleteOnNoConsumers = false; AddressInfo addressInfo = new AddressInfo(address); addressInfo.setDefaultMaxQueueConsumers(0); - Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); + Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers, false); ClientSession session = sessionFactory.createSession(); session.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/AddressCommandTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/AddressCommandTest.java new file mode 100644 index 0000000000..9589f47327 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/AddressCommandTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cli; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.UUID; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.cli.commands.AbstractAction; +import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.commands.address.CreateAddress; +import org.apache.activemq.artemis.cli.commands.address.DeleteAddress; +import org.apache.activemq.artemis.cli.commands.address.ShowAddress; +import org.apache.activemq.artemis.cli.commands.queue.CreateQueue; +import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue; +import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +public class AddressCommandTest extends JMSTestBase { + + //the command + private ByteArrayOutputStream output; + private ByteArrayOutputStream error; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + this.output = new ByteArrayOutputStream(1024); + this.error = new ByteArrayOutputStream(1024); + } + + @Test + public void testCreateAddress() throws Exception { + String address = "address"; + CreateAddress command = new CreateAddress(); + command.setName(address); + command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionPassed(command); + assertNotNull(server.getAddressInfo(new SimpleString(address))); + } + + @Test + public void testCreateAddressAlreadyExistsShowsError() throws Exception { + String address = "address"; + CreateAddress command = new CreateAddress(); + command.setName(address); + command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionPassed(command); + assertNotNull(server.getAddressInfo(new SimpleString(address))); + + command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionFailure(command, "Address already exists"); + } + + @Test + public void testDeleteAddress() throws Exception { + String address = "address"; + CreateAddress command = new CreateAddress(); + command.setName(address); + command.execute(new ActionContext()); + assertNotNull(server.getAddressInfo(new SimpleString(address))); + + DeleteAddress deleteAddress = new DeleteAddress(); + deleteAddress.setName(address); + deleteAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionPassed(deleteAddress); + assertNull(server.getAddressInfo(new SimpleString(address))); + } + + @Test + public void testDeleteAddressDoesNotExistsShowsError() throws Exception { + String address = "address"; + DeleteAddress deleteAddress = new DeleteAddress(); + deleteAddress.setName(address); + deleteAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionFailure(deleteAddress, "Address Does Not Exist"); + } + + @Test + public void testShowAddress() throws Exception { + String address = "address"; + CreateAddress command = new CreateAddress(); + command.setName(address); + command.execute(new ActionContext()); + assertNotNull(server.getAddressInfo(new SimpleString(address))); + + ShowAddress showAddress = new ShowAddress(); + showAddress.setName(address); + showAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + System.out.println(output.toString()); + } + + @Test + public void testShowAddressDoesNotExist() throws Exception { + String address = "address"; + ShowAddress showAddress = new ShowAddress(); + showAddress.setName(address); + showAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionFailure(showAddress, "Address Does Not Exist"); + } + + @Test + public void testShowAddressBindings() throws Exception { + + // Create bindings + SimpleString address = new SimpleString("address"); + server.createAddressInfo(new AddressInfo(address)); + server.createQueue(address, new SimpleString("queue1"), null, true, false); + server.createQueue(address, new SimpleString("queue2"), null, true, false); + server.createQueue(address, new SimpleString("queue3"), null, true, false); + + DivertConfiguration divertConfiguration = new DivertConfiguration(); + divertConfiguration.setName(address.toString()); + divertConfiguration.setAddress(address.toString()); + server.deployDivert(divertConfiguration); + + ShowAddress showAddress = new ShowAddress(); + showAddress.setName(address.toString()); + showAddress.setBindings(true); + showAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + System.out.println(output.toString()); + } + + private void checkExecutionPassed(AbstractAction command) throws Exception { + String fullMessage = output.toString(); + System.out.println("output: " + fullMessage); + assertTrue(fullMessage, fullMessage.contains("successfully")); + } + + private void checkExecutionFailure(AbstractAction command, String message) throws Exception { + String fullMessage = error.toString(); + System.out.println("error: " + fullMessage); + assertTrue(fullMessage, fullMessage.contains(message)); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DestinationCommandTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DestinationCommandTest.java deleted file mode 100644 index a9266ef696..0000000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DestinationCommandTest.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.cli; - -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.Map; - -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.cli.commands.destination.CreateDestination; -import org.apache.activemq.artemis.cli.commands.destination.DeleteDestination; -import org.apache.activemq.artemis.cli.commands.destination.DestinationAction; -import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.postoffice.Binding; -import org.apache.activemq.artemis.tests.util.JMSTestBase; -import org.junit.Before; -import org.junit.Test; - -public class DestinationCommandTest extends JMSTestBase { - - //the command - private ByteArrayOutputStream output; - private ByteArrayOutputStream error; - - @Before - @Override - public void setUp() throws Exception { - super.setUp(); - this.output = new ByteArrayOutputStream(1024); - this.error = new ByteArrayOutputStream(1024); - } - - @Test - public void testCreateJmsQueue() throws Exception { - CreateDestination command = new CreateDestination(); - command.setName("jmsQueue1"); - command.setBindings("jmsQueue1Binding"); - command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionResult(command); - } - - @Test - public void testDeleteJmsQueue() throws Exception { - CreateDestination command = new CreateDestination(); - command.setName("jmsQueue1"); - command.setBindings("jmsQueue1Binding"); - command.execute(new ActionContext()); - - DeleteDestination delete = new DeleteDestination(); - delete.setName("jmsQueue1"); - delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionResult(delete); - } - - @Test - public void testDeleteNonExistJmsQueue() throws Exception { - DeleteDestination delete = new DeleteDestination(); - delete.setName("jmsQueue1NotExist"); - delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionResult(delete); - } - - @Test - public void testCreateJmsQueueWithFilter() throws Exception { - CreateDestination command = new CreateDestination(); - command.setName("jmsQueue2"); - command.setBindings("jmsQueue2Binding"); - command.setFilter("color='red'"); - command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionResult(command); - assertTrue(checkBindingExists(command, "color='red'")); - } - - @Test - public void testCreateJmsTopic() throws Exception { - CreateDestination command = new CreateDestination(); - command.setDestType(DestinationAction.JMS_TOPIC); - command.setName("jmsTopic1"); - command.setBindings("jmsTopic1Binding"); - command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionResult(command); - } - - @Test - public void testDeleteJmsTopic() throws Exception { - CreateDestination command = new CreateDestination(); - command.setDestType(DestinationAction.JMS_TOPIC); - command.setName("jmsTopic1"); - command.setBindings("jmsTopic1Binding"); - command.execute(new ActionContext()); - - DeleteDestination delete = new DeleteDestination(); - delete.setDestType(DestinationAction.JMS_TOPIC); - delete.setName("jmsTopic1"); - delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionResult(delete); - } - - @Test - public void testDeleteJmsTopicNotExist() throws Exception { - DeleteDestination delete = new DeleteDestination(); - delete.setDestType(DestinationAction.JMS_TOPIC); - delete.setName("jmsTopic1NotExist"); - delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionResult(delete); - } - - @Test - public void testCreateCoreQueue() throws Exception { - CreateDestination command = new CreateDestination(); - command.setDestType(DestinationAction.CORE_QUEUE); - command.setName("coreQueue1"); - command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionResult(command); - } - - @Test - public void testCreateCoreQueueWithFilter() throws Exception { - CreateDestination command = new CreateDestination(); - command.setName("coreQueue2"); - command.setDestType(DestinationAction.CORE_QUEUE); - command.setFilter("color='green'"); - command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionResult(command); - } - - @Test - public void testDeleteCoreQueue() throws Exception { - CreateDestination command = new CreateDestination(); - command.setName("coreQueue2"); - command.setDestType(DestinationAction.CORE_QUEUE); - command.setFilter("color='green'"); - command.execute(new ActionContext()); - - DeleteDestination delete = new DeleteDestination(); - delete.setName("coreQueue2"); - delete.setDestType(DestinationAction.CORE_QUEUE); - delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionResult(delete); - } - - @Test - public void testDeleteCoreQueueNotExist() throws Exception { - DeleteDestination delete = new DeleteDestination(); - delete.setName("coreQueue2NotExist"); - delete.setDestType(DestinationAction.CORE_QUEUE); - delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); - checkExecutionResult(delete); - } - - private boolean isCreateCommand(DestinationAction command) { - return command instanceof CreateDestination; - } - - private boolean isJms(DestinationAction command) { - String destType = command.getDestType(); - return !DestinationAction.CORE_QUEUE.equals(destType); - } - - private boolean isTopic(DestinationAction command) { - String destType = command.getDestType(); - return DestinationAction.JMS_TOPIC.equals(destType); - } - - private void checkExecutionResult(DestinationAction command) throws Exception { - if (isCreateCommand(command)) { - String fullMessage = output.toString(); - System.out.println("output: " + fullMessage); - assertTrue(fullMessage, fullMessage.contains("successfully")); - assertTrue(checkBindingExists(command, null)); - } else { - if (command.getName().equals("jmsQueue1") || command.getName().equals("coreQueue2") || command.getName().equals("jmsTopic1")) { - String fullMessage = output.toString(); - System.out.println("output: " + fullMessage); - assertTrue(fullMessage, fullMessage.contains("successfully")); - assertFalse(checkBindingExists(command, null)); - } else { - String errorMessage = error.toString(); - System.out.println("error: " + errorMessage); - assertTrue(errorMessage, errorMessage.contains("Failed to")); - assertFalse(checkBindingExists(command, null)); - } - } - } - - private boolean checkBindingExists(DestinationAction command, String filter) { - String bindingKey = command.getName(); - if (isJms(command)) { - if (isTopic(command)) { -// bindingKey = bindingKey; - } else { -// bindingKey = bindingKey; - } - } - Map bindings = server.getPostOffice().getAllBindings(); - System.out.println("bindings: " + bindings); - Binding binding = bindings.get(new SimpleString(bindingKey)); - System.out.println("got binding: " + binding); - if (binding == null) { - System.out.println("No bindings for " + bindingKey); - return false; - } - if (filter != null) { - Filter bindingFilter = binding.getFilter(); - assertNotNull(bindingFilter); - assertEquals(filter, bindingFilter.getFilterString().toString()); - } - return true; - } - -} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java new file mode 100644 index 0000000000..d79b444360 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cli; + +import java.util.List; + +import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.server.HandleStatus; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; +import org.apache.activemq.artemis.core.transaction.Transaction; + +public class DummyServerConsumer implements ServerConsumer { + + @Override + public void setlowConsumerDetection(SlowConsumerDetectionListener listener) { + + } + + @Override + public SlowConsumerDetectionListener getSlowConsumerDetecion() { + return null; + } + + @Override + public void fireSlowConsumer() { + + } + + @Override + public Object getProtocolData() { + return null; + } + + @Override + public void setProtocolData(Object protocolData) { + + } + + @Override + public void setProtocolContext(Object protocolContext) { + + } + + @Override + public Object getProtocolContext() { + return null; + } + + @Override + public long getID() { + return 0; + } + + @Override + public Object getConnectionID() { + return null; + } + + @Override + public void close(boolean failed) throws Exception { + + } + + @Override + public void removeItself() throws Exception { + + } + + @Override + public List cancelRefs(boolean failed, + boolean lastConsumedAsDelivered, + Transaction tx) throws Exception { + return null; + } + + @Override + public void setStarted(boolean started) { + + } + + @Override + public void receiveCredits(int credits) { + + } + + @Override + public Queue getQueue() { + return null; + } + + @Override + public MessageReference removeReferenceByID(long messageID) throws Exception { + return null; + } + + @Override + public void backToDelivering(MessageReference reference) { + + } + + @Override + public List getDeliveringReferencesBasedOnProtocol(boolean remove, + Object protocolDataStart, + Object protocolDataEnd) { + return null; + } + + @Override + public void acknowledge(Transaction tx, long messageID) throws Exception { + + } + + @Override + public void individualAcknowledge(Transaction tx, long messageID) throws Exception { + + } + + @Override + public void individualCancel(long messageID, boolean failed) throws Exception { + + } + + @Override + public void forceDelivery(long sequence) { + + } + + @Override + public void setTransferring(boolean transferring) { + + } + + @Override + public boolean isBrowseOnly() { + return false; + } + + @Override + public long getCreationTime() { + return 0; + } + + @Override + public String getSessionID() { + return null; + } + + @Override + public void promptDelivery() { + + } + + @Override + public HandleStatus handle(MessageReference reference) throws Exception { + return null; + } + + @Override + public void proceedDeliver(MessageReference reference) throws Exception { + + } + + @Override + public Filter getFilter() { + return null; + } + + @Override + public List getDeliveringMessages() { + return null; + } + + @Override + public String debug() { + return null; + } + + @Override + public String toManagementString() { + return null; + } + + @Override + public void disconnect() { + + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/QueueCommandTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/QueueCommandTest.java new file mode 100644 index 0000000000..5d7a695afb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/QueueCommandTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cli; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.UUID; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.cli.commands.ActionContext; +import org.apache.activemq.artemis.cli.commands.queue.CreateQueue; +import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue; +import org.apache.activemq.artemis.cli.commands.AbstractAction; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +public class QueueCommandTest extends JMSTestBase { + + //the command + private ByteArrayOutputStream output; + private ByteArrayOutputStream error; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + this.output = new ByteArrayOutputStream(1024); + this.error = new ByteArrayOutputStream(1024); + } + + @Test + public void testCreateCoreQueueShowsErrorWhenAddressDoesNotExists() throws Exception { + String queueName = "queue1"; + CreateQueue command = new CreateQueue(); + command.setName(queueName); + command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionFailure(command, "AMQ119203: Address Does Not Exist:");; + assertFalse(server.queueQuery(new SimpleString(queueName)).isExists()); + } + + @Test + public void testCreateCoreQueueAutoCreateAddressDefaultAddress() throws Exception { + String queueName = UUID.randomUUID().toString(); + CreateQueue command = new CreateQueue(); + command.setName(queueName); + command.setAutoCreateAddress(true); + command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionPassed(command); + assertNotNull(server.getAddressInfo(new SimpleString(queueName))); + + Queue queue = server.locateQueue(new SimpleString(queueName)); + assertEquals(-1, queue.getMaxConsumers()); + assertEquals(false, queue.isDeleteOnNoConsumers()); + assertTrue(server.queueQuery(new SimpleString(queueName)).isExists()); + } + + @Test + public void testCreateCoreQueueAddressExists() throws Exception { + String queueName = "queue"; + String address= "address"; + + CreateQueue command = new CreateQueue(); + command.setName(queueName); + command.setAutoCreateAddress(false); + command.setAddress(address); + + server.createOrUpdateAddressInfo(new AddressInfo(new SimpleString(address))); + + command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionPassed(command); + assertNotNull(server.getAddressInfo(new SimpleString(address))); + + Queue queue = server.locateQueue(new SimpleString(queueName)); + assertEquals(-1, queue.getMaxConsumers()); + assertEquals(false, queue.isDeleteOnNoConsumers()); + assertTrue(server.queueQuery(new SimpleString(queueName)).isExists()); + } + + @Test + public void testCreateCoreQueueWithFilter() throws Exception { + String queueName = "queue2"; + String filerString = "color='green'"; + + CreateQueue command = new CreateQueue(); + command.setName(queueName); + command.setFilter("color='green'"); + command.setAutoCreateAddress(true); + command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + + checkExecutionPassed(command); + Queue queue = server.locateQueue(new SimpleString(queueName)); + assertNotNull(queue); + assertEquals(new SimpleString(filerString), queue.getFilter().getFilterString()); + } + + @Test + public void testCreateQueueAlreadyExists() throws Exception { + String queueName = "queue2"; + String filerString = "color='green'"; + + CreateQueue command = new CreateQueue(); + command.setName(queueName); + command.setFilter("color='green'"); + command.setAutoCreateAddress(true); + command.execute(new ActionContext()); + command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionFailure(command, "AMQ119019: Queue already exists " + queueName); + } + + @Test + public void testDeleteCoreQueue() throws Exception { + SimpleString queueName = new SimpleString("deleteQueue"); + + CreateQueue command = new CreateQueue(); + command.setName(queueName.toString()); + command.setFilter("color='green'"); + command.setAutoCreateAddress(true); + command.execute(new ActionContext()); + + DeleteQueue delete = new DeleteQueue(); + delete.setName(queueName.toString()); + delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionPassed(delete); + + assertFalse(server.queueQuery(queueName).isExists()); + } + + @Test + public void testDeleteQueueDoesNotExist() throws Exception { + SimpleString queueName = new SimpleString("deleteQueue"); + + DeleteQueue delete = new DeleteQueue(); + delete.setName(queueName.toString()); + delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionFailure(delete, "AMQ119017: Queue " + queueName + " does not exist"); + + assertFalse(server.queueQuery(queueName).isExists()); + } + + @Test + public void testDeleteQueueWithConsumersFails() throws Exception { + SimpleString queueName = new SimpleString("deleteQueue"); + + CreateQueue command = new CreateQueue(); + command.setName(queueName.toString()); + command.setFilter("color='green'"); + command.setAutoCreateAddress(true); + command.execute(new ActionContext()); + + server.locateQueue(queueName).addConsumer(new DummyServerConsumer()); + + DeleteQueue delete = new DeleteQueue(); + delete.setName(queueName.toString()); + delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionFailure(delete, "AMQ119025: Cannot delete queue " + queueName + " on binding deleteQueue"); + } + + @Test + public void testDeleteQueueWithConsumersFailsAndRemoveConsumersTrue() throws Exception { + SimpleString queueName = new SimpleString("deleteQueue"); + + CreateQueue command = new CreateQueue(); + command.setName(queueName.toString()); + command.setFilter("color='green'"); + command.setAutoCreateAddress(true); + command.execute(new ActionContext()); + + server.locateQueue(queueName).addConsumer(new DummyServerConsumer()); + + DeleteQueue delete = new DeleteQueue(); + delete.setName(queueName.toString()); + delete.setRemoveConsumers(true); + delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + checkExecutionPassed(command); + } + + @Test + public void testAutoDeleteAddress() throws Exception { + SimpleString queueName = new SimpleString("deleteQueue"); + + CreateQueue command = new CreateQueue(); + command.setName(queueName.toString()); + command.setFilter("color='green'"); + command.setAutoCreateAddress(true); + command.execute(new ActionContext()); + assertNotNull(server.getAddressInfo(queueName)); + + server.locateQueue(queueName).addConsumer(new DummyServerConsumer()); + + DeleteQueue delete = new DeleteQueue(); + delete.setName(queueName.toString()); + delete.setRemoveConsumers(true); + delete.setAutoDeleteAddress(true); + delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error))); + + checkExecutionPassed(command); + assertNull(server.getAddressInfo(queueName)); + } + + private void checkExecutionPassed(AbstractAction command) throws Exception { + String fullMessage = output.toString(); + System.out.println("output: " + fullMessage); + assertTrue(fullMessage, fullMessage.contains("successfully")); + } + + private void checkExecutionFailure(AbstractAction command, String message) throws Exception { + String fullMessage = error.toString(); + System.out.println("error: " + fullMessage); + assertTrue(fullMessage, fullMessage.contains(message)); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index ad36598842..2547bb6b54 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.management; +import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.Parameter; import org.apache.activemq.artemis.api.core.management.ResourceNames; @@ -103,6 +104,14 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes proxy.invokeOperation("createAddress", name, routingType, defaultDeleteOnNoConsumers, defaultMaxConsumers); } + @Override + public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name, + @Parameter(name = "routingType", desc = "The routing type for the address either 'MULTICAST' or 'ANYCAST'") String routingType, + @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers, + @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception { + proxy.invokeOperation("createAddress", name, routingType, defaultDeleteOnNoConsumers, defaultMaxConsumers); + } + @Override public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception { proxy.invokeOperation("deleteAddress", name); @@ -121,6 +130,17 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes proxy.invokeOperation("createQueue", address, name, durable); } + @Override + public void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "filter", desc = "Filter of the queue") String filter, + @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, + @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers, + @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers, + @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception { + + } + @Override public void deployQueue(final String address, final String name, @@ -144,6 +164,12 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes proxy.invokeOperation("destroyQueue", name, removeConsumers); } + @Override + public void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name, + @Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers, + boolean autoDeleteAddress) throws Exception { + } + @Override public void disableMessageCounters() throws Exception { proxy.invokeOperation("disableMessageCounters"); @@ -631,6 +657,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes return (String) proxy.invokeOperation("listNetworkTopology"); } + @Override + public String getAddressInfo(String address) throws ActiveMQAddressDoesNotExistException { + return null; + } + + @Override + public String[] listBindingsForAddress(String address) throws Exception { + return new String[0]; + } + @Override public void removeAddressSettings(String addressMatch) throws Exception { proxy.invokeOperation("removeAddressSettings", addressMatch); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 00d220b8d7..3095cb5cb0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -1628,7 +1628,7 @@ public class MQTTTest extends MQTTTestSupport { addressInfo.setDefaultMaxQueueConsumers(0); getServer().createOrUpdateAddressInfo(addressInfo); - getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false); + getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, false); MQTT mqtt = createMQTTConnection(); mqtt.setClientId(clientId); @@ -1674,7 +1674,7 @@ public class MQTTTest extends MQTTTestSupport { try { String clientId = "testMqtt"; SimpleString coreAddress = new SimpleString("foo.bar"); - getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, -1, true); + getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, -1, true, false); Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 35fe5cc9ad..f2c844ee34 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.unit.core.server.impl.fakes; +import java.util.List; import java.util.Map; import java.util.Set; @@ -80,6 +81,11 @@ public class FakePostOffice implements PostOffice { return null; } + @Override + public List listQueuesForAddress(SimpleString address) throws Exception { + return null; + } + @Override public void addBinding(final Binding binding) throws Exception {