ARTEMIS-878: Improved CLI commands
This commit is contained in:
parent
23e5650d9f
commit
0a4d1b38c8
|
@ -30,18 +30,18 @@ import org.apache.activemq.artemis.cli.commands.Kill;
|
|||
import org.apache.activemq.artemis.cli.commands.Mask;
|
||||
import org.apache.activemq.artemis.cli.commands.Run;
|
||||
import org.apache.activemq.artemis.cli.commands.Stop;
|
||||
import org.apache.activemq.artemis.cli.commands.address.AddRoutingType;
|
||||
import org.apache.activemq.artemis.cli.commands.address.CreateAddress;
|
||||
import org.apache.activemq.artemis.cli.commands.address.DeleteAddress;
|
||||
import org.apache.activemq.artemis.cli.commands.address.HelpAddress;
|
||||
import org.apache.activemq.artemis.cli.commands.address.RemoveRoutingType;
|
||||
import org.apache.activemq.artemis.cli.commands.address.ShowAddress;
|
||||
import org.apache.activemq.artemis.cli.commands.address.UpdateAddress;
|
||||
import org.apache.activemq.artemis.cli.commands.queue.CreateQueue;
|
||||
import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue;
|
||||
import org.apache.activemq.artemis.cli.commands.queue.HelpQueue;
|
||||
import org.apache.activemq.artemis.cli.commands.messages.Browse;
|
||||
import org.apache.activemq.artemis.cli.commands.messages.Consumer;
|
||||
import org.apache.activemq.artemis.cli.commands.messages.Producer;
|
||||
import org.apache.activemq.artemis.cli.commands.queue.UpdateQueue;
|
||||
import org.apache.activemq.artemis.cli.commands.tools.CompactJournal;
|
||||
import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal;
|
||||
import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal;
|
||||
|
@ -134,11 +134,11 @@ public class Artemis {
|
|||
String instance = artemisInstance != null ? artemisInstance.getAbsolutePath() : System.getProperty("artemis.instance");
|
||||
Cli.CliBuilder<Action> builder = Cli.<Action>builder("artemis").withDescription("ActiveMQ Artemis Command Line").withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Consumer.class).withCommand(Browse.class).withCommand(Mask.class).withDefaultCommand(HelpAction.class);
|
||||
|
||||
builder.withGroup("queue").withDescription("Queue tools group (create|delete) (example ./artemis queue create)").
|
||||
withDefaultCommand(HelpQueue.class).withCommands(CreateQueue.class, DeleteQueue.class);
|
||||
builder.withGroup("queue").withDescription("Queue tools group (create|delete|update) (example ./artemis queue create)").
|
||||
withDefaultCommand(HelpQueue.class).withCommands(CreateQueue.class, DeleteQueue.class, UpdateQueue.class);
|
||||
|
||||
builder.withGroup("address").withDescription("Address tools group (create|delete|addRoutingType|removeRoutingType|show) (example ./artemis address create)").
|
||||
withDefaultCommand(HelpAddress.class).withCommands(CreateAddress.class, DeleteAddress.class, AddRoutingType.class, RemoveRoutingType.class, ShowAddress.class);
|
||||
builder.withGroup("address").withDescription("Address tools group (create|delete|update|show) (example ./artemis address create)").
|
||||
withDefaultCommand(HelpAddress.class).withCommands(CreateAddress.class, DeleteAddress.class, UpdateAddress.class, ShowAddress.class);
|
||||
|
||||
if (instance != null) {
|
||||
builder.withGroup("data").withDescription("data tools group (print|exp|imp|exp|encode|decode|compact) (example ./artemis data print)").
|
||||
|
|
|
@ -50,7 +50,8 @@ public class CreateAddress extends AbstractAction {
|
|||
|
||||
@Override
|
||||
public void requestSuccessful(ClientMessage reply) throws Exception {
|
||||
context.out.println("Address " + getName() + " created successfully.");
|
||||
final String result = ManagementHelper.getResult(reply, String.class) + " created successfully.";
|
||||
context.out.println(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,32 +24,33 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
|||
import org.apache.activemq.artemis.cli.commands.AbstractAction;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
|
||||
@Command(name = "addRoutingType", description = "add the provided routing types to an address")
|
||||
public class AddRoutingType extends AbstractAction {
|
||||
@Command(name = "update", description = "update an address")
|
||||
public class UpdateAddress extends AbstractAction {
|
||||
|
||||
@Option(name = "--name", description = "The name of this address", required = true)
|
||||
String name;
|
||||
|
||||
@Option(name = "--routingType", description = "The routing types to be added to this address, options are 'anycast' or 'multicast'", required = true)
|
||||
String routingType;
|
||||
@Option(name = "--routingTypes", description = "The routing types supported by this address, options are 'anycast' or 'multicast', enter comma separated list")
|
||||
String routingTypes = null;
|
||||
|
||||
@Override
|
||||
public Object execute(ActionContext context) throws Exception {
|
||||
super.execute(context);
|
||||
addRoutingType(context);
|
||||
updateAddress(context);
|
||||
return null;
|
||||
}
|
||||
|
||||
private void addRoutingType(final ActionContext context) throws Exception {
|
||||
private void updateAddress(final ActionContext context) throws Exception {
|
||||
performCoreManagement(new AbstractAction.ManagementCallback<ClientMessage>() {
|
||||
@Override
|
||||
public void setUpInvocation(ClientMessage message) throws Exception {
|
||||
ManagementHelper.putOperationInvocation(message, "broker", "addRoutingType", name, routingType);
|
||||
ManagementHelper.putOperationInvocation(message, "broker", "updateAddress", name, routingTypes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestSuccessful(ClientMessage reply) throws Exception {
|
||||
context.out.println("Address " + name + " updated successfully.");
|
||||
final String result = ManagementHelper.getResult(reply, String.class) + " updated successfully.";
|
||||
context.out.println(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -68,11 +69,11 @@ public class AddRoutingType extends AbstractAction {
|
|||
this.name = name;
|
||||
}
|
||||
|
||||
public String getRoutingType() {
|
||||
return routingType;
|
||||
public String getRoutingTypes() {
|
||||
return routingTypes;
|
||||
}
|
||||
|
||||
public void setRoutingType(String routingType) {
|
||||
this.routingType = routingType;
|
||||
public void setRoutingTypes(String routingTypes) {
|
||||
this.routingTypes = routingTypes;
|
||||
}
|
||||
}
|
|
@ -75,7 +75,8 @@ public class CreateQueue extends AbstractAction {
|
|||
|
||||
@Override
|
||||
public void requestSuccessful(ClientMessage reply) throws Exception {
|
||||
context.out.println("Core queue " + getName() + " created successfully.");
|
||||
final String result = ManagementHelper.getResult(reply, String.class) + " created successfully.";
|
||||
context.out.println(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.cli.commands.address;
|
||||
package org.apache.activemq.artemis.cli.commands.queue;
|
||||
|
||||
import io.airlift.airline.Command;
|
||||
import io.airlift.airline.Option;
|
||||
|
@ -24,38 +24,45 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
|||
import org.apache.activemq.artemis.cli.commands.AbstractAction;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
|
||||
@Command(name = "removeRoutingType", description = "remove the provided routing types from an address")
|
||||
public class RemoveRoutingType extends AbstractAction {
|
||||
@Command(name = "update", description = "update a core queue")
|
||||
public class UpdateQueue extends AbstractAction {
|
||||
|
||||
@Option(name = "--name", description = "The name of the address", required = true)
|
||||
@Option(name = "--name", description = "name", required = true)
|
||||
String name;
|
||||
|
||||
@Option(name = "--routingType", description = "The routing types to be removed from this address, options are 'anycast' or 'multicast'", required = true)
|
||||
String routingType;
|
||||
@Option(name = "--deleteOnNoConsumers", description = "whether to delete when it's last consumers disconnects)")
|
||||
Boolean deleteOnNoConsumers = null;
|
||||
|
||||
@Option(name = "--maxConsumers", description = "Maximum number of consumers allowed at any one time")
|
||||
Integer maxConsumers = null;
|
||||
|
||||
@Option(name = "--routingType", description = "The routing type supported by this queue, options are 'anycast' or 'multicast'")
|
||||
String routingType = null;
|
||||
|
||||
@Override
|
||||
public Object execute(ActionContext context) throws Exception {
|
||||
super.execute(context);
|
||||
removeRoutingType(context);
|
||||
createQueue(context);
|
||||
return null;
|
||||
}
|
||||
|
||||
private void removeRoutingType(final ActionContext context) throws Exception {
|
||||
performCoreManagement(new AbstractAction.ManagementCallback<ClientMessage>() {
|
||||
private void createQueue(final ActionContext context) throws Exception {
|
||||
performCoreManagement(new ManagementCallback<ClientMessage>() {
|
||||
@Override
|
||||
public void setUpInvocation(ClientMessage message) throws Exception {
|
||||
ManagementHelper.putOperationInvocation(message, "broker", "removeRoutingType", name, routingType);
|
||||
ManagementHelper.putOperationInvocation(message, "broker", "updateQueue", name, routingType, maxConsumers, deleteOnNoConsumers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestSuccessful(ClientMessage reply) throws Exception {
|
||||
context.out.println("Address " + name + " updated successfully.");
|
||||
final String result = ManagementHelper.getResult(reply, String.class) + " updated successfully.";
|
||||
context.out.println(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestFailed(ClientMessage reply) throws Exception {
|
||||
String errMsg = (String) ManagementHelper.getResult(reply, String.class);
|
||||
context.err.println("Failed to update address " + name + ". Reason: " + errMsg);
|
||||
context.err.println("Failed to update " + name + ". Reason: " + errMsg);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -68,6 +75,22 @@ public class RemoveRoutingType extends AbstractAction {
|
|||
this.name = name;
|
||||
}
|
||||
|
||||
public Boolean getDeleteOnNoConsumers() {
|
||||
return deleteOnNoConsumers;
|
||||
}
|
||||
|
||||
public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
|
||||
this.deleteOnNoConsumers = deleteOnNoConsumers;
|
||||
}
|
||||
|
||||
public Integer getMaxConsumers() {
|
||||
return maxConsumers;
|
||||
}
|
||||
|
||||
public void setMaxConsumers(int maxConsumers) {
|
||||
this.maxConsumers = maxConsumers;
|
||||
}
|
||||
|
||||
public String getRoutingType() {
|
||||
return routingType;
|
||||
}
|
||||
|
@ -76,3 +99,4 @@ public class RemoveRoutingType extends AbstractAction {
|
|||
this.routingType = routingType;
|
||||
}
|
||||
}
|
||||
|
|
@ -435,16 +435,12 @@ public interface ActiveMQServerControl {
|
|||
// Operations ----------------------------------------------------
|
||||
|
||||
@Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION)
|
||||
void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
|
||||
String createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
|
||||
@Parameter(name = "routingTypes", desc = "Comma separated list of Routing Types (anycast/multicast)") String routingTypes) throws Exception;
|
||||
|
||||
@Operation(desc = "add the provided routing type to an address", impact = MBeanOperationInfo.ACTION)
|
||||
void addRoutingType(@Parameter(name = "name", desc = "The name of the address") String name,
|
||||
@Parameter(name = "routingType", desc = "The routing types to be added to this address, options are 'anycast' or 'multicast'") String routingType) throws Exception;
|
||||
|
||||
@Operation(desc = "remove the provided routing type to an address", impact = MBeanOperationInfo.ACTION)
|
||||
void removeRoutingType(@Parameter(name = "name", desc = "The name of the address") String name,
|
||||
@Parameter(name = "routingType", desc = "The routing types to be added to this address, options are 'anycast' or 'multicast'") String routingType) throws Exception;
|
||||
@Operation(desc = "update an address", impact = MBeanOperationInfo.ACTION)
|
||||
String updateAddress(@Parameter(name = "name", desc = "The name of the address") String name,
|
||||
@Parameter(name = "routingTypes", desc = "Comma separated list of Routing Types (anycast/multicast)") String routingTypes) throws Exception;
|
||||
|
||||
@Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
|
||||
void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception;
|
||||
|
@ -571,9 +567,10 @@ public interface ActiveMQServerControl {
|
|||
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
|
||||
* @param deleteOnNoConsumers delete this queue when the last consumer disconnects
|
||||
* @param autoCreateAddress create an address with default values should a matching address not be found
|
||||
* @return a textual summary of the queue
|
||||
* @throws Exception
|
||||
*/
|
||||
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
|
||||
String createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
|
||||
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
|
||||
@Parameter(name = "name", desc = "Name of the queue") String name,
|
||||
@Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
|
||||
|
@ -582,6 +579,22 @@ public interface ActiveMQServerControl {
|
|||
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
|
||||
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
|
||||
|
||||
/**
|
||||
* Update a queue.
|
||||
*
|
||||
* @param name name of the queue
|
||||
* @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
|
||||
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
|
||||
* @param deleteOnNoConsumers delete this queue when the last consumer disconnects
|
||||
* @return a textual summary of the queue
|
||||
* @throws Exception
|
||||
*/
|
||||
String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
|
||||
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
|
||||
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
|
||||
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean deleteOnNoConsumers) throws Exception;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Deploy a durable queue.
|
||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
|||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
|
||||
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
|
@ -562,8 +563,58 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
}
|
||||
}
|
||||
|
||||
private enum AddressInfoTextFormatter {
|
||||
Long {
|
||||
@Override
|
||||
public void createAddress(String name, String routingTypes) throws Exception {
|
||||
public StringBuilder format(AddressInfo addressInfo, StringBuilder output) {
|
||||
output.append("Address [name=").append(addressInfo.getName());
|
||||
output.append(", routingTypes={");
|
||||
final Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
|
||||
if (!routingTypes.isEmpty()) {
|
||||
for (RoutingType routingType : routingTypes) {
|
||||
output.append(routingType).append(',');
|
||||
}
|
||||
// delete hanging comma
|
||||
output.deleteCharAt(output.length() - 1);
|
||||
}
|
||||
output.append('}');
|
||||
output.append(", autoCreated=").append(addressInfo.isAutoCreated());
|
||||
output.append(']');
|
||||
return output;
|
||||
}
|
||||
};
|
||||
|
||||
public abstract StringBuilder format(AddressInfo addressInfo, StringBuilder output);
|
||||
}
|
||||
|
||||
public enum QueueTextFormatter {
|
||||
Long {
|
||||
@Override
|
||||
StringBuilder format(Queue queue, StringBuilder output) {
|
||||
output.append("Queue [name=").append(queue.getName());
|
||||
output.append(", address=").append(queue.getAddress());
|
||||
output.append(", routingType=").append(queue.getRoutingType());
|
||||
final Filter filter = queue.getFilter();
|
||||
if (filter != null) {
|
||||
output.append(", filter=").append(filter.getFilterString());
|
||||
}
|
||||
output.append(", durable=").append(queue.isDurable());
|
||||
final int maxConsumers = queue.getMaxConsumers();
|
||||
if (maxConsumers != Queue.MAX_CONSUMERS_UNLIMITED) {
|
||||
output.append(", maxConsumers=").append(queue.getMaxConsumers());
|
||||
}
|
||||
output.append(", deleteOnNoConsumers=").append(queue.isDeleteOnNoConsumers());
|
||||
output.append(", autoCreateAddress=").append(queue.isAutoCreated());
|
||||
output.append(']');
|
||||
return output;
|
||||
}
|
||||
};
|
||||
|
||||
abstract StringBuilder format(Queue queue, StringBuilder output);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createAddress(String name, String routingTypes) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
|
@ -572,33 +623,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
for (String routingType : toList(routingTypes)) {
|
||||
set.add(RoutingType.valueOf(routingType));
|
||||
}
|
||||
server.createAddressInfo(new AddressInfo(new SimpleString(name), set));
|
||||
final AddressInfo addressInfo = new AddressInfo(new SimpleString(name), set);
|
||||
if (server.createAddressInfo(addressInfo)) {
|
||||
return AddressInfoTextFormatter.Long.format(addressInfo, new StringBuilder()).toString();
|
||||
} else {
|
||||
return "";
|
||||
}
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addRoutingType(String name, String routingTypeName) throws Exception {
|
||||
public String updateAddress(String name, String routingTypes) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
final RoutingType routingType = RoutingType.valueOf(routingTypeName);
|
||||
server.addRoutingType(name, routingType);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
final Set<RoutingType> routingTypeSet;
|
||||
if (routingTypes == null) {
|
||||
routingTypeSet = null;
|
||||
} else {
|
||||
routingTypeSet = new HashSet<>();
|
||||
final String[] routingTypeNames = routingTypes.split(",");
|
||||
for (String routingTypeName : routingTypeNames) {
|
||||
routingTypeSet.add(RoutingType.valueOf(routingTypeName));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeRoutingType(String name, String routingTypeName) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
final RoutingType routingType = RoutingType.valueOf(routingTypeName);
|
||||
server.removeRoutingType(name, routingType);
|
||||
final AddressInfo updatedAddressInfo = server.updateAddressInfo(name, routingTypeSet);
|
||||
if (updatedAddressInfo == null) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(name));
|
||||
}
|
||||
return AddressInfoTextFormatter.Long.format(updatedAddressInfo, new StringBuilder()).toString();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
|
@ -672,7 +728,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
}
|
||||
|
||||
@Override
|
||||
public void createQueue(String address,
|
||||
public String createQueue(String address,
|
||||
String routingType,
|
||||
String name,
|
||||
String filterStr,
|
||||
|
@ -690,12 +746,32 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
filter = new SimpleString(filterStr);
|
||||
}
|
||||
|
||||
server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
|
||||
final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
|
||||
return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String updateQueue(String name,
|
||||
String routingType,
|
||||
Integer maxConsumers,
|
||||
Boolean deleteOnNoConsumers) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
|
||||
try {
|
||||
final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, deleteOnNoConsumers);
|
||||
if (queue == null) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name));
|
||||
}
|
||||
return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getQueueNames() {
|
||||
|
@ -804,7 +880,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
if (addressInfo == null) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address));
|
||||
} else {
|
||||
return addressInfo.toString();
|
||||
return AddressInfoTextFormatter.Long.format(addressInfo, new StringBuilder()).toString();
|
||||
}
|
||||
} finally {
|
||||
blockOnIO();
|
||||
|
|
|
@ -16,9 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.postoffice;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
|
@ -64,14 +64,14 @@ public interface AddressManager {
|
|||
*/
|
||||
boolean addAddressInfo(AddressInfo addressInfo);
|
||||
|
||||
AddressInfo updateAddressInfoIfPresent(SimpleString addressName,
|
||||
BiFunction<? super SimpleString, ? super AddressInfo, ? extends AddressInfo> remappingFunction);
|
||||
AddressInfo updateAddressInfo(SimpleString addressName,
|
||||
Collection<RoutingType> routingTypes) throws Exception;
|
||||
|
||||
/**
|
||||
* @param addressInfo
|
||||
* @return true if the address was added, false if it was updated
|
||||
* @return the same provided {@code addressInfo} if the address was added, another if it was updated
|
||||
*/
|
||||
boolean addOrUpdateAddressInfo(AddressInfo addressInfo);
|
||||
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
|
||||
|
||||
AddressInfo removeAddressInfo(SimpleString address);
|
||||
|
||||
|
|
|
@ -16,11 +16,11 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.postoffice;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
|
@ -53,20 +53,25 @@ public interface PostOffice extends ActiveMQComponent {
|
|||
|
||||
/**
|
||||
* @param addressInfo
|
||||
* @return true if the address was added, false if it was updated
|
||||
* @return the same provided {@code addressInfo} if the address was added, another if it was updated
|
||||
*/
|
||||
boolean addOrUpdateAddressInfo(AddressInfo addressInfo);
|
||||
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo);
|
||||
|
||||
AddressInfo removeAddressInfo(SimpleString address) throws Exception;
|
||||
|
||||
AddressInfo getAddressInfo(SimpleString address);
|
||||
|
||||
void addRoutingType(SimpleString addressName, RoutingType routingType) throws ActiveMQAddressDoesNotExistException;
|
||||
AddressInfo updateAddressInfo(SimpleString addressName, Collection<RoutingType> routingTypes) throws Exception;
|
||||
|
||||
void removeRoutingType(SimpleString addressName, RoutingType routingType) throws Exception;
|
||||
QueueBinding updateQueue(SimpleString name,
|
||||
RoutingType routingType,
|
||||
Integer maxConsumers,
|
||||
Boolean deleteOnNoConsumers) throws Exception;
|
||||
|
||||
List<Queue> listQueuesForAddress(SimpleString address) throws Exception;
|
||||
|
||||
|
||||
|
||||
void addBinding(Binding binding) throws Exception;
|
||||
|
||||
Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception;
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -32,7 +33,6 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
||||
|
@ -440,53 +440,68 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
||||
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
||||
synchronized (addressLock) {
|
||||
boolean result = addressManager.addOrUpdateAddressInfo(addressInfo);
|
||||
final AddressInfo updatedAddressInfo = addressManager.addOrUpdateAddressInfo(addressInfo);
|
||||
// only register address if it is newly added
|
||||
if (result) {
|
||||
final boolean isNew = updatedAddressInfo == addressInfo;
|
||||
if (isNew) {
|
||||
try {
|
||||
managementService.registerAddress(addressInfo);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
return updatedAddressInfo;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addRoutingType(SimpleString addressName, RoutingType routingType) throws ActiveMQAddressDoesNotExistException {
|
||||
public QueueBinding updateQueue(SimpleString name,
|
||||
RoutingType routingType,
|
||||
Integer maxConsumers,
|
||||
Boolean deleteOnNoConsumers) throws Exception {
|
||||
synchronized (addressLock) {
|
||||
final AddressInfo updateAddressInfo = addressManager.updateAddressInfoIfPresent(addressName, (name, addressInfo) -> {
|
||||
addressInfo.getRoutingTypes().add(routingType);
|
||||
return addressInfo;
|
||||
});
|
||||
if (updateAddressInfo == null) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
|
||||
final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name);
|
||||
if (queueBinding == null) {
|
||||
return null;
|
||||
}
|
||||
final Queue queue = queueBinding.getQueue();
|
||||
//TODO put the whole update logic on Queue
|
||||
//validate update
|
||||
if (maxConsumers != null) {
|
||||
final int consumerCount = queue.getConsumerCount();
|
||||
if (consumerCount > maxConsumers) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidMaxConsumersUpdate(name.toString(), maxConsumers, consumerCount);
|
||||
}
|
||||
}
|
||||
if (routingType != null) {
|
||||
final SimpleString address = queue.getAddress();
|
||||
final AddressInfo addressInfo = addressManager.getAddressInfo(address);
|
||||
final Set<RoutingType> addressRoutingTypes = addressInfo.getRoutingTypes();
|
||||
if (!addressRoutingTypes.contains(routingType)) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeUpdate(name.toString(), routingType, address.toString(), addressRoutingTypes);
|
||||
}
|
||||
}
|
||||
//atomic update
|
||||
if (maxConsumers != null) {
|
||||
queue.setMaxConsumer(maxConsumers);
|
||||
}
|
||||
if (routingType != null) {
|
||||
queue.setRoutingType(routingType);
|
||||
}
|
||||
if (deleteOnNoConsumers != null) {
|
||||
queue.setDeleteOnNoConsumers(deleteOnNoConsumers);
|
||||
}
|
||||
return queueBinding;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeRoutingType(SimpleString addressName, RoutingType routingType) throws Exception {
|
||||
public AddressInfo updateAddressInfo(SimpleString addressName,
|
||||
Collection<RoutingType> routingTypes) throws Exception {
|
||||
synchronized (addressLock) {
|
||||
if (RoutingType.MULTICAST.equals(routingType)) {
|
||||
final Bindings bindings = addressManager.getBindingsForRoutingAddress(addressName);
|
||||
if (bindings != null) {
|
||||
final boolean existsQueueBindings = bindings.getBindings().stream().anyMatch(QueueBinding.class::isInstance);
|
||||
if (existsQueueBindings) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidMulticastRoutingTypeDelete();
|
||||
}
|
||||
}
|
||||
}
|
||||
final AddressInfo updateAddressInfo = addressManager.updateAddressInfoIfPresent(addressName, (name, addressInfo) -> {
|
||||
addressInfo.getRoutingTypes().remove(routingType);
|
||||
return addressInfo;
|
||||
});
|
||||
if (updateAddressInfo == null) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
|
||||
}
|
||||
return addressManager.updateAddressInfo(addressName, routingTypes);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,12 +16,13 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.postoffice.impl;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.postoffice.Address;
|
||||
|
@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.postoffice.AddressManager;
|
|||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
|
@ -220,26 +222,48 @@ public class SimpleAddressManager implements AddressManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public AddressInfo updateAddressInfoIfPresent(SimpleString addressName,
|
||||
BiFunction<? super SimpleString, ? super AddressInfo, ? extends AddressInfo> remappingFunction) {
|
||||
return addressInfoMap.computeIfPresent(addressName, remappingFunction);
|
||||
public AddressInfo updateAddressInfo(SimpleString addressName,
|
||||
Collection<RoutingType> routingTypes) throws Exception {
|
||||
if (routingTypes == null) {
|
||||
return this.addressInfoMap.get(addressName);
|
||||
} else {
|
||||
return this.addressInfoMap.computeIfPresent(addressName, (name, oldAddressInfo) -> {
|
||||
validateRoutingTypes(name, routingTypes);
|
||||
final Set<RoutingType> updatedRoutingTypes = EnumSet.copyOf(routingTypes);
|
||||
oldAddressInfo.setRoutingTypes(updatedRoutingTypes);
|
||||
return oldAddressInfo;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
||||
boolean isNew = addAddressInfo(addressInfo);
|
||||
|
||||
// address already exists so update it
|
||||
if (!isNew) {
|
||||
AddressInfo toUpdate = getAddressInfo(addressInfo.getName());
|
||||
synchronized (toUpdate) {
|
||||
for (RoutingType routingType : addressInfo.getRoutingTypes()) {
|
||||
toUpdate.addRoutingType(routingType);
|
||||
private void validateRoutingTypes(SimpleString addressName, Collection<RoutingType> routingTypes) {
|
||||
final Bindings bindings = this.mappings.get(addressName);
|
||||
if (bindings != null) {
|
||||
for (Binding binding : bindings.getBindings()) {
|
||||
if (binding instanceof QueueBinding) {
|
||||
final QueueBinding queueBinding = (QueueBinding) binding;
|
||||
final RoutingType routingType = queueBinding.getQueue().getRoutingType();
|
||||
if (!routingTypes.contains(routingType)) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeDelete(routingType, addressName.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return isNew;
|
||||
@Override
|
||||
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
||||
return this.addressInfoMap.compute(addressInfo.getName(), (name, oldAddressInfo) -> {
|
||||
if (oldAddressInfo != null) {
|
||||
final Set<RoutingType> routingTypes = addressInfo.getRoutingTypes();
|
||||
validateRoutingTypes(name, routingTypes);
|
||||
final Set<RoutingType> updatedRoutingTypes = EnumSet.copyOf(routingTypes);
|
||||
oldAddressInfo.setRoutingTypes(updatedRoutingTypes);
|
||||
return oldAddressInfo;
|
||||
} else {
|
||||
return addressInfo;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -414,6 +414,17 @@ public interface ActiveMQMessageBundle {
|
|||
@Message(id = 119208, value = "Invalid routing type {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
IllegalArgumentException invalidRoutingType(String val);
|
||||
|
||||
@Message(id = 119209, value = "Can't remove MULTICAST routing type, queues exists. Please delete queues before removing this routing type.")
|
||||
IllegalStateException invalidMulticastRoutingTypeDelete();
|
||||
@Message(id = 119209, value = "Can''t remove routing type {0}, queues exists for address: {1}. Please delete queues before removing this routing type.", format = Message.Format.MESSAGE_FORMAT)
|
||||
IllegalStateException invalidRoutingTypeDelete(RoutingType routingType, String address);
|
||||
|
||||
@Message(id = 119210, value = "Can''t update queue {0} with maxConsumers: {1}. Current consumers are {2}.", format = Message.Format.MESSAGE_FORMAT)
|
||||
IllegalStateException invalidMaxConsumersUpdate(String queueName, int maxConsumers, int consumers);
|
||||
|
||||
@Message(id = 119211, value = "Can''t update queue {0} with routing type: {1}, Supported routing types for address: {2} are {3}", format = Message.Format.MESSAGE_FORMAT)
|
||||
IllegalStateException invalidRoutingTypeUpdate(String queueName,
|
||||
RoutingType routingType,
|
||||
String address,
|
||||
Set<RoutingType> supportedRoutingTypes);
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -17,13 +17,13 @@
|
|||
package org.apache.activemq.artemis.core.server;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
|
@ -422,6 +422,11 @@ public interface ActiveMQServer extends ActiveMQComponent {
|
|||
boolean deleteOnNoConsumers,
|
||||
boolean autoCreateAddress) throws Exception;
|
||||
|
||||
Queue updateQueue(String name,
|
||||
RoutingType routingType,
|
||||
Integer maxConsumers,
|
||||
Boolean deleteOnNoConsumers) throws Exception;
|
||||
|
||||
/*
|
||||
* add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
|
||||
* replace any factories with the same protocol
|
||||
|
@ -455,28 +460,11 @@ public interface ActiveMQServer extends ActiveMQComponent {
|
|||
|
||||
void removeClientConnection(String clientId);
|
||||
|
||||
/**
|
||||
* Add the {@code routingType} from the specified {@code address}.
|
||||
*
|
||||
* @param address the address name
|
||||
* @param routingType the routing type to be added
|
||||
* @throws ActiveMQAddressDoesNotExistException
|
||||
*/
|
||||
void addRoutingType(String address, RoutingType routingType) throws ActiveMQAddressDoesNotExistException;
|
||||
|
||||
/**
|
||||
* Remove the {@code routingType} from the specified {@code address}.
|
||||
*
|
||||
* @param address the address name
|
||||
* @param routingType the routing type to be removed
|
||||
* @throws ActiveMQAddressDoesNotExistException
|
||||
* @throws IllegalStateException when a binding already exists and is requested to remove {@link org.apache.activemq.artemis.core.server.RoutingType#MULTICAST}.
|
||||
*/
|
||||
void removeRoutingType(String address, RoutingType routingType) throws Exception;
|
||||
AddressInfo updateAddressInfo(String name, Collection<RoutingType> routingTypes) throws Exception;
|
||||
|
||||
boolean createAddressInfo(AddressInfo addressInfo) throws Exception;
|
||||
|
||||
boolean createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
|
||||
AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
|
||||
|
||||
void removeAddressInfo(SimpleString address, SecurityAuth session) throws Exception;
|
||||
|
||||
|
|
|
@ -54,8 +54,12 @@ public interface Queue extends Bindable {
|
|||
|
||||
boolean isDeleteOnNoConsumers();
|
||||
|
||||
void setDeleteOnNoConsumers(boolean value);
|
||||
|
||||
int getMaxConsumers();
|
||||
|
||||
void setMaxConsumer(int maxConsumers);
|
||||
|
||||
void addConsumer(Consumer consumer) throws Exception;
|
||||
|
||||
void removeConsumer(Consumer consumer);
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.net.URL;
|
|||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -48,7 +49,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -2403,15 +2403,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addRoutingType(String address, RoutingType routingType) throws ActiveMQAddressDoesNotExistException {
|
||||
public AddressInfo updateAddressInfo(String address, Collection<RoutingType> routingTypes) throws Exception {
|
||||
final SimpleString addressName = new SimpleString(address);
|
||||
postOffice.addRoutingType(addressName,routingType);
|
||||
//after the postOffice call, updatedAddressInfo could change further (concurrently)!
|
||||
final AddressInfo updatedAddressInfo = postOffice.updateAddressInfo(addressName, routingTypes);
|
||||
if (updatedAddressInfo != null) {
|
||||
//it change the address info without any lock!
|
||||
final long txID = storageManager.generateID();
|
||||
try {
|
||||
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
|
||||
storageManager.addAddressBinding(txID, updatedAddressInfo);
|
||||
} finally {
|
||||
storageManager.commitBindings(txID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeRoutingType(String address, RoutingType routingType) throws Exception {
|
||||
final SimpleString addressName = new SimpleString(address);
|
||||
postOffice.removeRoutingType(addressName,routingType);
|
||||
}
|
||||
return updatedAddressInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2430,21 +2436,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
|
||||
boolean result = postOffice.addOrUpdateAddressInfo(addressInfo);
|
||||
public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
|
||||
final AddressInfo updatedAddressInfo = postOffice.addOrUpdateAddressInfo(addressInfo);
|
||||
final boolean isNew = updatedAddressInfo == addressInfo;
|
||||
|
||||
long txID = storageManager.generateID();
|
||||
if (result) {
|
||||
final long txID = storageManager.generateID();
|
||||
if (isNew) {
|
||||
storageManager.addAddressBinding(txID, addressInfo);
|
||||
storageManager.commitBindings(txID);
|
||||
} else {
|
||||
AddressInfo updatedAddressInfo = getAddressInfo(addressInfo.getName());
|
||||
storageManager.deleteAddressBinding(txID, updatedAddressInfo.getId());
|
||||
storageManager.addAddressBinding(txID, updatedAddressInfo);
|
||||
storageManager.commitBindings(txID);
|
||||
}
|
||||
|
||||
return result;
|
||||
return updatedAddressInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2510,12 +2516,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressName);
|
||||
}
|
||||
|
||||
AddressInfo defaultAddressInfo = new AddressInfo(addressName);
|
||||
defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
|
||||
AddressInfo info = postOffice.getAddressInfo(addressName);
|
||||
|
||||
if (autoCreateAddress) {
|
||||
if (info == null || !info.getRoutingTypes().contains(routingType)) {
|
||||
final AddressInfo defaultAddressInfo = new AddressInfo(addressName);
|
||||
defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
|
||||
createOrUpdateAddressInfo(defaultAddressInfo.setAutoCreated(true));
|
||||
}
|
||||
} else if (info == null) {
|
||||
|
@ -2571,6 +2577,31 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
return queue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Queue updateQueue(String name,
|
||||
RoutingType routingType,
|
||||
Integer maxConsumers,
|
||||
Boolean deleteOnNoConsumers) throws Exception {
|
||||
final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, deleteOnNoConsumers);
|
||||
if (queueBinding != null) {
|
||||
final Queue queue = queueBinding.getQueue();
|
||||
if (queue.isDurable()) {
|
||||
final long txID = storageManager.generateID();
|
||||
try {
|
||||
storageManager.deleteQueueBinding(txID, queueBinding.getID());
|
||||
storageManager.addQueueBinding(txID, queueBinding);
|
||||
storageManager.commitBindings(txID);
|
||||
} catch (Throwable throwable) {
|
||||
storageManager.rollbackBindings(txID);
|
||||
throw throwable;
|
||||
}
|
||||
}
|
||||
return queue;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void deployDiverts() throws Exception {
|
||||
for (DivertConfiguration config : configuration.getDivertConfigurations()) {
|
||||
deployDivert(config);
|
||||
|
|
|
@ -65,11 +65,11 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.Consumer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
||||
|
@ -240,15 +240,15 @@ public class QueueImpl implements Queue {
|
|||
|
||||
private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
|
||||
|
||||
private int maxConsumers;
|
||||
private volatile int maxConsumers;
|
||||
|
||||
private boolean deleteOnNoConsumers;
|
||||
private volatile boolean deleteOnNoConsumers;
|
||||
|
||||
private final AddressInfo addressInfo;
|
||||
|
||||
private final AtomicInteger noConsumers = new AtomicInteger(0);
|
||||
|
||||
private RoutingType routingType;
|
||||
private volatile RoutingType routingType;
|
||||
|
||||
/**
|
||||
* This is to avoid multi-thread races on calculating direct delivery,
|
||||
|
@ -482,11 +482,21 @@ public class QueueImpl implements Queue {
|
|||
return deleteOnNoConsumers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setDeleteOnNoConsumers(boolean value) {
|
||||
this.deleteOnNoConsumers = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxConsumers() {
|
||||
return maxConsumers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setMaxConsumer(int maxConsumers) {
|
||||
this.maxConsumers = maxConsumers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleString getName() {
|
||||
return name;
|
||||
|
|
|
@ -41,10 +41,10 @@ import org.apache.activemq.artemis.core.message.BodyEncoder;
|
|||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||
import org.apache.activemq.artemis.core.server.Consumer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||
|
@ -837,6 +837,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
|
||||
public class FakeQueueForScheduleUnitTest implements Queue {
|
||||
|
||||
@Override
|
||||
public void setDeleteOnNoConsumers(boolean value) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxConsumer(int maxConsumers) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unproposed(SimpleString groupID) {
|
||||
|
||||
|
|
|
@ -18,16 +18,16 @@ package org.apache.activemq.artemis.tests.integration.cli;
|
|||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.text.MessageFormat;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.cli.commands.AbstractAction;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.cli.commands.address.AddRoutingType;
|
||||
import org.apache.activemq.artemis.cli.commands.address.CreateAddress;
|
||||
import org.apache.activemq.artemis.cli.commands.address.DeleteAddress;
|
||||
import org.apache.activemq.artemis.cli.commands.address.RemoveRoutingType;
|
||||
import org.apache.activemq.artemis.cli.commands.address.ShowAddress;
|
||||
import org.apache.activemq.artemis.cli.commands.address.UpdateAddress;
|
||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
|
@ -160,16 +160,16 @@ public class AddressCommandTest extends JMSTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testAddRoutingType() throws Exception {
|
||||
public void testUpdateAddressRoutingTypes() throws Exception {
|
||||
final String addressName = "address";
|
||||
final SimpleString address = new SimpleString(addressName);
|
||||
server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
|
||||
|
||||
final AddRoutingType addRoutingType = new AddRoutingType();
|
||||
addRoutingType.setName(addressName);
|
||||
addRoutingType.setRoutingType(RoutingType.MULTICAST.toString());
|
||||
addRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionPassed(addRoutingType);
|
||||
final UpdateAddress updateAddress = new UpdateAddress();
|
||||
updateAddress.setName(addressName);
|
||||
updateAddress.setRoutingTypes(RoutingType.MULTICAST.toString() + ',' + RoutingType.ANYCAST.toString());
|
||||
updateAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionPassed(updateAddress);
|
||||
|
||||
final AddressInfo addressInfo = server.getAddressInfo(address);
|
||||
assertNotNull(addressInfo);
|
||||
|
@ -177,59 +177,29 @@ public class AddressCommandTest extends JMSTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testFailAddRoutingTypeAddressDoesNotExist() throws Exception {
|
||||
public void testFailUpdateAddressDoesNotExist() throws Exception {
|
||||
final String addressName = "address";
|
||||
final AddRoutingType addRoutingType = new AddRoutingType();
|
||||
addRoutingType.setName(addressName);
|
||||
addRoutingType.setRoutingType(RoutingType.MULTICAST.toString());
|
||||
addRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionFailure(addRoutingType, "Address Does Not Exist");
|
||||
final AddressInfo addressInfo = server.getAddressInfo(new SimpleString(addressName));
|
||||
assertNull(addressInfo);
|
||||
final UpdateAddress updateAddress = new UpdateAddress();
|
||||
updateAddress.setName(addressName);
|
||||
updateAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionFailure(updateAddress, "Address Does Not Exist");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveRoutingType() throws Exception {
|
||||
final String addressName = "address";
|
||||
final SimpleString address = new SimpleString(addressName);
|
||||
server.createAddressInfo(new AddressInfo(address, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST)));
|
||||
|
||||
final RemoveRoutingType removeRoutingType = new RemoveRoutingType();
|
||||
removeRoutingType.setName(addressName);
|
||||
removeRoutingType.setRoutingType(RoutingType.MULTICAST.toString());
|
||||
removeRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionPassed(removeRoutingType);
|
||||
|
||||
final AddressInfo addressInfo = server.getAddressInfo(new SimpleString(addressName));
|
||||
assertNotNull(addressInfo);
|
||||
assertEquals(EnumSet.of(RoutingType.ANYCAST), addressInfo.getRoutingTypes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailRemoveRoutingTypeAddressDoesNotExist() throws Exception {
|
||||
final String addressName = "address";
|
||||
final RemoveRoutingType removeRoutingType = new RemoveRoutingType();
|
||||
removeRoutingType.setName(addressName);
|
||||
removeRoutingType.setRoutingType(RoutingType.MULTICAST.toString());
|
||||
removeRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionFailure(removeRoutingType, "Address Does Not Exist");
|
||||
final AddressInfo addressInfo = server.getAddressInfo(new SimpleString(addressName));
|
||||
assertNull(addressInfo);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailRemoveMulticastRoutingTypeWhenExistsQueues() throws Exception {
|
||||
public void testFailUpdateAddressRoutingTypesWhenExistsQueues() throws Exception {
|
||||
final String addressName = "address";
|
||||
final SimpleString addressSimpleString = new SimpleString(addressName);
|
||||
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST));
|
||||
server.createAddressInfo(addressInfo);
|
||||
server.createQueue(addressSimpleString, RoutingType.MULTICAST, new SimpleString("queue1"), null, true, false);
|
||||
|
||||
final RemoveRoutingType removeRoutingType = new RemoveRoutingType();
|
||||
removeRoutingType.setName(addressName);
|
||||
removeRoutingType.setRoutingType(RoutingType.MULTICAST.toString());
|
||||
removeRoutingType.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionFailure(removeRoutingType, "Can't remove MULTICAST routing type, queues exists. Please delete queues before removing this routing type.");
|
||||
final UpdateAddress updateAddress = new UpdateAddress();
|
||||
updateAddress.setName(addressName);
|
||||
updateAddress.setRoutingTypes(RoutingType.ANYCAST.toString());
|
||||
updateAddress.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
|
||||
final String expectedErrorMessage = MessageFormat.format("Can''t remove routing type {0}, queues exists for address: {1}. Please delete queues before removing this routing type.", RoutingType.MULTICAST, addressName);
|
||||
checkExecutionFailure(updateAddress, expectedErrorMessage);
|
||||
}
|
||||
|
||||
private void checkExecutionPassed(AbstractAction command) throws Exception {
|
||||
|
|
|
@ -18,6 +18,9 @@ package org.apache.activemq.artemis.tests.integration.cli;
|
|||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.text.MessageFormat;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -25,7 +28,9 @@ import org.apache.activemq.artemis.cli.commands.ActionContext;
|
|||
import org.apache.activemq.artemis.cli.commands.queue.CreateQueue;
|
||||
import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue;
|
||||
import org.apache.activemq.artemis.cli.commands.AbstractAction;
|
||||
import org.apache.activemq.artemis.cli.commands.queue.UpdateQueue;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||
|
@ -224,6 +229,107 @@ public class QueueCommandTest extends JMSTestBase {
|
|||
assertNull(server.getAddressInfo(queueName));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateCoreQueue() throws Exception {
|
||||
final String queueName = "updateQueue";
|
||||
final SimpleString queueNameString = new SimpleString(queueName);
|
||||
final String addressName = "address";
|
||||
final SimpleString addressSimpleString = new SimpleString(addressName);
|
||||
final int oldMaxConsumers = -1;
|
||||
final RoutingType oldRoutingType = RoutingType.MULTICAST;
|
||||
final boolean oldDeleteOnNoConsumers = false;
|
||||
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.of(RoutingType.ANYCAST, RoutingType.MULTICAST));
|
||||
server.createAddressInfo(addressInfo);
|
||||
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
|
||||
|
||||
final int newMaxConsumers = 1;
|
||||
final RoutingType newRoutingType = RoutingType.ANYCAST;
|
||||
final boolean newDeleteOnNoConsumers = true;
|
||||
final UpdateQueue updateQueue = new UpdateQueue();
|
||||
updateQueue.setName(queueName);
|
||||
updateQueue.setDeleteOnNoConsumers(newDeleteOnNoConsumers);
|
||||
updateQueue.setRoutingType(newRoutingType.name());
|
||||
updateQueue.setMaxConsumers(newMaxConsumers);
|
||||
updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
|
||||
checkExecutionPassed(updateQueue);
|
||||
|
||||
final QueueQueryResult queueQueryResult = server.queueQuery(queueNameString);
|
||||
assertEquals("maxConsumers", newMaxConsumers, queueQueryResult.getMaxConsumers());
|
||||
assertEquals("routingType", newRoutingType, queueQueryResult.getRoutingType());
|
||||
assertTrue("deleteOnNoConsumers", newDeleteOnNoConsumers == queueQueryResult.isDeleteOnNoConsumers());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateCoreQueueCannotChangeRoutingType() throws Exception {
|
||||
final String queueName = "updateQueue";
|
||||
final SimpleString queueNameString = new SimpleString(queueName);
|
||||
final String addressName = "address";
|
||||
final SimpleString addressSimpleString = new SimpleString(addressName);
|
||||
final int oldMaxConsumers = 10;
|
||||
final RoutingType oldRoutingType = RoutingType.MULTICAST;
|
||||
final boolean oldDeleteOnNoConsumers = false;
|
||||
final Set<RoutingType> supportedRoutingTypes = EnumSet.of(oldRoutingType);
|
||||
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, EnumSet.copyOf(supportedRoutingTypes));
|
||||
server.createAddressInfo(addressInfo);
|
||||
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
|
||||
|
||||
final RoutingType newRoutingType = RoutingType.ANYCAST;
|
||||
final UpdateQueue updateQueue = new UpdateQueue();
|
||||
updateQueue.setName(queueName);
|
||||
updateQueue.setRoutingType(newRoutingType.name());
|
||||
updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
|
||||
final String expectedErrorMessage = MessageFormat.format("Can''t update queue {0} with routing type: {1}, Supported routing types for address: {2} are {3}", queueName, newRoutingType, addressName, supportedRoutingTypes);
|
||||
checkExecutionFailure(updateQueue, expectedErrorMessage);
|
||||
|
||||
final QueueQueryResult queueQueryResult = server.queueQuery(queueNameString);
|
||||
assertEquals("maxConsumers", oldMaxConsumers, queueQueryResult.getMaxConsumers());
|
||||
assertEquals("routingType", oldRoutingType, queueQueryResult.getRoutingType());
|
||||
assertTrue("deleteOnNoConsumers", oldDeleteOnNoConsumers == queueQueryResult.isDeleteOnNoConsumers());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateCoreQueueCannotLowerMaxConsumers() throws Exception {
|
||||
final String queueName = "updateQueue";
|
||||
final SimpleString queueNameString = new SimpleString(queueName);
|
||||
final String addressName = "address";
|
||||
final SimpleString addressSimpleString = new SimpleString(addressName);
|
||||
final int oldMaxConsumers = 2;
|
||||
final RoutingType oldRoutingType = RoutingType.MULTICAST;
|
||||
final boolean oldDeleteOnNoConsumers = false;
|
||||
final AddressInfo addressInfo = new AddressInfo(addressSimpleString, oldRoutingType);
|
||||
server.createAddressInfo(addressInfo);
|
||||
server.createQueue(addressSimpleString, oldRoutingType, queueNameString, null, true, false, oldMaxConsumers, oldDeleteOnNoConsumers, false);
|
||||
|
||||
server.locateQueue(queueNameString).addConsumer(new DummyServerConsumer());
|
||||
server.locateQueue(queueNameString).addConsumer(new DummyServerConsumer());
|
||||
|
||||
final int newMaxConsumers = 1;
|
||||
final UpdateQueue updateQueue = new UpdateQueue();
|
||||
updateQueue.setName(queueName);
|
||||
updateQueue.setMaxConsumers(newMaxConsumers);
|
||||
updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
|
||||
final String expectedErrorMessage = MessageFormat.format("Can''t update queue {0} with maxConsumers: {1}. Current consumers are {2}.", queueName, newMaxConsumers, 2);
|
||||
checkExecutionFailure(updateQueue, expectedErrorMessage);
|
||||
|
||||
final QueueQueryResult queueQueryResult = server.queueQuery(queueNameString);
|
||||
assertEquals("maxConsumers", oldMaxConsumers, queueQueryResult.getMaxConsumers());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateCoreQueueDoesNotExist() throws Exception {
|
||||
SimpleString queueName = new SimpleString("updateQueue");
|
||||
|
||||
UpdateQueue updateQueue = new UpdateQueue();
|
||||
updateQueue.setName(queueName.toString());
|
||||
updateQueue.execute(new ActionContext(System.in, new PrintStream(output), new PrintStream(error)));
|
||||
checkExecutionFailure(updateQueue, "AMQ119017: Queue " + queueName + " does not exist");
|
||||
|
||||
assertFalse(server.queueQuery(queueName).isExists());
|
||||
}
|
||||
|
||||
private void checkExecutionPassed(AbstractAction command) throws Exception {
|
||||
String fullMessage = output.toString();
|
||||
System.out.println("output: " + fullMessage);
|
||||
|
|
|
@ -60,13 +60,8 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
|||
return new ActiveMQServerControl() {
|
||||
|
||||
@Override
|
||||
public void addRoutingType(String name, String routingType) throws Exception {
|
||||
proxy.invokeOperation("addRoutingType", name, routingType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeRoutingType(String name, String routingType) throws Exception {
|
||||
proxy.invokeOperation("removeRoutingType", name, routingType);
|
||||
public String updateAddress(String name, String routingTypes) throws Exception {
|
||||
return (String) proxy.invokeOperation("updateAddress", name, routingTypes);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -112,7 +107,7 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
|||
}
|
||||
|
||||
@Override
|
||||
public void createQueue(String address,
|
||||
public String createQueue(String address,
|
||||
String routingType,
|
||||
String name,
|
||||
String filterStr,
|
||||
|
@ -120,9 +115,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
|||
int maxConsumers,
|
||||
boolean deleteOnNoConsumers,
|
||||
boolean autoCreateAddress) throws Exception {
|
||||
proxy.invokeOperation("createQueue", address, routingType, name, filterStr, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
|
||||
return (String) proxy.invokeOperation("createQueue", address, routingType, name, filterStr, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
|
||||
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
|
||||
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
|
||||
@Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean deleteOnNoConsumers) throws Exception {
|
||||
return (String) proxy.invokeOperation("updateQueue", name, routingType, maxConsumers, deleteOnNoConsumers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception {
|
||||
|
@ -591,8 +593,8 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
|||
}
|
||||
|
||||
@Override
|
||||
public void createAddress(String name, String routingTypes) throws Exception {
|
||||
proxy.invokeOperation("createAddress", name, routingTypes);
|
||||
public String createAddress(String name, String routingTypes) throws Exception {
|
||||
return (String) proxy.invokeOperation("createAddress", name, routingTypes);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -26,10 +26,10 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||
import org.apache.activemq.artemis.core.server.Consumer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
@ -38,6 +38,16 @@ import org.apache.activemq.artemis.utils.ReferenceCounter;
|
|||
|
||||
public class FakeQueue implements Queue {
|
||||
|
||||
@Override
|
||||
public void setDeleteOnNoConsumers(boolean value) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxConsumer(int maxConsumers) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInternalQueue() {
|
||||
// no-op
|
||||
|
|
|
@ -16,11 +16,11 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.unit.core.server.impl.fakes;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
|
||||
|
@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
|
|||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
|
@ -42,14 +43,17 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
|
|||
public class FakePostOffice implements PostOffice {
|
||||
|
||||
@Override
|
||||
public void addRoutingType(SimpleString addressName,
|
||||
RoutingType routingType) throws ActiveMQAddressDoesNotExistException {
|
||||
|
||||
public QueueBinding updateQueue(SimpleString name,
|
||||
RoutingType routingType,
|
||||
Integer maxConsumers,
|
||||
Boolean deleteOnNoConsumers) throws Exception {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeRoutingType(SimpleString addressName, RoutingType routingType) throws Exception {
|
||||
|
||||
public AddressInfo updateAddressInfo(SimpleString addressName,
|
||||
Collection<RoutingType> routingTypes) throws Exception {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,8 +94,8 @@ public class FakePostOffice implements PostOffice {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
||||
return false;
|
||||
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue