ARTEMIS-878 Update the CLI to incorporate Addresses and new Queue

This commit is contained in:
Martyn Taylor 2016-11-11 14:08:49 +00:00
parent a88853fe53
commit ec8f06138c
36 changed files with 1501 additions and 664 deletions

View File

@ -30,9 +30,11 @@ import org.apache.activemq.artemis.cli.commands.Kill;
import org.apache.activemq.artemis.cli.commands.Mask; import org.apache.activemq.artemis.cli.commands.Mask;
import org.apache.activemq.artemis.cli.commands.Run; import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.Stop; import org.apache.activemq.artemis.cli.commands.Stop;
import org.apache.activemq.artemis.cli.commands.destination.CreateDestination; import org.apache.activemq.artemis.cli.commands.address.CreateAddress;
import org.apache.activemq.artemis.cli.commands.destination.DeleteDestination; import org.apache.activemq.artemis.cli.commands.address.DeleteAddress;
import org.apache.activemq.artemis.cli.commands.destination.HelpDestination; import org.apache.activemq.artemis.cli.commands.queue.CreateQueue;
import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue;
import org.apache.activemq.artemis.cli.commands.queue.HelpQueue;
import org.apache.activemq.artemis.cli.commands.messages.Browse; import org.apache.activemq.artemis.cli.commands.messages.Browse;
import org.apache.activemq.artemis.cli.commands.messages.Consumer; import org.apache.activemq.artemis.cli.commands.messages.Consumer;
import org.apache.activemq.artemis.cli.commands.messages.Producer; import org.apache.activemq.artemis.cli.commands.messages.Producer;
@ -128,8 +130,11 @@ public class Artemis {
String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance"); String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance");
Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis").withDescription("ActiveMQ Artemis Command Line").withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Consumer.class).withCommand(Browse.class).withCommand(Mask.class).withDefaultCommand(HelpAction.class); Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis").withDescription("ActiveMQ Artemis Command Line").withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Consumer.class).withCommand(Browse.class).withCommand(Mask.class).withDefaultCommand(HelpAction.class);
builder.withGroup("destination").withDescription("Destination tools group (create|delete) (example ./artemis destination create)"). builder.withGroup("queue").withDescription("Queue tools group (create|delete) (example ./artemis queue create)").
withDefaultCommand(HelpDestination.class).withCommands(CreateDestination.class, DeleteDestination.class); withDefaultCommand(HelpQueue.class).withCommands(CreateQueue.class, DeleteQueue.class);
builder.withGroup("address").withDescription("Queue tools group (create|delete) (example ./artemis queue create)").
withDefaultCommand(HelpQueue.class).withCommands(CreateAddress.class, DeleteAddress.class);
if (instance != null) { if (instance != null) {
builder.withGroup("data").withDescription("data tools group (print|exp|imp|exp|encode|decode|compact) (example ./artemis data print)"). builder.withGroup("data").withDescription("data tools group (print|exp|imp|exp|encode|decode|compact) (example ./artemis data print)").

View File

@ -14,12 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.artemis.cli.commands.destination; package org.apache.activemq.artemis.cli.commands;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueRequestor;
import javax.jms.Session;
import io.airlift.airline.Option; import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@ -29,51 +24,10 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.management.JMSManagementHelper;
import org.apache.activemq.artemis.cli.commands.messages.ConnectionAbstract; import org.apache.activemq.artemis.cli.commands.messages.ConnectionAbstract;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
public abstract class DestinationAction extends ConnectionAbstract { public abstract class AbstractAction extends ConnectionAbstract {
public static final String JMS_QUEUE = "jms-queue";
public static final String JMS_TOPIC = "topic";
public static final String CORE_QUEUE = "core-queue";
@Option(name = "--type", description = "type of destination to be created (one of jms-queue, topic and core-queue, default jms-queue")
String destType = JMS_QUEUE;
@Option(name = "--name", description = "destination name")
String name;
public void performJmsManagement(ManagementCallback<Message> cb) throws Exception {
try (ActiveMQConnectionFactory factory = createConnectionFactory();
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Queue managementQueue = ActiveMQJMSClient.createQueue("activemq.management");
QueueRequestor requestor = new QueueRequestor(session, managementQueue);
connection.start();
Message message = session.createMessage();
cb.setUpInvocation(message);
Message reply = requestor.request(message);
boolean result = JMSManagementHelper.hasOperationSucceeded(reply);
if (result) {
cb.requestSuccessful(reply);
} else {
cb.requestFailed(reply);
}
}
}
public void performCoreManagement(ManagementCallback<ClientMessage> cb) throws Exception { public void performCoreManagement(ManagementCallback<ClientMessage> cb) throws Exception {
@ -97,26 +51,6 @@ public abstract class DestinationAction extends ConnectionAbstract {
} }
} }
public void setName(String name) {
this.name = name;
}
public String getName() {
if (name == null) {
name = input("--name", "Please provide the destination name:", "");
}
return name;
}
public String getDestType() {
return destType;
}
public void setDestType(String destType) {
this.destType = destType;
}
public interface ManagementCallback<T> { public interface ManagementCallback<T> {
void setUpInvocation(T message) throws Exception; void setUpInvocation(T message) throws Exception;

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.address;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@Command(name = "create", description = "create an address")
public class CreateAddress extends AbstractAction {
@Option(name = "--name", description = "The name of this address")
String name;
@Option(name = "--routingType", description = "The routing type of the address, options are 'anycast' or 'multicast', defaults to 1 = 'multicast'")
String routingType = "multicast";
@Option(name = "--defaultMaxConsumers", description = "Sets the default max consumers for any queues created under this address, default = -1 (no limit)")
int defaultMaxConsumers = -1;
@Option(name = "--defaultDeleteOnNoConsumers", description = "Sets the default delete on no consumers for any queues created under this address, default = false")
boolean defaultDeleteOnNoConsumers = false;
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
createAddress(context);
return null;
}
private void createAddress(final ActionContext context) throws Exception {
performCoreManagement(new ManagementCallback<ClientMessage>() {
@Override
public void setUpInvocation(ClientMessage message) throws Exception {
ManagementHelper.putOperationInvocation(message, "broker", "createAddress", getName(), routingType, defaultDeleteOnNoConsumers, defaultMaxConsumers);
}
@Override
public void requestSuccessful(ClientMessage reply) throws Exception {
context.out.println("Address " + getName() + " created successfully.");
}
@Override
public void requestFailed(ClientMessage reply) throws Exception {
String errMsg = (String) ManagementHelper.getResult(reply, String.class);
context.err.println("Failed to create address " + getName() + ". Reason: " + errMsg);
}
});
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
public String getRoutingType() {
return routingType;
}
public void setRoutingType(String routingType) {
this.routingType = routingType;
}
public int getDefaultMaxConsumers() {
return defaultMaxConsumers;
}
public void setDefaultMaxConsumers(int defaultMaxConsumers) {
this.defaultMaxConsumers = defaultMaxConsumers;
}
public boolean getDefaultDeleteOnNoConsumers() {
return defaultDeleteOnNoConsumers;
}
public void setDefaultDeleteOnNoConsumers(boolean defaultDeleteOnNoConsumers) {
this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.address;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.cli.commands.ActionContext;
@Command(name = "delete", description = "delete a queue")
public class DeleteAddress extends AbstractAction {
@Option(name = "--name", description = "The name of this address")
String name;
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
deleteAddress(context);
return null;
}
private void deleteAddress(final ActionContext context) throws Exception {
performCoreManagement(new ManagementCallback<ClientMessage>() {
@Override
public void setUpInvocation(ClientMessage message) throws Exception {
ManagementHelper.putOperationInvocation(message, "broker", "deleteAddress", getName());
}
@Override
public void requestSuccessful(ClientMessage reply) throws Exception {
context.out.println("Address " + getName() + " deleted successfully.");
}
@Override
public void requestFailed(ClientMessage reply) throws Exception {
String errMsg = (String) ManagementHelper.getResult(reply, String.class);
context.err.println("Failed to delete address " + getName() + ". Reason: " + errMsg);
}
});
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

View File

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.artemis.cli.commands.destination; package org.apache.activemq.artemis.cli.commands.address;
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
@ -25,7 +25,7 @@ import io.airlift.airline.Help;
import org.apache.activemq.artemis.cli.commands.Action; import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.ActionContext;
public class HelpDestination extends Help implements Action { public class HelpAddress extends Help implements Action {
@Override @Override
public boolean isVerbose() { public boolean isVerbose() {
@ -49,7 +49,7 @@ public class HelpDestination extends Help implements Action {
@Override @Override
public Object execute(ActionContext context) throws Exception { public Object execute(ActionContext context) throws Exception {
List<String> commands = new ArrayList<>(1); List<String> commands = new ArrayList<>(1);
commands.add("destination"); commands.add("queue");
help(global, commands); help(global, commands);
return null; return null;
} }

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.address;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.cli.commands.ActionContext;
@Command(name = "show", description = "delete a queue")
public class ShowAddress extends AbstractAction {
@Option(name = "--name", description = "The name of this address")
String name;
@Option(name = "--bindings", description = "Shows the bindings for this address")
boolean bindings;
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
showAddress(context);
return null;
}
private void showAddress(final ActionContext context) throws Exception {
performCoreManagement(new ManagementCallback<ClientMessage>() {
@Override
public void setUpInvocation(ClientMessage message) throws Exception {
if (bindings) {
ManagementHelper.putOperationInvocation(message, "broker", "listBindingsForAddress", getName());
}
else {
ManagementHelper.putOperationInvocation(message, "broker", "getAddressInfo", getName());
}
}
@Override
public void requestSuccessful(ClientMessage reply) throws Exception {
String result = (String) ManagementHelper.getResult(reply, String.class);
context.out.println(result);
}
@Override
public void requestFailed(ClientMessage reply) throws Exception {
String errMsg = (String) ManagementHelper.getResult(reply, String.class);
context.err.println("Failed to show address " + getName() + ". Reason: " + errMsg);
}
});
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isBindings() {
return bindings;
}
public void setBindings(boolean bindings) {
this.bindings = bindings;
}
}

View File

@ -1,147 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.destination;
import javax.jms.Message;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.jms.management.JMSManagementHelper;
import org.apache.activemq.artemis.cli.commands.ActionContext;
@Command(name = "create", description = "create a queue or topic")
public class CreateDestination extends DestinationAction {
@Option(name = "--filter", description = "queue's filter string (default null)")
String filter = null;
@Option(name = "--address", description = "address of the core queue (default queue's name)")
String address;
@Option(name = "--durable", description = "whether the queue is durable or not (default false)")
boolean durable = false;
@Option(name = "--bindings", description = "comma separated jndi binding names (default null)")
String bindings = null;
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
if (JMS_QUEUE.equals(destType)) {
createJmsQueue(context);
} else if (CORE_QUEUE.equals(destType)) {
createCoreQueue(context);
} else if (JMS_TOPIC.equals(destType)) {
createJmsTopic(context);
} else {
throw new IllegalArgumentException("--type can only be one of " + JMS_QUEUE + ", " + JMS_TOPIC + " and " + CORE_QUEUE);
}
return null;
}
private void createJmsTopic(final ActionContext context) throws Exception {
performJmsManagement(new ManagementCallback<Message>() {
@Override
public void setUpInvocation(Message message) throws Exception {
JMSManagementHelper.putOperationInvocation(message, "jms.server", "createTopic", getName(), bindings);
}
@Override
public void requestSuccessful(Message reply) throws Exception {
boolean result = (boolean) JMSManagementHelper.getResult(reply, Boolean.class);
if (result) {
context.out.println("Topic " + getName() + " created successfully.");
} else {
context.err.println("Failed to create topic " + getName() + ".");
}
}
@Override
public void requestFailed(Message reply) throws Exception {
String errorMsg = (String) JMSManagementHelper.getResult(reply, String.class);
context.err.println("Failed to create topic " + getName() + ". Reason: " + errorMsg);
}
});
}
public String getAddress() {
if (address == null || "".equals(address.trim())) {
address = getName();
}
return address.trim();
}
private void createCoreQueue(final ActionContext context) throws Exception {
performCoreManagement(new ManagementCallback<ClientMessage>() {
@Override
public void setUpInvocation(ClientMessage message) throws Exception {
String address = getAddress();
ManagementHelper.putOperationInvocation(message, "core.server", "createQueue", address, getName(), filter, durable);
}
@Override
public void requestSuccessful(ClientMessage reply) throws Exception {
context.out.println("Core queue " + getName() + " created successfully.");
}
@Override
public void requestFailed(ClientMessage reply) throws Exception {
String errMsg = (String) ManagementHelper.getResult(reply, String.class);
context.err.println("Failed to create queue " + getName() + ". Reason: " + errMsg);
}
});
}
private void createJmsQueue(final ActionContext context) throws Exception {
performJmsManagement(new ManagementCallback<Message>() {
@Override
public void setUpInvocation(Message message) throws Exception {
JMSManagementHelper.putOperationInvocation(message, "jms.server", "createQueue", getName(), bindings, filter, durable);
}
@Override
public void requestSuccessful(Message reply) throws Exception {
boolean result = (boolean) JMSManagementHelper.getResult(reply, Boolean.class);
if (result) {
context.out.println("Jms queue " + getName() + " created successfully.");
} else {
context.err.println("Failed to create jms queue " + getName() + ".");
}
}
@Override
public void requestFailed(Message reply) throws Exception {
String errorMsg = (String) JMSManagementHelper.getResult(reply, String.class);
context.err.println("Failed to create jms queue " + getName() + ". Reason: " + errorMsg);
}
});
}
public void setFilter(String filter) {
this.filter = filter;
}
public void setBindings(String bindings) {
this.bindings = bindings;
}
}

View File

@ -1,121 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.destination;
import javax.jms.Message;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.jms.management.JMSManagementHelper;
import org.apache.activemq.artemis.cli.commands.ActionContext;
@Command(name = "delete", description = "delete a queue or topic")
public class DeleteDestination extends DestinationAction {
@Option(name = "--removeConsumers", description = "whether deleting destination with consumers or not (default false)")
boolean removeConsumers = false;
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
if (JMS_QUEUE.equals(destType)) {
deleteJmsQueue(context);
} else if (CORE_QUEUE.equals(destType)) {
deleteCoreQueue(context);
} else if (JMS_TOPIC.equals(destType)) {
deleteJmsTopic(context);
} else {
throw new IllegalArgumentException("--type can only be one of " + JMS_QUEUE + ", " + JMS_TOPIC + " and " + CORE_QUEUE);
}
return null;
}
private void deleteJmsTopic(final ActionContext context) throws Exception {
performJmsManagement(new ManagementCallback<Message>() {
@Override
public void setUpInvocation(Message message) throws Exception {
JMSManagementHelper.putOperationInvocation(message, "jms.server", "destroyTopic", getName(), removeConsumers);
}
@Override
public void requestSuccessful(Message reply) throws Exception {
boolean result = (boolean) JMSManagementHelper.getResult(reply, Boolean.class);
if (result) {
context.out.println("Topic " + getName() + " deleted successfully.");
} else {
context.err.println("Failed to delete topic " + getName());
}
}
@Override
public void requestFailed(Message reply) throws Exception {
String errorMsg = (String) JMSManagementHelper.getResult(reply, String.class);
context.err.println("Failed to delete topic " + getName() + ". Reason: " + errorMsg);
}
});
}
private void deleteJmsQueue(final ActionContext context) throws Exception {
performJmsManagement(new ManagementCallback<Message>() {
@Override
public void setUpInvocation(Message message) throws Exception {
JMSManagementHelper.putOperationInvocation(message, "jms.server", "destroyQueue", getName(), removeConsumers);
}
@Override
public void requestSuccessful(Message reply) throws Exception {
boolean result = (boolean) JMSManagementHelper.getResult(reply, Boolean.class);
if (result) {
context.out.println("Jms queue " + getName() + " deleted successfully.");
} else {
context.err.println("Failed to delete queue " + getName());
}
}
@Override
public void requestFailed(Message reply) throws Exception {
String errorMsg = (String) JMSManagementHelper.getResult(reply, String.class);
context.err.println("Failed to create " + getName() + " with reason: " + errorMsg);
}
});
}
private void deleteCoreQueue(final ActionContext context) throws Exception {
performCoreManagement(new ManagementCallback<ClientMessage>() {
@Override
public void setUpInvocation(ClientMessage message) throws Exception {
ManagementHelper.putOperationInvocation(message, "core.server", "destroyQueue", getName());
}
@Override
public void requestSuccessful(ClientMessage reply) throws Exception {
context.out.println("Queue " + getName() + " deleted successfully.");
}
@Override
public void requestFailed(ClientMessage reply) throws Exception {
String errMsg = (String) ManagementHelper.getResult(reply, String.class);
context.err.println("Failed to delete queue " + getName() + ". Reason: " + errMsg);
}
});
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.queue;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
@Command(name = "create", description = "create a queue or topic")
public class CreateQueue extends AbstractAction {
@Option(name = "--name", description = "queue name")
String name;
@Option(name = "--filter", description = "queue's filter string (default null)")
String filter = null;
@Option(name = "--address", description = "address of the queue (default queue's name)")
String address;
@Option(name = "--durable", description = "whether the queue is durable or not (default false)")
boolean durable = false;
@Option(name = "--deleteOnNoConsumers", description = "whether to delete this queue when it's last consumers disconnects)")
boolean deleteOnNoConsumers = false;
@Option(name = "--maxConsumers", description = "Maximum number of consumers allowed on this queue at any one time (default no limit)")
int maxConsumers = -1;
@Option(name = "--autoCreateAddress", description = "Auto create the address (if it doesn't exist) with default values")
boolean autoCreateAddress = false;
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
createQueue(context);
return null;
}
public String getAddress() {
if (address == null || "".equals(address.trim())) {
address = getName();
}
return address.trim();
}
private void createQueue(final ActionContext context) throws Exception {
performCoreManagement(new ManagementCallback<ClientMessage>() {
@Override
public void setUpInvocation(ClientMessage message) throws Exception {
String address = getAddress();
ManagementHelper.putOperationInvocation(message, "broker", "createQueue", address, getName(), filter, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
public void requestSuccessful(ClientMessage reply) throws Exception {
context.out.println("Core queue " + getName() + " created successfully.");
}
@Override
public void requestFailed(ClientMessage reply) throws Exception {
String errMsg = (String) ManagementHelper.getResult(reply, String.class);
context.err.println("Failed to create queue " + getName() + ". Reason: " + errMsg);
}
});
}
public void setFilter(String filter) {
this.filter = filter;
}
public void setAutoCreateAddress(boolean autoCreateAddress) {
this.autoCreateAddress = autoCreateAddress;
}
public void setMaxConsumers(int maxConsumers) {
this.maxConsumers = maxConsumers;
}
public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
this.deleteOnNoConsumers = deleteOnNoConsumers;
}
public void setAddress(String address) {
this.address = address;
}
public void setName(String name) {
this.name = name;
}
public String getName() {
if (name == null) {
name = input("--name", "Please provide the destination name:", "");
}
return name;
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.queue;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
@Command(name = "delete", description = "delete a queue")
public class DeleteQueue extends AbstractAction {
@Option(name = "--name", description = "queue name")
String name;
@Option(name = "--removeConsumers", description = "whether deleting destination with consumers or not (default false)")
boolean removeConsumers = false;
@Option(name = "--autoDeleteAddress", description = "delete the address if this it's last last queue")
boolean autoDeleteAddress = false;
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
deleteQueue(context);
return null;
}
private void deleteQueue(final ActionContext context) throws Exception {
performCoreManagement(new ManagementCallback<ClientMessage>() {
@Override
public void setUpInvocation(ClientMessage message) throws Exception {
ManagementHelper.putOperationInvocation(message, "broker", "destroyQueue", getName(), removeConsumers, autoDeleteAddress);
}
@Override
public void requestSuccessful(ClientMessage reply) throws Exception {
context.out.println("Queue " + getName() + " deleted successfully.");
}
@Override
public void requestFailed(ClientMessage reply) throws Exception {
String errMsg = (String) ManagementHelper.getResult(reply, String.class);
context.err.println("Failed to delete queue " + getName() + ". Reason: " + errMsg);
}
});
}
public void setRemoveConsumers(boolean removeConsumers) {
this.removeConsumers = removeConsumers;
}
public void setAutoDeleteAddress(boolean autoDeleteAddress) {
this.autoDeleteAddress = autoDeleteAddress;
}
public void setName(String name) {
this.name = name;
}
public String getName() {
if (name == null) {
name = input("--name", "Please provide the destination name:", "");
}
return name;
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.queue;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import io.airlift.airline.Help;
import org.apache.activemq.artemis.cli.commands.Action;
import org.apache.activemq.artemis.cli.commands.ActionContext;
public class HelpQueue extends Help implements Action {
@Override
public boolean isVerbose() {
return false;
}
@Override
public void setHomeValues(File brokerHome, File brokerInstance) {
}
@Override
public String getBrokerInstance() {
return null;
}
@Override
public String getBrokerHome() {
return null;
}
@Override
public Object execute(ActionContext context) throws Exception {
List<String> commands = new ArrayList<>(1);
commands.add("queue");
help(global, commands);
return null;
}
}

View File

@ -538,13 +538,16 @@ public class ArtemisTest {
// This is usually set when run from the command line via artemis.profile // This is usually set when run from the command line via artemis.profile
Run.setEmbedded(true); Run.setEmbedded(false);
Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--queues", queues, "--topics", topics, "--no-autotune", "--require-login"); Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--queues", queues, "--topics", topics, "--no-autotune", "--require-login");
System.setProperty("artemis.instance", instanceFolder.getAbsolutePath()); System.setProperty("artemis.instance", instanceFolder.getAbsolutePath());
// Some exceptions may happen on the initialization, but they should be ok on start the basic core protocol // Some exceptions may happen on the initialization, but they should be ok on start the basic core protocol
Artemis.internalExecute("run"); Artemis.internalExecute("run");
Artemis.main("queue", "create", "--name", "q1", "--address", "q1", "--user", "admin", "--password", "admin");
Artemis.main("queue", "create", "--name", "t2", "--address", "t2", "--user", "admin", "--password", "admin");
try { try {
try (ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616"); try (ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616");
ClientSessionFactory factory = locator.createSessionFactory(); ClientSessionFactory factory = locator.createSessionFactory();

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
/**
* An operation failed because an address exists on the server.
*/
public final class ActiveMQAddressDoesNotExistException extends ActiveMQException {
public ActiveMQAddressDoesNotExistException() {
super(ActiveMQExceptionType.ADDRESS_EXISTS);
}
public ActiveMQAddressDoesNotExistException(String msg) {
super(ActiveMQExceptionType.ADDRESS_EXISTS, msg);
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
/**
* An operation failed because an address exists on the server.
*/
public final class ActiveMQDeleteAddressException extends ActiveMQException {
public ActiveMQDeleteAddressException() {
super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR);
}
public ActiveMQDeleteAddressException(String msg) {
super(ActiveMQExceptionType.DELETE_ADDRESS_ERROR, msg);
}
}

View File

@ -76,5 +76,4 @@ public class ActiveMQException extends Exception {
public String toString() { public String toString() {
return this.getClass().getSimpleName() + "[errorType=" + type + " message=" + getMessage() + "]"; return this.getClass().getSimpleName() + "[errorType=" + type + " message=" + getMessage() + "]";
} }
} }

View File

@ -106,6 +106,12 @@ public enum ActiveMQExceptionType {
return new ActiveMQSecurityException(msg); return new ActiveMQSecurityException(msg);
} }
}, },
ADDRESS_DOES_NOT_EXIST(106) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQAddressDoesNotExistException(msg);
}
},
ADDRESS_EXISTS(107) { ADDRESS_EXISTS(107) {
@Override @Override
public ActiveMQException createException(String msg) { public ActiveMQException createException(String msg) {
@ -231,6 +237,12 @@ public enum ActiveMQExceptionType {
public ActiveMQException createException(String msg) { public ActiveMQException createException(String msg) {
return new ActiveMQInvalidQueueConfiguration(msg); return new ActiveMQInvalidQueueConfiguration(msg);
} }
},
DELETE_ADDRESS_ERROR(217) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQDeleteAddressException(msg);
}
}; };
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP; private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;

View File

@ -19,6 +19,8 @@ package org.apache.activemq.artemis.api.core.management;
import javax.management.MBeanOperationInfo; import javax.management.MBeanOperationInfo;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
/** /**
* An ActiveMQServerControl is used to manage ActiveMQ Artemis servers. * An ActiveMQServerControl is used to manage ActiveMQ Artemis servers.
*/ */
@ -438,6 +440,13 @@ public interface ActiveMQServerControl {
@Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers, @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers,
@Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception; @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception;
@Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION)
void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@Parameter(name = "routingType", desc = "The routing type for the address either 'MULTICAST' or 'ANYCAST'") String routingType,
@Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers,
@Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception;
@Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION) @Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception; void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception;
@ -455,6 +464,7 @@ public interface ActiveMQServerControl {
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name) throws Exception; @Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
/** /**
* Create a queue. * Create a queue.
* <br> * <br>
@ -489,6 +499,25 @@ public interface ActiveMQServerControl {
@Parameter(name = "name", desc = "Name of the queue") String name, @Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception; @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
/**
* Create a queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
* @param name name of the queue
* @param durable whether the queue is durable
*/
@Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filter,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
/** /**
* Deploy a durable queue. * Deploy a durable queue.
* <br> * <br>
@ -536,6 +565,14 @@ public interface ActiveMQServerControl {
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name, void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers) throws Exception; @Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers) throws Exception;
/**
* Destroys the queue corresponding to the specified name and delete it's address if there are no other queues
*/
@Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers, boolean autoDeleteAddress) throws Exception;
/** /**
* Enables message counters for this server. * Enables message counters for this server.
*/ */
@ -901,5 +938,10 @@ public interface ActiveMQServerControl {
@Operation(desc = "List the Network Topology", impact = MBeanOperationInfo.INFO) @Operation(desc = "List the Network Topology", impact = MBeanOperationInfo.INFO)
String listNetworkTopology() throws Exception; String listNetworkTopology() throws Exception;
String getAddressInfo(String address) throws ActiveMQAddressDoesNotExistException;
@Operation(desc = "Get a list of bindings associated with an address", impact = MBeanOperationInfo.INFO)
String[] listBindingsForAddress(String address) throws Exception;
} }

View File

@ -42,6 +42,7 @@ import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
@ -49,6 +50,7 @@ import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.BridgeControl; import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
@ -62,8 +64,10 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.CheckType;
@ -71,6 +75,7 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
@ -562,12 +567,21 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO(); clearIO();
try { try {
server.createOrUpdateAddressInfo(new AddressInfo(new SimpleString(name), AddressInfo.RoutingType.getType((byte)routingType), defaultDeleteOnNoConsumers, defaultMaxConsumers)); server.createAddressInfo(new AddressInfo(new SimpleString(name), AddressInfo.RoutingType.getType((byte) routingType), defaultDeleteOnNoConsumers, defaultMaxConsumers));
} finally { } finally {
blockOnIO(); blockOnIO();
} }
} }
@Override
public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@Parameter(name = "routingType", desc = "The routing type for the address either 'MULTICAST' or 'ANYCAST'") String routingType,
@Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers,
@Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception {
AddressInfo.RoutingType rt = AddressInfo.RoutingType.valueOf(routingType.toUpperCase());
createAddress(name, rt.ordinal(), defaultDeleteOnNoConsumers, defaultMaxConsumers);
}
@Override @Override
public void deleteAddress(String name) throws Exception { public void deleteAddress(String name) throws Exception {
checkStarted(); checkStarted();
@ -632,6 +646,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
} }
} }
@Override
public void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values if one does not exist") boolean autoCreateAddress) throws Exception {
checkStarted();
clearIO();
SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
try {
if (filterStr != null && !filterStr.trim().equals("")) {
filter = new SimpleString(filterStr);
}
server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
} finally {
blockOnIO();
}
}
@Override @Override
public void createQueue(final String address, public void createQueue(final String address,
final String name, final String name,
@ -727,23 +765,51 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
} }
@Override @Override
public void destroyQueue(final String name, final boolean removeConsumers) throws Exception { public void destroyQueue(final String name, final boolean removeConsumers, final boolean autoDeleteAddress) throws Exception {
checkStarted(); checkStarted();
clearIO(); clearIO();
try { try {
SimpleString queueName = new SimpleString(name); SimpleString queueName = new SimpleString(name);
server.destroyQueue(queueName, null, !removeConsumers, removeConsumers); server.destroyQueue(queueName, null, !removeConsumers, removeConsumers, autoDeleteAddress);
} finally { } finally {
blockOnIO(); blockOnIO();
} }
} }
@Override
public void destroyQueue(final String name, final boolean removeConsumers) throws Exception {
destroyQueue(name, removeConsumers, false);
}
@Override @Override
public void destroyQueue(final String name) throws Exception { public void destroyQueue(final String name) throws Exception {
destroyQueue(name, false); destroyQueue(name, false);
} }
@Override
public String getAddressInfo(String address) throws ActiveMQAddressDoesNotExistException {
AddressInfo addressInfo = server.getAddressInfo(SimpleString.toSimpleString(address));
if (addressInfo == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address));
}
else {
return addressInfo.toString();
}
}
@Override
public String[] listBindingsForAddress(String address) throws Exception {
Bindings bindings = server.getPostOffice().getBindingsForAddress(new SimpleString(address));
List<String> result = new ArrayList<>(bindings.getBindings().size());
int i = 0;
for (Binding binding : bindings.getBindings()) {
}
return (String[]) result.toArray();
}
@Override @Override
public int getConnectionCount() { public int getConnectionCount() {
checkStarted(); checkStarted();

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.postoffice; package org.apache.activemq.artemis.core.postoffice;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -50,6 +51,8 @@ public interface PostOffice extends ActiveMQComponent {
AddressInfo getAddressInfo(SimpleString address); AddressInfo getAddressInfo(SimpleString address);
List<Queue> listQueuesForAddress(SimpleString address) throws Exception;
void addBinding(Binding binding) throws Exception; void addBinding(Binding binding) throws Exception;
Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception; Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception;

View File

@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory; import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.QueueInfo; import org.apache.activemq.artemis.core.postoffice.QueueInfo;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@ -131,6 +132,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
private final ActiveMQServer server; private final ActiveMQServer server;
private Object addressLock = new Object();
public PostOfficeImpl(final ActiveMQServer server, public PostOfficeImpl(final ActiveMQServer server,
final StorageManager storageManager, final StorageManager storageManager,
final PagingManager pagingManager, final PagingManager pagingManager,
@ -420,6 +423,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override @Override
public AddressInfo addAddressInfo(AddressInfo addressInfo) { public AddressInfo addAddressInfo(AddressInfo addressInfo) {
synchronized (addressLock) {
try { try {
managementService.registerAddress(addressInfo); managementService.registerAddress(addressInfo);
} catch (Exception e) { } catch (Exception e) {
@ -427,9 +431,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
return addressManager.addAddressInfo(addressInfo); return addressManager.addAddressInfo(addressInfo);
} }
}
@Override @Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
synchronized (addressLock) {
try { try {
managementService.registerAddress(addressInfo); managementService.registerAddress(addressInfo);
} catch (Exception e) { } catch (Exception e) {
@ -437,21 +443,40 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
} }
return addressManager.addOrUpdateAddressInfo(addressInfo); return addressManager.addOrUpdateAddressInfo(addressInfo);
} }
}
@Override @Override
public AddressInfo removeAddressInfo(SimpleString address) { public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
try { synchronized (addressLock) {
getServer().getManagementService().unregisterAddress(address); Bindings bindingsForAddress = getBindingsForAddress(address);
} catch (Exception e) { if (bindingsForAddress.getBindings().size() > 0) {
e.printStackTrace(); throw new IllegalStateException("Address has bindings");
} }
managementService.unregisterAddress(address);
return addressManager.removeAddressInfo(address); return addressManager.removeAddressInfo(address);
} }
}
@Override @Override
public AddressInfo getAddressInfo(SimpleString addressName) { public AddressInfo getAddressInfo(SimpleString addressName) {
synchronized (addressLock) {
return addressManager.getAddressInfo(addressName); return addressManager.getAddressInfo(addressName);
} }
}
@Override
public List<Queue> listQueuesForAddress(SimpleString address) throws Exception {
Bindings bindingsForAddress = getBindingsForAddress(address);
List<Queue> queues = new ArrayList<>();
for (Binding b : bindingsForAddress.getBindings()) {
if (b instanceof QueueBinding) {
Queue q = ((QueueBinding) b).getQueue();
queues.add(q);
}
}
return queues;
}
// TODO - needs to be synchronized to prevent happening concurrently with activate() // TODO - needs to be synchronized to prevent happening concurrently with activate()
// (and possible removeBinding and other methods) // (and possible removeBinding and other methods)

View File

@ -18,9 +18,12 @@ package org.apache.activemq.artemis.core.server;
import java.io.File; import java.io.File;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException;
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException; import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException; import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -390,4 +393,13 @@ public interface ActiveMQMessageBundle {
@Message(id = 119202, value = "Invalid Queue Configuration for Queue {0}, Address {1}. Expected {2} to be {3} but was {4}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 119202, value = "Invalid Queue Configuration for Queue {0}, Address {1}. Expected {2} to be {3} but was {4}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQInvalidQueueConfiguration invalidQueueConfiguration(SimpleString address, SimpleString queueName, String queuePropertyName, Object expectedValue, Object actualValue); ActiveMQInvalidQueueConfiguration invalidQueueConfiguration(SimpleString address, SimpleString queueName, String queuePropertyName, Object expectedValue, Object actualValue);
@Message(id = 119203, value = "Address Does Not Exist: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQAddressDoesNotExistException addressDoesNotExist(SimpleString address);
@Message(id = 119204, value = "Address already exists: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQAddressExistsException addressAlreadyExists(SimpleString address);
@Message(id = 119205, value = "Address {0} has bindings", format = Message.Format.MESSAGE_FORMAT)
ActiveMQDeleteAddressException addressHasBindings(SimpleString address);
} }

View File

@ -22,6 +22,7 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.google.common.collect.Queues;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
@ -122,6 +123,12 @@ public interface ActiveMQServer extends ActiveMQComponent {
*/ */
ActiveMQServerControlImpl getActiveMQServerControl(); ActiveMQServerControlImpl getActiveMQServerControl();
void destroyQueue(SimpleString queueName,
SecurityAuth session,
boolean checkConsumerCount,
boolean removeConsumers,
boolean autoDeleteAddress) throws Exception;
void registerActivateCallback(ActivateCallback callback); void registerActivateCallback(ActivateCallback callback);
void unregisterActivateCallback(ActivateCallback callback); void unregisterActivateCallback(ActivateCallback callback);
@ -289,7 +296,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean durable, boolean durable,
boolean temporary, boolean temporary,
Integer maxConsumers, Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception; Boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, Queue createQueue(SimpleString address,
SimpleString queueName, SimpleString queueName,
@ -305,7 +313,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean durable, boolean durable,
boolean temporary, boolean temporary,
Integer maxConsumers, Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception; Boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, Queue createQueue(SimpleString address,
SimpleString queueName, SimpleString queueName,
@ -323,7 +332,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean temporary, boolean temporary,
boolean autoCreated, boolean autoCreated,
Integer maxConsumers, Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception; Boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue deployQueue(SimpleString address, Queue deployQueue(SimpleString address,
SimpleString queueName, SimpleString queueName,
@ -351,7 +361,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean temporary, boolean temporary,
boolean autoCreated, boolean autoCreated,
Integer maxConsumers, Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception; Boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
void destroyQueue(SimpleString queueName) throws Exception; void destroyQueue(SimpleString queueName) throws Exception;
@ -414,7 +425,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean transientQueue, boolean transientQueue,
boolean autoCreated, boolean autoCreated,
Integer maxConsumers, Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception; Boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
/* /*
* add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
@ -451,6 +463,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
AddressInfo putAddressInfoIfAbsent(AddressInfo addressInfo) throws Exception; AddressInfo putAddressInfoIfAbsent(AddressInfo addressInfo) throws Exception;
void createAddressInfo(AddressInfo addressInfo) throws Exception;
AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception; AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
void removeAddressInfo(SimpleString address) throws Exception; void removeAddressInfo(SimpleString address) throws Exception;

View File

@ -785,7 +785,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void noQueueIdDefined(ServerMessage message, ServerMessage messageCopy, SimpleString idsHeaderName); void noQueueIdDefined(ServerMessage message, ServerMessage messageCopy, SimpleString idsHeaderName);
@LogMessage(level = Logger.Level.WARN) @LogMessage(level = Logger.Level.TRACE)
@Message(id = 222111, value = "exception while invoking {0} on {1}", @Message(id = 222111, value = "exception while invoking {0} on {1}",
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void managementOperationError(@Cause Exception e, String op, String resourceName); void managementOperationError(@Cause Exception e, String op, String resourceName);

View File

@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
@ -1476,8 +1477,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean durable, final boolean durable,
final boolean temporary, final boolean temporary,
final Integer maxConsumers, final Integer maxConsumers,
final Boolean deleteOnNoConsumers) throws Exception { final Boolean deleteOnNoConsumers,
return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers); final boolean autoCreateAddress) throws Exception {
return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
} }
@Override @Override
@ -1498,8 +1500,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
boolean durable, boolean durable,
boolean temporary, boolean temporary,
Integer maxConsumers, Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception { Boolean deleteOnNoConsumers,
return createQueue(address, queueName, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers); boolean autoCreateAddress) throws Exception {
return createQueue(address, queueName, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
} }
@Override @Override
@ -1522,8 +1525,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
boolean temporary, boolean temporary,
boolean autoCreated, boolean autoCreated,
Integer maxConsumers, Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception { Boolean deleteOnNoConsumers,
return createQueue(address, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers); boolean autoCreateAddress) throws Exception {
return createQueue(address, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
} }
@Override @Override
@ -1585,7 +1589,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean durable, final boolean durable,
final boolean temporary, final boolean temporary,
final boolean autoCreated) throws Exception { final boolean autoCreated) throws Exception {
return deployQueue(address, queueName, filterString, durable, temporary, autoCreated, null, null); return deployQueue(address, queueName, filterString, durable, temporary, autoCreated, null, null, true);
} }
@Override @Override
@ -1596,12 +1600,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean temporary, final boolean temporary,
final boolean autoCreated, final boolean autoCreated,
final Integer maxConsumers, final Integer maxConsumers,
final Boolean deleteOnNoConsumers) throws Exception { final Boolean deleteOnNoConsumers,
final boolean autoCreateAddress) throws Exception {
// TODO: fix logging here as this could be for a topic or queue // TODO: fix logging here as this could be for a topic or queue
ActiveMQServerLogger.LOGGER.deployQueue(queueName); ActiveMQServerLogger.LOGGER.deployQueue(queueName);
return createQueue(address, queueName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers); return createQueue(address, queueName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
} }
@Override @Override
@ -1621,7 +1626,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
public void destroyQueue(final SimpleString queueName, public void destroyQueue(final SimpleString queueName,
final SecurityAuth session, final SecurityAuth session,
final boolean checkConsumerCount) throws Exception { final boolean checkConsumerCount) throws Exception {
destroyQueue(queueName, session, checkConsumerCount, false); destroyQueue(queueName, session, checkConsumerCount, false, true);
} }
@Override @Override
@ -1629,6 +1634,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final SecurityAuth session, final SecurityAuth session,
final boolean checkConsumerCount, final boolean checkConsumerCount,
final boolean removeConsumers) throws Exception { final boolean removeConsumers) throws Exception {
destroyQueue(queueName, session, checkConsumerCount, removeConsumers, true);
}
@Override
public void destroyQueue(final SimpleString queueName,
final SecurityAuth session,
final boolean checkConsumerCount,
final boolean removeConsumers,
final boolean autoDeleteAddress) throws Exception {
if (postOffice == null) { if (postOffice == null) {
return; return;
} }
@ -1662,6 +1676,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
queue.deleteQueue(removeConsumers); queue.deleteQueue(removeConsumers);
if (autoDeleteAddress && postOffice != null) {
try {
postOffice.removeAddressInfo(address);
} catch (ActiveMQDeleteAddressException e) {
// Could be thrown if the address has bindings or is not deletable.
}
}
callPostQueueDeletionCallbacks(address, queueName); callPostQueueDeletionCallbacks(address, queueName);
} }
@ -2101,15 +2123,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Deploy any predefined queues // Deploy any predefined queues
deployQueuesFromConfiguration(); deployQueuesFromConfiguration();
registerPostQueueDeletionCallback(new PostQueueDeletionCallback() { // registerPostQueueDeletionCallback(new PostQueueDeletionCallback() {
// TODO delete auto-created addresses when queueCount == 0 // // TODO delete auto-created addresses when queueCount == 0
@Override // @Override
public void callback(SimpleString address, SimpleString queueName) throws Exception { // public void callback(SimpleString address, SimpleString queueName) throws Exception {
if (getAddressInfo(address).isAutoCreated() && postOffice.getBindingsForAddress(address).getBindings().size() == 0) { // if (getAddressInfo(address).isAutoCreated()) {
removeAddressInfo(address); // removeAddressInfo(address);
} // }
} // }
}); // });
// We need to call this here, this gives any dependent server a chance to deploy its own addresses // We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is fully activated // this needs to be done before clustering is fully activated
@ -2200,7 +2222,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception { private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) { for (CoreQueueConfiguration config : queues) {
deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers()); deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers(), true);
} }
} }
@ -2315,6 +2337,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return result; return result;
} }
@Override
public void createAddressInfo(AddressInfo addressInfo) throws Exception {
if (putAddressInfoIfAbsent(addressInfo) != null) {
throw ActiveMQMessageBundle.BUNDLE.addressAlreadyExists(addressInfo.getName());
}
}
@Override @Override
public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception { public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
AddressInfo result = postOffice.addOrUpdateAddressInfo(addressInfo); AddressInfo result = postOffice.addOrUpdateAddressInfo(addressInfo);
@ -2329,7 +2358,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override @Override
public void removeAddressInfo(SimpleString address) throws Exception { public void removeAddressInfo(SimpleString address) throws Exception {
postOffice.removeAddressInfo(address); if (postOffice.removeAddressInfo(address) == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
};
// TODO: is this the right way to do this? // TODO: is this the right way to do this?
// long txID = storageManager.generateID(); // long txID = storageManager.generateID();
@ -2357,17 +2388,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean ignoreIfExists, final boolean ignoreIfExists,
final boolean transientQueue, final boolean transientQueue,
final boolean autoCreated) throws Exception { final boolean autoCreated) throws Exception {
return createQueue(addressName, return createQueue(addressName, queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, null, null, true);
queueName,
filterString,
user,
durable,
temporary,
ignoreIfExists,
transientQueue,
autoCreated,
null,
null);
} }
@Override @Override
@ -2381,7 +2402,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean transientQueue, final boolean transientQueue,
final boolean autoCreated, final boolean autoCreated,
final Integer maxConsumers, final Integer maxConsumers,
final Boolean deleteOnNoConsumers) throws Exception { final Boolean deleteOnNoConsumers,
final boolean autoCreateAddress) throws Exception {
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
if (binding != null) { if (binding != null) {
if (ignoreIfExists) { if (ignoreIfExists) {
@ -2404,34 +2427,26 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} }
AddressInfo defaultAddressInfo = new AddressInfo(addressName); AddressInfo defaultAddressInfo = new AddressInfo(addressName);
// FIXME This boils down to a putIfAbsent (avoids race). This should be reflected in the API.
AddressInfo info = postOffice.getAddressInfo(addressName); AddressInfo info = postOffice.getAddressInfo(addressName);
if (info == null) { if (info == null) {
if (autoCreateAddress) {
info = defaultAddressInfo; info = defaultAddressInfo;
} else {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
}
} }
final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers; final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers;
final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxQueueConsumers() : maxConsumers; final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxQueueConsumers() : maxConsumers;
final QueueConfig queueConfig = queueConfigBuilder final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).deleteOnNoConsumers(isDeleteOnNoConsumers).maxConsumers(noMaxConsumers).build();
.filter(filter)
.pagingManager(pagingManager)
.user(user)
.durable(durable)
.temporary(temporary)
.autoCreated(autoCreated)
.deleteOnNoConsumers(isDeleteOnNoConsumers)
.maxConsumers(noMaxConsumers)
.build();
final Queue queue = queueFactory.createQueueWith(queueConfig); final Queue queue = queueFactory.createQueueWith(queueConfig);
boolean addressAlreadyExists = true; boolean addressAlreadyExists = true;
if (postOffice.getAddressInfo(queue.getAddress()) == null) { if (postOffice.getAddressInfo(queue.getAddress()) == null) {
postOffice.addAddressInfo(new AddressInfo(queue.getAddress()) postOffice.addAddressInfo(new AddressInfo(queue.getAddress()).setRoutingType(AddressInfo.RoutingType.MULTICAST).setDefaultMaxQueueConsumers(maxConsumers == null ? ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : maxConsumers));
.setRoutingType(AddressInfo.RoutingType.MULTICAST)
.setDefaultMaxQueueConsumers(maxConsumers == null ? ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : maxConsumers));
addressAlreadyExists = false; addressAlreadyExists = false;
} }

View File

@ -31,6 +31,8 @@ public class AddressInfo {
private boolean autoCreated = false; private boolean autoCreated = false;
private boolean deletable = false;
public AddressInfo(SimpleString name) { public AddressInfo(SimpleString name) {
this.name = name; this.name = name;
} }
@ -85,7 +87,7 @@ public class AddressInfo {
@Override @Override
public String toString() { public String toString() {
StringBuffer buff = new StringBuffer(); StringBuffer buff = new StringBuffer();
buff.append("AddressInfo [name=" + name); buff.append("Address [name=" + name);
buff.append(", routingType=" + routingType); buff.append(", routingType=" + routingType);
buff.append(", defaultMaxQueueConsumers=" + defaultMaxQueueConsumers); buff.append(", defaultMaxQueueConsumers=" + defaultMaxQueueConsumers);
buff.append(", defaultDeleteOnNoConsumers=" + defaultDeleteOnNoConsumers); buff.append(", defaultDeleteOnNoConsumers=" + defaultDeleteOnNoConsumers);

View File

@ -1421,7 +1421,8 @@ public class QueueImpl implements Queue {
@Override @Override
public void deleteQueue(boolean removeConsumers) throws Exception { public void deleteQueue(boolean removeConsumers) throws Exception {
synchronized (this) { synchronized (this) {
if (this.queueDestroyed) return; if (this.queueDestroyed)
return;
this.queueDestroyed = true; this.queueDestroyed = true;
} }
@ -1454,7 +1455,6 @@ public class QueueImpl implements Queue {
tx.rollback(); tx.rollback();
throw e; throw e;
} }
} }
@Override @Override
@ -1887,6 +1887,7 @@ public class QueueImpl implements Queue {
return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this)); return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this));
} }
private synchronized void internalAddTail(final MessageReference ref) { private synchronized void internalAddTail(final MessageReference ref) {
refAdded(ref); refAdded(ref);
messageReferences.addTail(ref, getPriority(ref)); messageReferences.addTail(ref, getPriority(ref));
@ -2960,8 +2961,6 @@ public class QueueImpl implements Queue {
return false; return false;
} }
@Override @Override
public MessageReference next() { public MessageReference next() {

View File

@ -510,7 +510,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
server.checkQueueCreationLimit(getUsername()); server.checkQueueCreationLimit(getUsername());
Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers); Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, maxConsumers, deleteOnNoConsumers, true);
if (temporary) { if (temporary) {
// Temporary queue in core simply means the queue will be deleted if // Temporary queue in core simply means the queue will be deleted if

View File

@ -739,7 +739,6 @@ public class ManagementServiceImpl implements ManagementService {
} }
Object result = method.invoke(resource, params); Object result = method.invoke(resource, params);
return result; return result;
} }

View File

@ -229,7 +229,7 @@ public class AddressingTest extends ActiveMQTestBase {
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
// For each address, create 2 Queues with the same address, assert both queues receive message // For each address, create 2 Queues with the same address, assert both queues receive message
boolean deleteOnNoConsumers = true; boolean deleteOnNoConsumers = true;
Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers, false);
ClientSession session = sessionFactory.createSession(); ClientSession session = sessionFactory.createSession();
session.start(); session.start();
@ -246,7 +246,7 @@ public class AddressingTest extends ActiveMQTestBase {
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
// For each address, create 2 Queues with the same address, assert both queues receive message // For each address, create 2 Queues with the same address, assert both queues receive message
boolean deleteOnNoConsumers = false; boolean deleteOnNoConsumers = false;
Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers, false);
ClientSession session = sessionFactory.createSession(); ClientSession session = sessionFactory.createSession();
session.start(); session.start();
@ -263,7 +263,7 @@ public class AddressingTest extends ActiveMQTestBase {
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
// For each address, create 2 Queues with the same address, assert both queues receive message // For each address, create 2 Queues with the same address, assert both queues receive message
boolean deleteOnNoConsumers = false; boolean deleteOnNoConsumers = false;
Queue q1 = server.createQueue(address, queueName, null, true, false, 0, deleteOnNoConsumers); Queue q1 = server.createQueue(address, queueName, null, true, false, 0, deleteOnNoConsumers, false);
Exception expectedException = null; Exception expectedException = null;
String expectedMessage = "Maximum Consumer Limit Reached on Queue"; String expectedMessage = "Maximum Consumer Limit Reached on Queue";
@ -290,7 +290,7 @@ public class AddressingTest extends ActiveMQTestBase {
SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString()); SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
// For each address, create 2 Queues with the same address, assert both queues receive message // For each address, create 2 Queues with the same address, assert both queues receive message
boolean deleteOnNoConsumers = false; boolean deleteOnNoConsumers = false;
Queue q1 = server.createQueue(address, queueName, null, true, false, -1, deleteOnNoConsumers); Queue q1 = server.createQueue(address, queueName, null, true, false, -1, deleteOnNoConsumers, false);
ClientSession session = sessionFactory.createSession(); ClientSession session = sessionFactory.createSession();
session.start(); session.start();
@ -310,7 +310,7 @@ public class AddressingTest extends ActiveMQTestBase {
boolean deleteOnNoConsumers = false; boolean deleteOnNoConsumers = false;
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.setDefaultMaxQueueConsumers(0); addressInfo.setDefaultMaxQueueConsumers(0);
Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers, false);
ClientSession session = sessionFactory.createSession(); ClientSession session = sessionFactory.createSession();
session.start(); session.start();

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cli;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.address.CreateAddress;
import org.apache.activemq.artemis.cli.commands.address.DeleteAddress;
import org.apache.activemq.artemis.cli.commands.address.ShowAddress;
import org.apache.activemq.artemis.cli.commands.queue.CreateQueue;
import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Before;
import org.junit.Test;
public class AddressCommandTest extends JMSTestBase {
//the command
private ByteArrayOutputStream output;
private ByteArrayOutputStream error;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
this.output = new ByteArrayOutputStream(1024);
this.error = new ByteArrayOutputStream(1024);
}
@Test
public void testCreateAddress() throws Exception {
String address = "address";
CreateAddress command = new CreateAddress();
command.setName(address);
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(command);
assertNotNull(server.getAddressInfo(new SimpleString(address)));
}
@Test
public void testCreateAddressAlreadyExistsShowsError() throws Exception {
String address = "address";
CreateAddress command = new CreateAddress();
command.setName(address);
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(command);
assertNotNull(server.getAddressInfo(new SimpleString(address)));
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(command, "Address already exists");
}
@Test
public void testDeleteAddress() throws Exception {
String address = "address";
CreateAddress command = new CreateAddress();
command.setName(address);
command.execute(new ActionContext());
assertNotNull(server.getAddressInfo(new SimpleString(address)));
DeleteAddress deleteAddress = new DeleteAddress();
deleteAddress.setName(address);
deleteAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(deleteAddress);
assertNull(server.getAddressInfo(new SimpleString(address)));
}
@Test
public void testDeleteAddressDoesNotExistsShowsError() throws Exception {
String address = "address";
DeleteAddress deleteAddress = new DeleteAddress();
deleteAddress.setName(address);
deleteAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(deleteAddress, "Address Does Not Exist");
}
@Test
public void testShowAddress() throws Exception {
String address = "address";
CreateAddress command = new CreateAddress();
command.setName(address);
command.execute(new ActionContext());
assertNotNull(server.getAddressInfo(new SimpleString(address)));
ShowAddress showAddress = new ShowAddress();
showAddress.setName(address);
showAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
System.out.println(output.toString());
}
@Test
public void testShowAddressDoesNotExist() throws Exception {
String address = "address";
ShowAddress showAddress = new ShowAddress();
showAddress.setName(address);
showAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(showAddress, "Address Does Not Exist");
}
@Test
public void testShowAddressBindings() throws Exception {
// Create bindings
SimpleString address = new SimpleString("address");
server.createAddressInfo(new AddressInfo(address));
server.createQueue(address, new SimpleString("queue1"), null, true, false);
server.createQueue(address, new SimpleString("queue2"), null, true, false);
server.createQueue(address, new SimpleString("queue3"), null, true, false);
DivertConfiguration divertConfiguration = new DivertConfiguration();
divertConfiguration.setName(address.toString());
divertConfiguration.setAddress(address.toString());
server.deployDivert(divertConfiguration);
ShowAddress showAddress = new ShowAddress();
showAddress.setName(address.toString());
showAddress.setBindings(true);
showAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
System.out.println(output.toString());
}
private void checkExecutionPassed(AbstractAction command) throws Exception {
String fullMessage = output.toString();
System.out.println("output: " + fullMessage);
assertTrue(fullMessage, fullMessage.contains("successfully"));
}
private void checkExecutionFailure(AbstractAction command, String message) throws Exception {
String fullMessage = error.toString();
System.out.println("error: " + fullMessage);
assertTrue(fullMessage, fullMessage.contains(message));
}
}

View File

@ -1,226 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cli;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.destination.CreateDestination;
import org.apache.activemq.artemis.cli.commands.destination.DeleteDestination;
import org.apache.activemq.artemis.cli.commands.destination.DestinationAction;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Before;
import org.junit.Test;
public class DestinationCommandTest extends JMSTestBase {
//the command
private ByteArrayOutputStream output;
private ByteArrayOutputStream error;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
this.output = new ByteArrayOutputStream(1024);
this.error = new ByteArrayOutputStream(1024);
}
@Test
public void testCreateJmsQueue() throws Exception {
CreateDestination command = new CreateDestination();
command.setName("jmsQueue1");
command.setBindings("jmsQueue1Binding");
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionResult(command);
}
@Test
public void testDeleteJmsQueue() throws Exception {
CreateDestination command = new CreateDestination();
command.setName("jmsQueue1");
command.setBindings("jmsQueue1Binding");
command.execute(new ActionContext());
DeleteDestination delete = new DeleteDestination();
delete.setName("jmsQueue1");
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionResult(delete);
}
@Test
public void testDeleteNonExistJmsQueue() throws Exception {
DeleteDestination delete = new DeleteDestination();
delete.setName("jmsQueue1NotExist");
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionResult(delete);
}
@Test
public void testCreateJmsQueueWithFilter() throws Exception {
CreateDestination command = new CreateDestination();
command.setName("jmsQueue2");
command.setBindings("jmsQueue2Binding");
command.setFilter("color='red'");
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionResult(command);
assertTrue(checkBindingExists(command, "color='red'"));
}
@Test
public void testCreateJmsTopic() throws Exception {
CreateDestination command = new CreateDestination();
command.setDestType(DestinationAction.JMS_TOPIC);
command.setName("jmsTopic1");
command.setBindings("jmsTopic1Binding");
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionResult(command);
}
@Test
public void testDeleteJmsTopic() throws Exception {
CreateDestination command = new CreateDestination();
command.setDestType(DestinationAction.JMS_TOPIC);
command.setName("jmsTopic1");
command.setBindings("jmsTopic1Binding");
command.execute(new ActionContext());
DeleteDestination delete = new DeleteDestination();
delete.setDestType(DestinationAction.JMS_TOPIC);
delete.setName("jmsTopic1");
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionResult(delete);
}
@Test
public void testDeleteJmsTopicNotExist() throws Exception {
DeleteDestination delete = new DeleteDestination();
delete.setDestType(DestinationAction.JMS_TOPIC);
delete.setName("jmsTopic1NotExist");
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionResult(delete);
}
@Test
public void testCreateCoreQueue() throws Exception {
CreateDestination command = new CreateDestination();
command.setDestType(DestinationAction.CORE_QUEUE);
command.setName("coreQueue1");
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionResult(command);
}
@Test
public void testCreateCoreQueueWithFilter() throws Exception {
CreateDestination command = new CreateDestination();
command.setName("coreQueue2");
command.setDestType(DestinationAction.CORE_QUEUE);
command.setFilter("color='green'");
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionResult(command);
}
@Test
public void testDeleteCoreQueue() throws Exception {
CreateDestination command = new CreateDestination();
command.setName("coreQueue2");
command.setDestType(DestinationAction.CORE_QUEUE);
command.setFilter("color='green'");
command.execute(new ActionContext());
DeleteDestination delete = new DeleteDestination();
delete.setName("coreQueue2");
delete.setDestType(DestinationAction.CORE_QUEUE);
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionResult(delete);
}
@Test
public void testDeleteCoreQueueNotExist() throws Exception {
DeleteDestination delete = new DeleteDestination();
delete.setName("coreQueue2NotExist");
delete.setDestType(DestinationAction.CORE_QUEUE);
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionResult(delete);
}
private boolean isCreateCommand(DestinationAction command) {
return command instanceof CreateDestination;
}
private boolean isJms(DestinationAction command) {
String destType = command.getDestType();
return !DestinationAction.CORE_QUEUE.equals(destType);
}
private boolean isTopic(DestinationAction command) {
String destType = command.getDestType();
return DestinationAction.JMS_TOPIC.equals(destType);
}
private void checkExecutionResult(DestinationAction command) throws Exception {
if (isCreateCommand(command)) {
String fullMessage = output.toString();
System.out.println("output: " + fullMessage);
assertTrue(fullMessage, fullMessage.contains("successfully"));
assertTrue(checkBindingExists(command, null));
} else {
if (command.getName().equals("jmsQueue1") || command.getName().equals("coreQueue2") || command.getName().equals("jmsTopic1")) {
String fullMessage = output.toString();
System.out.println("output: " + fullMessage);
assertTrue(fullMessage, fullMessage.contains("successfully"));
assertFalse(checkBindingExists(command, null));
} else {
String errorMessage = error.toString();
System.out.println("error: " + errorMessage);
assertTrue(errorMessage, errorMessage.contains("Failed to"));
assertFalse(checkBindingExists(command, null));
}
}
}
private boolean checkBindingExists(DestinationAction command, String filter) {
String bindingKey = command.getName();
if (isJms(command)) {
if (isTopic(command)) {
// bindingKey = bindingKey;
} else {
// bindingKey = bindingKey;
}
}
Map<SimpleString, Binding> bindings = server.getPostOffice().getAllBindings();
System.out.println("bindings: " + bindings);
Binding binding = bindings.get(new SimpleString(bindingKey));
System.out.println("got binding: " + binding);
if (binding == null) {
System.out.println("No bindings for " + bindingKey);
return false;
}
if (filter != null) {
Filter bindingFilter = binding.getFilter();
assertNotNull(bindingFilter);
assertEquals(filter, bindingFilter.getFilterString().toString());
}
return true;
}
}

View File

@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cli;
import java.util.List;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.transaction.Transaction;
public class DummyServerConsumer implements ServerConsumer {
@Override
public void setlowConsumerDetection(SlowConsumerDetectionListener listener) {
}
@Override
public SlowConsumerDetectionListener getSlowConsumerDetecion() {
return null;
}
@Override
public void fireSlowConsumer() {
}
@Override
public Object getProtocolData() {
return null;
}
@Override
public void setProtocolData(Object protocolData) {
}
@Override
public void setProtocolContext(Object protocolContext) {
}
@Override
public Object getProtocolContext() {
return null;
}
@Override
public long getID() {
return 0;
}
@Override
public Object getConnectionID() {
return null;
}
@Override
public void close(boolean failed) throws Exception {
}
@Override
public void removeItself() throws Exception {
}
@Override
public List<MessageReference> cancelRefs(boolean failed,
boolean lastConsumedAsDelivered,
Transaction tx) throws Exception {
return null;
}
@Override
public void setStarted(boolean started) {
}
@Override
public void receiveCredits(int credits) {
}
@Override
public Queue getQueue() {
return null;
}
@Override
public MessageReference removeReferenceByID(long messageID) throws Exception {
return null;
}
@Override
public void backToDelivering(MessageReference reference) {
}
@Override
public List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove,
Object protocolDataStart,
Object protocolDataEnd) {
return null;
}
@Override
public void acknowledge(Transaction tx, long messageID) throws Exception {
}
@Override
public void individualAcknowledge(Transaction tx, long messageID) throws Exception {
}
@Override
public void individualCancel(long messageID, boolean failed) throws Exception {
}
@Override
public void forceDelivery(long sequence) {
}
@Override
public void setTransferring(boolean transferring) {
}
@Override
public boolean isBrowseOnly() {
return false;
}
@Override
public long getCreationTime() {
return 0;
}
@Override
public String getSessionID() {
return null;
}
@Override
public void promptDelivery() {
}
@Override
public HandleStatus handle(MessageReference reference) throws Exception {
return null;
}
@Override
public void proceedDeliver(MessageReference reference) throws Exception {
}
@Override
public Filter getFilter() {
return null;
}
@Override
public List<MessageReference> getDeliveringMessages() {
return null;
}
@Override
public String debug() {
return null;
}
@Override
public String toManagementString() {
return null;
}
@Override
public void disconnect() {
}
}

View File

@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cli;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.queue.CreateQueue;
import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Before;
import org.junit.Test;
public class QueueCommandTest extends JMSTestBase {
//the command
private ByteArrayOutputStream output;
private ByteArrayOutputStream error;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
this.output = new ByteArrayOutputStream(1024);
this.error = new ByteArrayOutputStream(1024);
}
@Test
public void testCreateCoreQueueShowsErrorWhenAddressDoesNotExists() throws Exception {
String queueName = "queue1";
CreateQueue command = new CreateQueue();
command.setName(queueName);
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(command, "AMQ119203: Address Does Not Exist:");;
assertFalse(server.queueQuery(new SimpleString(queueName)).isExists());
}
@Test
public void testCreateCoreQueueAutoCreateAddressDefaultAddress() throws Exception {
String queueName = UUID.randomUUID().toString();
CreateQueue command = new CreateQueue();
command.setName(queueName);
command.setAutoCreateAddress(true);
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(command);
assertNotNull(server.getAddressInfo(new SimpleString(queueName)));
Queue queue = server.locateQueue(new SimpleString(queueName));
assertEquals(-1, queue.getMaxConsumers());
assertEquals(false, queue.isDeleteOnNoConsumers());
assertTrue(server.queueQuery(new SimpleString(queueName)).isExists());
}
@Test
public void testCreateCoreQueueAddressExists() throws Exception {
String queueName = "queue";
String address= "address";
CreateQueue command = new CreateQueue();
command.setName(queueName);
command.setAutoCreateAddress(false);
command.setAddress(address);
server.createOrUpdateAddressInfo(new AddressInfo(new SimpleString(address)));
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(command);
assertNotNull(server.getAddressInfo(new SimpleString(address)));
Queue queue = server.locateQueue(new SimpleString(queueName));
assertEquals(-1, queue.getMaxConsumers());
assertEquals(false, queue.isDeleteOnNoConsumers());
assertTrue(server.queueQuery(new SimpleString(queueName)).isExists());
}
@Test
public void testCreateCoreQueueWithFilter() throws Exception {
String queueName = "queue2";
String filerString = "color='green'";
CreateQueue command = new CreateQueue();
command.setName(queueName);
command.setFilter("color='green'");
command.setAutoCreateAddress(true);
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(command);
Queue queue = server.locateQueue(new SimpleString(queueName));
assertNotNull(queue);
assertEquals(new SimpleString(filerString), queue.getFilter().getFilterString());
}
@Test
public void testCreateQueueAlreadyExists() throws Exception {
String queueName = "queue2";
String filerString = "color='green'";
CreateQueue command = new CreateQueue();
command.setName(queueName);
command.setFilter("color='green'");
command.setAutoCreateAddress(true);
command.execute(new ActionContext());
command.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(command, "AMQ119019: Queue already exists " + queueName);
}
@Test
public void testDeleteCoreQueue() throws Exception {
SimpleString queueName = new SimpleString("deleteQueue");
CreateQueue command = new CreateQueue();
command.setName(queueName.toString());
command.setFilter("color='green'");
command.setAutoCreateAddress(true);
command.execute(new ActionContext());
DeleteQueue delete = new DeleteQueue();
delete.setName(queueName.toString());
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(delete);
assertFalse(server.queueQuery(queueName).isExists());
}
@Test
public void testDeleteQueueDoesNotExist() throws Exception {
SimpleString queueName = new SimpleString("deleteQueue");
DeleteQueue delete = new DeleteQueue();
delete.setName(queueName.toString());
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(delete, "AMQ119017: Queue " + queueName + " does not exist");
assertFalse(server.queueQuery(queueName).isExists());
}
@Test
public void testDeleteQueueWithConsumersFails() throws Exception {
SimpleString queueName = new SimpleString("deleteQueue");
CreateQueue command = new CreateQueue();
command.setName(queueName.toString());
command.setFilter("color='green'");
command.setAutoCreateAddress(true);
command.execute(new ActionContext());
server.locateQueue(queueName).addConsumer(new DummyServerConsumer());
DeleteQueue delete = new DeleteQueue();
delete.setName(queueName.toString());
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(delete, "AMQ119025: Cannot delete queue " + queueName + " on binding deleteQueue");
}
@Test
public void testDeleteQueueWithConsumersFailsAndRemoveConsumersTrue() throws Exception {
SimpleString queueName = new SimpleString("deleteQueue");
CreateQueue command = new CreateQueue();
command.setName(queueName.toString());
command.setFilter("color='green'");
command.setAutoCreateAddress(true);
command.execute(new ActionContext());
server.locateQueue(queueName).addConsumer(new DummyServerConsumer());
DeleteQueue delete = new DeleteQueue();
delete.setName(queueName.toString());
delete.setRemoveConsumers(true);
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(command);
}
@Test
public void testAutoDeleteAddress() throws Exception {
SimpleString queueName = new SimpleString("deleteQueue");
CreateQueue command = new CreateQueue();
command.setName(queueName.toString());
command.setFilter("color='green'");
command.setAutoCreateAddress(true);
command.execute(new ActionContext());
assertNotNull(server.getAddressInfo(queueName));
server.locateQueue(queueName).addConsumer(new DummyServerConsumer());
DeleteQueue delete = new DeleteQueue();
delete.setName(queueName.toString());
delete.setRemoveConsumers(true);
delete.setAutoDeleteAddress(true);
delete.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(command);
assertNull(server.getAddressInfo(queueName));
}
private void checkExecutionPassed(AbstractAction command) throws Exception {
String fullMessage = output.toString();
System.out.println("output: " + fullMessage);
assertTrue(fullMessage, fullMessage.contains("successfully"));
}
private void checkExecutionFailure(AbstractAction command, String message) throws Exception {
String fullMessage = error.toString();
System.out.println("error: " + fullMessage);
assertTrue(fullMessage, fullMessage.contains(message));
}
}

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.tests.integration.management; package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.Parameter; import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
@ -103,6 +104,14 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
proxy.invokeOperation("createAddress", name, routingType, defaultDeleteOnNoConsumers, defaultMaxConsumers); proxy.invokeOperation("createAddress", name, routingType, defaultDeleteOnNoConsumers, defaultMaxConsumers);
} }
@Override
public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@Parameter(name = "routingType", desc = "The routing type for the address either 'MULTICAST' or 'ANYCAST'") String routingType,
@Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers,
@Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception {
proxy.invokeOperation("createAddress", name, routingType, defaultDeleteOnNoConsumers, defaultMaxConsumers);
}
@Override @Override
public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception { public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception {
proxy.invokeOperation("deleteAddress", name); proxy.invokeOperation("deleteAddress", name);
@ -121,6 +130,17 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
proxy.invokeOperation("createQueue", address, name, durable); proxy.invokeOperation("createQueue", address, name, durable);
} }
@Override
public void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filter,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception {
}
@Override @Override
public void deployQueue(final String address, public void deployQueue(final String address,
final String name, final String name,
@ -144,6 +164,12 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
proxy.invokeOperation("destroyQueue", name, removeConsumers); proxy.invokeOperation("destroyQueue", name, removeConsumers);
} }
@Override
public void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers,
boolean autoDeleteAddress) throws Exception {
}
@Override @Override
public void disableMessageCounters() throws Exception { public void disableMessageCounters() throws Exception {
proxy.invokeOperation("disableMessageCounters"); proxy.invokeOperation("disableMessageCounters");
@ -631,6 +657,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
return (String) proxy.invokeOperation("listNetworkTopology"); return (String) proxy.invokeOperation("listNetworkTopology");
} }
@Override
public String getAddressInfo(String address) throws ActiveMQAddressDoesNotExistException {
return null;
}
@Override
public String[] listBindingsForAddress(String address) throws Exception {
return new String[0];
}
@Override @Override
public void removeAddressSettings(String addressMatch) throws Exception { public void removeAddressSettings(String addressMatch) throws Exception {
proxy.invokeOperation("removeAddressSettings", addressMatch); proxy.invokeOperation("removeAddressSettings", addressMatch);

View File

@ -1628,7 +1628,7 @@ public class MQTTTest extends MQTTTestSupport {
addressInfo.setDefaultMaxQueueConsumers(0); addressInfo.setDefaultMaxQueueConsumers(0);
getServer().createOrUpdateAddressInfo(addressInfo); getServer().createOrUpdateAddressInfo(addressInfo);
getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false); getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, false);
MQTT mqtt = createMQTTConnection(); MQTT mqtt = createMQTTConnection();
mqtt.setClientId(clientId); mqtt.setClientId(clientId);
@ -1674,7 +1674,7 @@ public class MQTTTest extends MQTTTestSupport {
try { try {
String clientId = "testMqtt"; String clientId = "testMqtt";
SimpleString coreAddress = new SimpleString("foo.bar"); SimpleString coreAddress = new SimpleString("foo.bar");
getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, -1, true); getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, -1, true, false);
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.tests.unit.core.server.impl.fakes; package org.apache.activemq.artemis.tests.unit.core.server.impl.fakes;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -80,6 +81,11 @@ public class FakePostOffice implements PostOffice {
return null; return null;
} }
@Override
public List<Queue> listQueuesForAddress(SimpleString address) throws Exception {
return null;
}
@Override @Override
public void addBinding(final Binding binding) throws Exception { public void addBinding(final Binding binding) throws Exception {