This closes #922

This commit is contained in:
Martyn Taylor 2016-12-15 13:41:30 +00:00
commit 2d7b77ac1b
22 changed files with 553 additions and 247 deletions

View File

@ -30,18 +30,18 @@ import org.apache.activemq.artemis.cli.commands.Kill;
import org.apache.activemq.artemis.cli.commands.Mask;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.Stop;
import org.apache.activemq.artemis.cli.commands.address.AddRoutingType;
import org.apache.activemq.artemis.cli.commands.address.CreateAddress;
import org.apache.activemq.artemis.cli.commands.address.DeleteAddress;
import org.apache.activemq.artemis.cli.commands.address.HelpAddress;
import org.apache.activemq.artemis.cli.commands.address.RemoveRoutingType;
import org.apache.activemq.artemis.cli.commands.address.ShowAddress;
import org.apache.activemq.artemis.cli.commands.address.UpdateAddress;
import org.apache.activemq.artemis.cli.commands.queue.CreateQueue;
import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue;
import org.apache.activemq.artemis.cli.commands.queue.HelpQueue;
import org.apache.activemq.artemis.cli.commands.messages.Browse;
import org.apache.activemq.artemis.cli.commands.messages.Consumer;
import org.apache.activemq.artemis.cli.commands.messages.Producer;
import org.apache.activemq.artemis.cli.commands.queue.UpdateQueue;
import org.apache.activemq.artemis.cli.commands.tools.CompactJournal;
import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal;
import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal;
@ -134,11 +134,11 @@ public class Artemis {
String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance");
Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis").withDescription("ActiveMQ Artemis Command Line").withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Consumer.class).withCommand(Browse.class).withCommand(Mask.class).withDefaultCommand(HelpAction.class);
builder.withGroup("queue").withDescription("Queue tools group (create|delete) (example ./artemis queue create)").
withDefaultCommand(HelpQueue.class).withCommands(CreateQueue.class, DeleteQueue.class);
builder.withGroup("queue").withDescription("Queue tools group (create|delete|update) (example ./artemis queue create)").
withDefaultCommand(HelpQueue.class).withCommands(CreateQueue.class, DeleteQueue.class, UpdateQueue.class);
builder.withGroup("address").withDescription("Address tools group (create|delete|addRoutingType|removeRoutingType|show) (example ./artemis address create)").
withDefaultCommand(HelpAddress.class).withCommands(CreateAddress.class, DeleteAddress.class, AddRoutingType.class, RemoveRoutingType.class, ShowAddress.class);
builder.withGroup("address").withDescription("Address tools group (create|delete|update|show) (example ./artemis address create)").
withDefaultCommand(HelpAddress.class).withCommands(CreateAddress.class, DeleteAddress.class, UpdateAddress.class, ShowAddress.class);
if (instance != null) {
builder.withGroup("data").withDescription("data tools group (print|exp|imp|exp|encode|decode|compact) (example ./artemis data print)").

View File

@ -50,7 +50,8 @@ public class CreateAddress extends AbstractAction {
@Override
public void requestSuccessful(ClientMessage reply) throws Exception {
context.out.println("Address " + getName() + " created successfully.");
final String result = ManagementHelper.getResult(reply, String.class) + " created successfully.";
context.out.println(result);
}
@Override

View File

@ -24,32 +24,33 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.cli.commands.ActionContext;
@Command(name = "addRoutingType", description = "add the provided routing types to an address")
public class AddRoutingType extends AbstractAction {
@Command(name = "update", description = "update an address")
public class UpdateAddress extends AbstractAction {
@Option(name = "--name", description = "The name of this address", required = true)
String name;
@Option(name = "--routingType", description = "The routing types to be added to this address, options are 'anycast' or 'multicast'", required = true)
String routingType;
@Option(name = "--routingTypes", description = "The routing types supported by this address, options are 'anycast' or 'multicast', enter comma separated list")
String routingTypes = null;
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
addRoutingType(context);
updateAddress(context);
return null;
}
private void addRoutingType(final ActionContext context) throws Exception {
private void updateAddress(final ActionContext context) throws Exception {
performCoreManagement(new AbstractAction.ManagementCallback<ClientMessage>() {
@Override
public void setUpInvocation(ClientMessage message) throws Exception {
ManagementHelper.putOperationInvocation(message, "broker", "addRoutingType", name, routingType);
ManagementHelper.putOperationInvocation(message, "broker", "updateAddress", name, routingTypes);
}
@Override
public void requestSuccessful(ClientMessage reply) throws Exception {
context.out.println("Address " + name + " updated successfully.");
final String result = ManagementHelper.getResult(reply, String.class) + " updated successfully.";
context.out.println(result);
}
@Override
@ -68,11 +69,11 @@ public class AddRoutingType extends AbstractAction {
this.name = name;
}
public String getRoutingType() {
return routingType;
public String getRoutingTypes() {
return routingTypes;
}
public void setRoutingType(String routingType) {
this.routingType = routingType;
public void setRoutingTypes(String routingTypes) {
this.routingTypes = routingTypes;
}
}

View File

@ -75,7 +75,8 @@ public class CreateQueue extends AbstractAction {
@Override
public void requestSuccessful(ClientMessage reply) throws Exception {
context.out.println("Core queue " + getName() + " created successfully.");
final String result = ManagementHelper.getResult(reply, String.class) + " created successfully.";
context.out.println(result);
}
@Override

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.address;
package org.apache.activemq.artemis.cli.commands.queue;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
@ -24,38 +24,45 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.cli.commands.ActionContext;
@Command(name = "removeRoutingType", description = "remove the provided routing types from an address")
public class RemoveRoutingType extends AbstractAction {
@Command(name = "update", description = "update a core queue")
public class UpdateQueue extends AbstractAction {
@Option(name = "--name", description = "The name of the address", required = true)
@Option(name = "--name", description = "name", required = true)
String name;
@Option(name = "--routingType", description = "The routing types to be removed from this address, options are 'anycast' or 'multicast'", required = true)
String routingType;
@Option(name = "--deleteOnNoConsumers", description = "whether to delete when it's last consumers disconnects)")
Boolean deleteOnNoConsumers = null;
@Option(name = "--maxConsumers", description = "Maximum number of consumers allowed at any one time")
Integer maxConsumers = null;
@Option(name = "--routingType", description = "The routing type supported by this queue, options are 'anycast' or 'multicast'")
String routingType = null;
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
removeRoutingType(context);
createQueue(context);
return null;
}
private void removeRoutingType(final ActionContext context) throws Exception {
performCoreManagement(new AbstractAction.ManagementCallback<ClientMessage>() {
private void createQueue(final ActionContext context) throws Exception {
performCoreManagement(new ManagementCallback<ClientMessage>() {
@Override
public void setUpInvocation(ClientMessage message) throws Exception {
ManagementHelper.putOperationInvocation(message, "broker", "removeRoutingType", name, routingType);
ManagementHelper.putOperationInvocation(message, "broker", "updateQueue", name, routingType, maxConsumers, deleteOnNoConsumers);
}
@Override
public void requestSuccessful(ClientMessage reply) throws Exception {
context.out.println("Address " + name + " updated successfully.");
final String result = ManagementHelper.getResult(reply, String.class) + " updated successfully.";
context.out.println(result);
}
@Override
public void requestFailed(ClientMessage reply) throws Exception {
String errMsg = (String) ManagementHelper.getResult(reply, String.class);
context.err.println("Failed to update address " + name + ". Reason: " + errMsg);
context.err.println("Failed to update " + name + ". Reason: " + errMsg);
}
});
}
@ -68,6 +75,22 @@ public class RemoveRoutingType extends AbstractAction {
this.name = name;
}
public Boolean getDeleteOnNoConsumers() {
return deleteOnNoConsumers;
}
public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
this.deleteOnNoConsumers = deleteOnNoConsumers;
}
public Integer getMaxConsumers() {
return maxConsumers;
}
public void setMaxConsumers(int maxConsumers) {
this.maxConsumers = maxConsumers;
}
public String getRoutingType() {
return routingType;
}
@ -76,3 +99,4 @@ public class RemoveRoutingType extends AbstractAction {
this.routingType = routingType;
}
}

View File

@ -435,16 +435,12 @@ public interface ActiveMQServerControl {
// Operations ----------------------------------------------------
@Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION)
void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
String createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@Parameter(name = "routingTypes", desc = "Comma separated list of Routing Types (anycast/multicast)") String routingTypes) throws Exception;
@Operation(desc = "add the provided routing type to an address", impact = MBeanOperationInfo.ACTION)
void addRoutingType(@Parameter(name = "name", desc = "The name of the address") String name,
@Parameter(name = "routingType", desc = "The routing types to be added to this address, options are 'anycast' or 'multicast'") String routingType) throws Exception;
@Operation(desc = "remove the provided routing type to an address", impact = MBeanOperationInfo.ACTION)
void removeRoutingType(@Parameter(name = "name", desc = "The name of the address") String name,
@Parameter(name = "routingType", desc = "The routing types to be added to this address, options are 'anycast' or 'multicast'") String routingType) throws Exception;
@Operation(desc = "update an address", impact = MBeanOperationInfo.ACTION)
String updateAddress(@Parameter(name = "name", desc = "The name of the address") String name,
@Parameter(name = "routingTypes", desc = "Comma separated list of Routing Types (anycast/multicast)") String routingTypes) throws Exception;
@Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception;
@ -571,9 +567,10 @@ public interface ActiveMQServerControl {
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
* @param deleteOnNoConsumers delete this queue when the last consumer disconnects
* @param autoCreateAddress create an address with default values should a matching address not be found
* @return a textual summary of the queue
* @throws Exception
*/
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
String createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
@ -582,6 +579,22 @@ public interface ActiveMQServerControl {
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
/**
* Update a queue.
*
* @param name name of the queue
* @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
* @param deleteOnNoConsumers delete this queue when the last consumer disconnects
* @return a textual summary of the queue
* @throws Exception
*/
String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean deleteOnNoConsumers) throws Exception;
/**
* Deploy a durable queue.

View File

@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -562,8 +563,58 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
private enum AddressInfoTextFormatter {
Long {
@Override
public void createAddress(String name, String routingTypes) throws Exception {
public StringBuilder format(AddressInfo addressInfo, StringBuilder output) {
output.append("Address [name=").append(addressInfo.getName());
output.append(", routingTypes={");
final Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
if (!routingTypes.isEmpty()) {
for (RoutingType routingType : routingTypes) {
output.append(routingType).append(',');
}
// delete hanging comma
output.deleteCharAt(output.length() - 1);
}
output.append('}');
output.append(", autoCreated=").append(addressInfo.isAutoCreated());
output.append(']');
return output;
}
};
public abstract StringBuilder format(AddressInfo addressInfo, StringBuilder output);
}
public enum QueueTextFormatter {
Long {
@Override
StringBuilder format(Queue queue, StringBuilder output) {
output.append("Queue [name=").append(queue.getName());
output.append(", address=").append(queue.getAddress());
output.append(", routingType=").append(queue.getRoutingType());
final Filter filter = queue.getFilter();
if (filter != null) {
output.append(", filter=").append(filter.getFilterString());
}
output.append(", durable=").append(queue.isDurable());
final int maxConsumers = queue.getMaxConsumers();
if (maxConsumers != Queue.MAX_CONSUMERS_UNLIMITED) {
output.append(", maxConsumers=").append(queue.getMaxConsumers());
}
output.append(", deleteOnNoConsumers=").append(queue.isDeleteOnNoConsumers());
output.append(", autoCreateAddress=").append(queue.isAutoCreated());
output.append(']');
return output;
}
};
abstract StringBuilder format(Queue queue, StringBuilder output);
}
@Override
public String createAddress(String name, String routingTypes) throws Exception {
checkStarted();
clearIO();
@ -572,33 +623,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
for (String routingType : toList(routingTypes)) {
set.add(RoutingType.valueOf(routingType));
}
server.createAddressInfo(new AddressInfo(new SimpleString(name), set));
final AddressInfo addressInfo = new AddressInfo(new SimpleString(name), set);
if (server.createAddressInfo(addressInfo)) {
return AddressInfoTextFormatter.Long.format(addressInfo, new StringBuilder()).toString();
} else {
return "";
}
} finally {
blockOnIO();
}
}
@Override
public void addRoutingType(String name, String routingTypeName) throws Exception {
public String updateAddress(String name, String routingTypes) throws Exception {
checkStarted();
clearIO();
try {
final RoutingType routingType = RoutingType.valueOf(routingTypeName);
server.addRoutingType(name, routingType);
} finally {
blockOnIO();
final Set<RoutingType> routingTypeSet;
if (routingTypes == null) {
routingTypeSet = null;
} else {
routingTypeSet = new HashSet<>();
final String[] routingTypeNames = routingTypes.split(",");
for (String routingTypeName : routingTypeNames) {
routingTypeSet.add(RoutingType.valueOf(routingTypeName));
}
}
@Override
public void removeRoutingType(String name, String routingTypeName) throws Exception {
checkStarted();
clearIO();
try {
final RoutingType routingType = RoutingType.valueOf(routingTypeName);
server.removeRoutingType(name, routingType);
final AddressInfo updatedAddressInfo = server.updateAddressInfo(name, routingTypeSet);
if (updatedAddressInfo == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(name));
}
return AddressInfoTextFormatter.Long.format(updatedAddressInfo, new StringBuilder()).toString();
} finally {
blockOnIO();
}
@ -672,7 +728,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
@Override
public void createQueue(String address,
public String createQueue(String address,
String routingType,
String name,
String filterStr,
@ -690,12 +746,32 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
filter = new SimpleString(filterStr);
}
server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
} finally {
blockOnIO();
}
}
@Override
public String updateQueue(String name,
String routingType,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception {
checkStarted();
clearIO();
try {
final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, deleteOnNoConsumers);
if (queue == null) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name));
}
return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
} finally {
blockOnIO();
}
}
@Override
public String[] getQueueNames() {
@ -804,7 +880,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
if (addressInfo == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address));
} else {
return addressInfo.toString();
return AddressInfoTextFormatter.Long.format(addressInfo, new StringBuilder()).toString();
}
} finally {
blockOnIO();

View File

@ -16,9 +16,9 @@
*/
package org.apache.activemq.artemis.core.postoffice;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingType;
@ -64,14 +64,14 @@ public interface AddressManager {
*/
boolean addAddressInfo(AddressInfo addressInfo);
AddressInfo updateAddressInfoIfPresent(SimpleString addressName,
BiFunction<? super SimpleString, ? super AddressInfo, ? extends AddressInfo> remappingFunction);
AddressInfo updateAddressInfo(SimpleString addressName,
Collection<RoutingType> routingTypes) throws Exception;
/**
* @param addressInfo
* @return true if the address was added, false if it was updated
* @return the same provided {@code addressInfo} if the address was added, another if it was updated
*/
boolean addOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo removeAddressInfo(SimpleString address);

View File

@ -16,11 +16,11 @@
*/
package org.apache.activemq.artemis.core.postoffice;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@ -53,20 +53,25 @@ public interface PostOffice extends ActiveMQComponent {
/**
* @param addressInfo
* @return true if the address was added, false if it was updated
* @return the same provided {@code addressInfo} if the address was added, another if it was updated
*/
boolean addOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
AddressInfo removeAddressInfo(SimpleString address) throws Exception;
AddressInfo getAddressInfo(SimpleString address);
void addRoutingType(SimpleString addressName, RoutingType routingType) throws ActiveMQAddressDoesNotExistException;
AddressInfo updateAddressInfo(SimpleString addressName, Collection<RoutingType> routingTypes) throws Exception;
void removeRoutingType(SimpleString addressName, RoutingType routingType) throws Exception;
QueueBinding updateQueue(SimpleString name,
RoutingType routingType,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception;
List<Queue> listQueuesForAddress(SimpleString address) throws Exception;
void addBinding(Binding binding) throws Exception;
Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -32,7 +33,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
@ -440,53 +440,68 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) {
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
synchronized (addressLock) {
boolean result = addressManager.addOrUpdateAddressInfo(addressInfo);
final AddressInfo updatedAddressInfo = addressManager.addOrUpdateAddressInfo(addressInfo);
// only register address if it is newly added
if (result) {
final boolean isNew = updatedAddressInfo == addressInfo;
if (isNew) {
try {
managementService.registerAddress(addressInfo);
} catch (Exception e) {
e.printStackTrace();
}
}
return result;
return updatedAddressInfo;
}
}
@Override
public void addRoutingType(SimpleString addressName, RoutingType routingType) throws ActiveMQAddressDoesNotExistException {
public QueueBinding updateQueue(SimpleString name,
RoutingType routingType,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception {
synchronized (addressLock) {
final AddressInfo updateAddressInfo = addressManager.updateAddressInfoIfPresent(addressName, (name, addressInfo) -> {
addressInfo.getRoutingTypes().add(routingType);
return addressInfo;
});
if (updateAddressInfo == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name);
if (queueBinding == null) {
return null;
}
final Queue queue = queueBinding.getQueue();
//TODO put the whole update logic on Queue
//validate update
if (maxConsumers != null) {
final int consumerCount = queue.getConsumerCount();
if (consumerCount > maxConsumers) {
throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount);
}
}
if (routingType != null) {
final SimpleString address = queue.getAddress();
final AddressInfo addressInfo = addressManager.getAddressInfo(address);
final Set<RoutingType> addressRoutingTypes = addressInfo.getRoutingTypes();
if (!addressRoutingTypes.contains(routingType)) {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes);
}
}
//atomic update
if (maxConsumers != null) {
queue.setMaxConsumer(maxConsumers);
}
if (routingType != null) {
queue.setRoutingType(routingType);
}
if (deleteOnNoConsumers != null) {
queue.setDeleteOnNoConsumers(deleteOnNoConsumers);
}
return queueBinding;
}
}
@Override
public void removeRoutingType(SimpleString addressName, RoutingType routingType) throws Exception {
public AddressInfo updateAddressInfo(SimpleString addressName,
Collection<RoutingType> routingTypes) throws Exception {
synchronized (addressLock) {
if (RoutingType.MULTICAST.equals(routingType)) {
final Bindings bindings = addressManager.getBindingsForRoutingAddress(addressName);
if (bindings != null) {
final boolean existsQueueBindings = bindings.getBindings().stream().anyMatch(QueueBinding.class::isInstance);
if (existsQueueBindings) {
throw ActiveMQMessageBundle.BUNDLE.invalidMulticastRoutingTypeDelete();
}
}
}
final AddressInfo updateAddressInfo = addressManager.updateAddressInfoIfPresent(addressName, (name, addressInfo) -> {
addressInfo.getRoutingTypes().remove(routingType);
return addressInfo;
});
if (updateAddressInfo == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
}
return addressManager.updateAddressInfo(addressName, routingTypes);
}
}

View File

@ -16,12 +16,13 @@
*/
package org.apache.activemq.artemis.core.postoffice.impl;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Address;
@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.postoffice.AddressManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@ -220,26 +222,48 @@ public class SimpleAddressManager implements AddressManager {
}
@Override
public AddressInfo updateAddressInfoIfPresent(SimpleString addressName,
BiFunction<? super SimpleString, ? super AddressInfo, ? extends AddressInfo> remappingFunction) {
return addressInfoMap.computeIfPresent(addressName, remappingFunction);
public AddressInfo updateAddressInfo(SimpleString addressName,
Collection<RoutingType> routingTypes) throws Exception {
if (routingTypes == null) {
return this.addressInfoMap.get(addressName);
} else {
return this.addressInfoMap.computeIfPresent(addressName, (name, oldAddressInfo) -> {
validateRoutingTypes(name, routingTypes);
final Set<RoutingType> updatedRoutingTypes = EnumSet.copyOf(routingTypes);
oldAddressInfo.setRoutingTypes(updatedRoutingTypes);
return oldAddressInfo;
});
}
}
@Override
public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) {
boolean isNew = addAddressInfo(addressInfo);
// address already exists so update it
if (!isNew) {
AddressInfo toUpdate = getAddressInfo(addressInfo.getName());
synchronized (toUpdate) {
for (RoutingType routingType : addressInfo.getRoutingTypes()) {
toUpdate.addRoutingType(routingType);
private void validateRoutingTypes(SimpleString addressName, Collection<RoutingType> routingTypes) {
final Bindings bindings = this.mappings.get(addressName);
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
final QueueBinding queueBinding = (QueueBinding) binding;
final RoutingType routingType = queueBinding.getQueue().getRoutingType();
if (!routingTypes.contains(routingType)) {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeDelete(routingType, addressName.toString());
}
}
}
}
}
return isNew;
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
return this.addressInfoMap.compute(addressInfo.getName(), (name, oldAddressInfo) -> {
if (oldAddressInfo != null) {
final Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
validateRoutingTypes(name, routingTypes);
final Set<RoutingType> updatedRoutingTypes = EnumSet.copyOf(routingTypes);
oldAddressInfo.setRoutingTypes(updatedRoutingTypes);
return oldAddressInfo;
} else {
return addressInfo;
}
});
}
@Override

View File

@ -414,6 +414,17 @@ public interface ActiveMQMessageBundle {
@Message(id = 119208, value = "Invalid routing type {0}", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException invalidRoutingType(String val);
@Message(id = 119209, value = "Can't remove MULTICAST routing type, queues exists. Please delete queues before removing this routing type.")
IllegalStateException invalidMulticastRoutingTypeDelete();
@Message(id = 119209, value = "Can''t remove routing type {0}, queues exists for address: {1}. Please delete queues before removing this routing type.", format = Message.Format.MESSAGE_FORMAT)
IllegalStateException invalidRoutingTypeDelete(RoutingType routingType, String address);
@Message(id = 119210, value = "Can''t update queue {0} with maxConsumers: {1}. Current consumers are {2}.", format = Message.Format.MESSAGE_FORMAT)
IllegalStateException invalidMaxConsumersUpdate(String queueName, int maxConsumers, int consumers);
@Message(id = 119211, value = "Can''t update queue {0} with routing type: {1}, Supported routing types for address: {2} are {3}", format = Message.Format.MESSAGE_FORMAT)
IllegalStateException invalidRoutingTypeUpdate(String queueName,
RoutingType routingType,
String address,
Set<RoutingType> supportedRoutingTypes);
}

View File

@ -17,13 +17,13 @@
package org.apache.activemq.artemis.core.server;
import javax.management.MBeanServer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
@ -422,6 +422,11 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
Queue updateQueue(String name,
RoutingType routingType,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception;
/*
* add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
* replace any factories with the same protocol
@ -455,28 +460,11 @@ public interface ActiveMQServer extends ActiveMQComponent {
void removeClientConnection(String clientId);
/**
* Add the {@code routingType} from the specified {@code address}.
*
* @param address the address name
* @param routingType the routing type to be added
* @throws ActiveMQAddressDoesNotExistException
*/
void addRoutingType(String address, RoutingType routingType) throws ActiveMQAddressDoesNotExistException;
/**
* Remove the {@code routingType} from the specified {@code address}.
*
* @param address the address name
* @param routingType the routing type to be removed
* @throws ActiveMQAddressDoesNotExistException
* @throws IllegalStateException when a binding already exists and is requested to remove {@link org.apache.activemq.artemis.core.server.RoutingType#MULTICAST}.
*/
void removeRoutingType(String address, RoutingType routingType) throws Exception;
AddressInfo updateAddressInfo(String name, Collection<RoutingType> routingTypes) throws Exception;
boolean createAddressInfo(AddressInfo addressInfo) throws Exception;
boolean createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
void removeAddressInfo(SimpleString address, SecurityAuth session) throws Exception;

View File

@ -54,8 +54,12 @@ public interface Queue extends Bindable {
boolean isDeleteOnNoConsumers();
void setDeleteOnNoConsumers(boolean value);
int getMaxConsumers();
void setMaxConsumer(int maxConsumers);
void addConsumer(Consumer consumer) throws Exception;
void removeConsumer(Consumer consumer);

View File

@ -27,6 +27,7 @@ import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -48,7 +49,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -2403,15 +2403,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public void addRoutingType(String address, RoutingType routingType) throws ActiveMQAddressDoesNotExistException {
public AddressInfo updateAddressInfo(String address, Collection<RoutingType> routingTypes) throws Exception {
final SimpleString addressName = new SimpleString(address);
postOffice.addRoutingType(addressName,routingType);
//after the postOffice call, updatedAddressInfo could change further (concurrently)!
final AddressInfo updatedAddressInfo = postOffice.updateAddressInfo(addressName, routingTypes);
if (updatedAddressInfo != null) {
//it change the address info without any lock!
final long txID = storageManager.generateID();
try {
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
storageManager.addAddressBinding(txID, updatedAddressInfo);
} finally {
storageManager.commitBindings(txID);
}
@Override
public void removeRoutingType(String address, RoutingType routingType) throws Exception {
final SimpleString addressName = new SimpleString(address);
postOffice.removeRoutingType(addressName,routingType);
}
return updatedAddressInfo;
}
@Override
@ -2430,21 +2436,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public boolean createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
boolean result = postOffice.addOrUpdateAddressInfo(addressInfo);
public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
final AddressInfo updatedAddressInfo = postOffice.addOrUpdateAddressInfo(addressInfo);
final boolean isNew = updatedAddressInfo == addressInfo;
long txID = storageManager.generateID();
if (result) {
final long txID = storageManager.generateID();
if (isNew) {
storageManager.addAddressBinding(txID, addressInfo);
storageManager.commitBindings(txID);
} else {
AddressInfo updatedAddressInfo = getAddressInfo(addressInfo.getName());
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
storageManager.addAddressBinding(txID, updatedAddressInfo);
storageManager.commitBindings(txID);
}
return result;
return updatedAddressInfo;
}
@Override
@ -2510,12 +2516,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressName);
}
AddressInfo defaultAddressInfo = new AddressInfo(addressName);
defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
AddressInfo info = postOffice.getAddressInfo(addressName);
if (autoCreateAddress) {
if (info == null || !info.getRoutingTypes().contains(routingType)) {
final AddressInfo defaultAddressInfo = new AddressInfo(addressName);
defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
createOrUpdateAddressInfo(defaultAddressInfo.setAutoCreated(true));
}
} else if (info == null) {
@ -2571,6 +2577,31 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return queue;
}
@Override
public Queue updateQueue(String name,
RoutingType routingType,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception {
final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, deleteOnNoConsumers);
if (queueBinding != null) {
final Queue queue = queueBinding.getQueue();
if (queue.isDurable()) {
final long txID = storageManager.generateID();
try {
storageManager.deleteQueueBinding(txID, queueBinding.getID());
storageManager.addQueueBinding(txID, queueBinding);
storageManager.commitBindings(txID);
} catch (Throwable throwable) {
storageManager.rollbackBindings(txID);
throw throwable;
}
}
return queue;
} else {
return null;
}
}
private void deployDiverts() throws Exception {
for (DivertConfiguration config : configuration.getDivertConfigurations()) {
deployDivert(config);

View File

@ -65,11 +65,11 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
@ -240,15 +240,15 @@ public class QueueImpl implements Queue {
private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
private int maxConsumers;
private volatile int maxConsumers;
private boolean deleteOnNoConsumers;
private volatile boolean deleteOnNoConsumers;
private final AddressInfo addressInfo;
private final AtomicInteger noConsumers = new AtomicInteger(0);
private RoutingType routingType;
private volatile RoutingType routingType;
/**
* This is to avoid multi-thread races on calculating direct delivery,
@ -482,11 +482,21 @@ public class QueueImpl implements Queue {
return deleteOnNoConsumers;
}
@Override
public synchronized void setDeleteOnNoConsumers(boolean value) {
this.deleteOnNoConsumers = value;
}
@Override
public int getMaxConsumers() {
return maxConsumers;
}
@Override
public synchronized void setMaxConsumer(int maxConsumers) {
this.maxConsumers = maxConsumers;
}
@Override
public SimpleString getName() {
return name;

View File

@ -41,10 +41,10 @@ import org.apache.activemq.artemis.core.message.BodyEncoder;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
@ -837,6 +837,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
public class FakeQueueForScheduleUnitTest implements Queue {
@Override
public void setDeleteOnNoConsumers(boolean value) {
}
@Override
public void setMaxConsumer(int maxConsumers) {
}
@Override
public void unproposed(SimpleString groupID) {

View File

@ -18,16 +18,16 @@ package org.apache.activemq.artemis.tests.integration.cli;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.text.MessageFormat;
import java.util.EnumSet;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.address.AddRoutingType;
import org.apache.activemq.artemis.cli.commands.address.CreateAddress;
import org.apache.activemq.artemis.cli.commands.address.DeleteAddress;
import org.apache.activemq.artemis.cli.commands.address.RemoveRoutingType;
import org.apache.activemq.artemis.cli.commands.address.ShowAddress;
import org.apache.activemq.artemis.cli.commands.address.UpdateAddress;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@ -160,16 +160,16 @@ public class AddressCommandTest extends JMSTestBase {
}
@Test
public void testAddRoutingType() throws Exception {
public void testUpdateAddressRoutingTypes() throws Exception {
final String addressName = "address";
final SimpleString address = new SimpleString(addressName);
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
final AddRoutingType addRoutingType = new AddRoutingType();
addRoutingType.setName(addressName);
addRoutingType.setRoutingType(RoutingType.MULTICAST.toString());
addRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(addRoutingType);
final UpdateAddress updateAddress = new UpdateAddress();
updateAddress.setName(addressName);
updateAddress.setRoutingTypes(RoutingType.MULTICAST.toString() + ',' + RoutingType.ANYCAST.toString());
updateAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(updateAddress);
final AddressInfo addressInfo = server.getAddressInfo(address);
assertNotNull(addressInfo);
@ -177,59 +177,29 @@ public class AddressCommandTest extends JMSTestBase {
}
@Test
public void testFailAddRoutingTypeAddressDoesNotExist() throws Exception {
public void testFailUpdateAddressDoesNotExist() throws Exception {
final String addressName = "address";
final AddRoutingType addRoutingType = new AddRoutingType();
addRoutingType.setName(addressName);
addRoutingType.setRoutingType(RoutingType.MULTICAST.toString());
addRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(addRoutingType, "Address Does Not Exist");
final AddressInfo addressInfo = server.getAddressInfo(new SimpleString(addressName));
assertNull(addressInfo);
final UpdateAddress updateAddress = new UpdateAddress();
updateAddress.setName(addressName);
updateAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(updateAddress, "Address Does Not Exist");
}
@Test
public void testRemoveRoutingType() throws Exception {
final String addressName = "address";
final SimpleString address = new SimpleString(addressName);
server.createAddressInfo(new AddressInfo(address, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST)));
final RemoveRoutingType removeRoutingType = new RemoveRoutingType();
removeRoutingType.setName(addressName);
removeRoutingType.setRoutingType(RoutingType.MULTICAST.toString());
removeRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(removeRoutingType);
final AddressInfo addressInfo = server.getAddressInfo(new SimpleString(addressName));
assertNotNull(addressInfo);
assertEquals(EnumSet.of(RoutingType.ANYCAST), addressInfo.getRoutingTypes());
}
@Test
public void testFailRemoveRoutingTypeAddressDoesNotExist() throws Exception {
final String addressName = "address";
final RemoveRoutingType removeRoutingType = new RemoveRoutingType();
removeRoutingType.setName(addressName);
removeRoutingType.setRoutingType(RoutingType.MULTICAST.toString());
removeRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(removeRoutingType, "Address Does Not Exist");
final AddressInfo addressInfo = server.getAddressInfo(new SimpleString(addressName));
assertNull(addressInfo);
}
@Test
public void testFailRemoveMulticastRoutingTypeWhenExistsQueues() throws Exception {
public void testFailUpdateAddressRoutingTypesWhenExistsQueues() throws Exception {
final String addressName = "address";
final SimpleString addressSimpleString = new SimpleString(addressName);
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST));
server.createAddressInfo(addressInfo);
server.createQueue(addressSimpleString, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false);
final RemoveRoutingType removeRoutingType = new RemoveRoutingType();
removeRoutingType.setName(addressName);
removeRoutingType.setRoutingType(RoutingType.MULTICAST.toString());
removeRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(removeRoutingType, "Can't remove MULTICAST routing type, queues exists. Please delete queues before removing this routing type.");
final UpdateAddress updateAddress = new UpdateAddress();
updateAddress.setName(addressName);
updateAddress.setRoutingTypes(RoutingType.ANYCAST.toString());
updateAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
final String expectedErrorMessage = MessageFormat.format("Can''t remove routing type {0}, queues exists for address: {1}. Please delete queues before removing this routing type.", RoutingType.MULTICAST, addressName);
checkExecutionFailure(updateAddress, expectedErrorMessage);
}
private void checkExecutionPassed(AbstractAction command) throws Exception {

View File

@ -18,6 +18,9 @@ package org.apache.activemq.artemis.tests.integration.cli;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.text.MessageFormat;
import java.util.EnumSet;
import java.util.Set;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -25,7 +28,9 @@ import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.queue.CreateQueue;
import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.cli.commands.queue.UpdateQueue;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
@ -224,6 +229,107 @@ public class QueueCommandTest extends JMSTestBase {
assertNull(server.getAddressInfo(queueName));
}
@Test
public void testUpdateCoreQueue() throws Exception {
final String queueName = "updateQueue";
final SimpleString queueNameString = new SimpleString(queueName);
final String addressName = "address";
final SimpleString addressSimpleString = new SimpleString(addressName);
final int oldMaxConsumers = -1;
final RoutingType oldRoutingType = RoutingType.MULTICAST;
final boolean oldDeleteOnNoConsumers = false;
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST));
server.createAddressInfo(addressInfo);
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
final int newMaxConsumers = 1;
final RoutingType newRoutingType = RoutingType.ANYCAST;
final boolean newDeleteOnNoConsumers = true;
final UpdateQueue updateQueue = new UpdateQueue();
updateQueue.setName(queueName);
updateQueue.setDeleteOnNoConsumers(newDeleteOnNoConsumers);
updateQueue.setRoutingType(newRoutingType.name());
updateQueue.setMaxConsumers(newMaxConsumers);
updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionPassed(updateQueue);
final QueueQueryResult queueQueryResult = server.queueQuery(queueNameString);
assertEquals("maxConsumers", newMaxConsumers, queueQueryResult.getMaxConsumers());
assertEquals("routingType", newRoutingType, queueQueryResult.getRoutingType());
assertTrue("deleteOnNoConsumers", newDeleteOnNoConsumers == queueQueryResult.isDeleteOnNoConsumers());
}
@Test
public void testUpdateCoreQueueCannotChangeRoutingType() throws Exception {
final String queueName = "updateQueue";
final SimpleString queueNameString = new SimpleString(queueName);
final String addressName = "address";
final SimpleString addressSimpleString = new SimpleString(addressName);
final int oldMaxConsumers = 10;
final RoutingType oldRoutingType = RoutingType.MULTICAST;
final boolean oldDeleteOnNoConsumers = false;
final Set<RoutingType> supportedRoutingTypes = EnumSet.of(oldRoutingType);
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.copyOf(supportedRoutingTypes));
server.createAddressInfo(addressInfo);
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
final RoutingType newRoutingType = RoutingType.ANYCAST;
final UpdateQueue updateQueue = new UpdateQueue();
updateQueue.setName(queueName);
updateQueue.setRoutingType(newRoutingType.name());
updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
final String expectedErrorMessage = MessageFormat.format("Can''t update queue {0} with routing type: {1}, Supported routing types for address: {2} are {3}", queueName, newRoutingType, addressName, supportedRoutingTypes);
checkExecutionFailure(updateQueue, expectedErrorMessage);
final QueueQueryResult queueQueryResult = server.queueQuery(queueNameString);
assertEquals("maxConsumers", oldMaxConsumers, queueQueryResult.getMaxConsumers());
assertEquals("routingType", oldRoutingType, queueQueryResult.getRoutingType());
assertTrue("deleteOnNoConsumers", oldDeleteOnNoConsumers == queueQueryResult.isDeleteOnNoConsumers());
}
@Test
public void testUpdateCoreQueueCannotLowerMaxConsumers() throws Exception {
final String queueName = "updateQueue";
final SimpleString queueNameString = new SimpleString(queueName);
final String addressName = "address";
final SimpleString addressSimpleString = new SimpleString(addressName);
final int oldMaxConsumers = 2;
final RoutingType oldRoutingType = RoutingType.MULTICAST;
final boolean oldDeleteOnNoConsumers = false;
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, oldRoutingType);
server.createAddressInfo(addressInfo);
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
server.locateQueue(queueNameString).addConsumer(new DummyServerConsumer());
server.locateQueue(queueNameString).addConsumer(new DummyServerConsumer());
final int newMaxConsumers = 1;
final UpdateQueue updateQueue = new UpdateQueue();
updateQueue.setName(queueName);
updateQueue.setMaxConsumers(newMaxConsumers);
updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
final String expectedErrorMessage = MessageFormat.format("Can''t update queue {0} with maxConsumers: {1}. Current consumers are {2}.", queueName, newMaxConsumers, 2);
checkExecutionFailure(updateQueue, expectedErrorMessage);
final QueueQueryResult queueQueryResult = server.queueQuery(queueNameString);
assertEquals("maxConsumers", oldMaxConsumers, queueQueryResult.getMaxConsumers());
}
@Test
public void testUpdateCoreQueueDoesNotExist() throws Exception {
SimpleString queueName = new SimpleString("updateQueue");
UpdateQueue updateQueue = new UpdateQueue();
updateQueue.setName(queueName.toString());
updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
checkExecutionFailure(updateQueue, "AMQ119017: Queue " + queueName + " does not exist");
assertFalse(server.queueQuery(queueName).isExists());
}
private void checkExecutionPassed(AbstractAction command) throws Exception {
String fullMessage = output.toString();
System.out.println("output: " + fullMessage);

View File

@ -60,13 +60,8 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
return new ActiveMQServerControl() {
@Override
public void addRoutingType(String name, String routingType) throws Exception {
proxy.invokeOperation("addRoutingType", name, routingType);
}
@Override
public void removeRoutingType(String name, String routingType) throws Exception {
proxy.invokeOperation("removeRoutingType", name, routingType);
public String updateAddress(String name, String routingTypes) throws Exception {
return (String) proxy.invokeOperation("updateAddress", name, routingTypes);
}
@Override
@ -112,7 +107,7 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
public void createQueue(String address,
public String createQueue(String address,
String routingType,
String name,
String filterStr,
@ -120,9 +115,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception {
proxy.invokeOperation("createQueue", address, routingType, name, filterStr, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
return (String) proxy.invokeOperation("createQueue", address, routingType, name, filterStr, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
public String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean deleteOnNoConsumers) throws Exception {
return (String) proxy.invokeOperation("updateQueue", name, routingType, maxConsumers, deleteOnNoConsumers);
}
@Override
public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception {
@ -591,8 +593,8 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
public void createAddress(String name, String routingTypes) throws Exception {
proxy.invokeOperation("createAddress", name, routingTypes);
public String createAddress(String name, String routingTypes) throws Exception {
return (String) proxy.invokeOperation("createAddress", name, routingTypes);
}
@Override

View File

@ -26,10 +26,10 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -38,6 +38,16 @@ import org.apache.activemq.artemis.utils.ReferenceCounter;
public class FakeQueue implements Queue {
@Override
public void setDeleteOnNoConsumers(boolean value) {
}
@Override
public void setMaxConsumer(int maxConsumers) {
}
@Override
public boolean isInternalQueue() {
// no-op

View File

@ -16,11 +16,11 @@
*/
package org.apache.activemq.artemis.tests.unit.core.server.impl.fakes;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -42,14 +43,17 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
public class FakePostOffice implements PostOffice {
@Override
public void addRoutingType(SimpleString addressName,
RoutingType routingType) throws ActiveMQAddressDoesNotExistException {
public QueueBinding updateQueue(SimpleString name,
RoutingType routingType,
Integer maxConsumers,
Boolean deleteOnNoConsumers) throws Exception {
return null;
}
@Override
public void removeRoutingType(SimpleString addressName, RoutingType routingType) throws Exception {
public AddressInfo updateAddressInfo(SimpleString addressName,
Collection<RoutingType> routingTypes) throws Exception {
return null;
}
@Override
@ -90,8 +94,8 @@ public class FakePostOffice implements PostOffice {
}
@Override
public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) {
return false;
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
return null;
}