ARTEMIS-1951 allow queue's user to be updated
This commit is contained in:
parent
63a4b49f4c
commit
754a263328
|
@ -605,6 +605,7 @@ public interface ActiveMQServerControl {
|
||||||
* @return a textual summary of the queue
|
* @return a textual summary of the queue
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
@Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
|
@Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
|
||||||
String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
|
String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
|
||||||
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
|
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
|
||||||
|
@ -618,15 +619,37 @@ public interface ActiveMQServerControl {
|
||||||
* @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
|
* @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
|
||||||
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
|
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
|
||||||
* @param purgeOnNoConsumers delete this queue when the last consumer disconnects
|
* @param purgeOnNoConsumers delete this queue when the last consumer disconnects
|
||||||
|
* @param exclusive if the queue should route exclusively to one consumer
|
||||||
* @return a textual summary of the queue
|
* @return a textual summary of the queue
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
|
@Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
|
||||||
|
String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
|
||||||
|
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
|
||||||
|
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
|
||||||
|
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
|
||||||
|
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update a queue
|
||||||
|
*
|
||||||
|
* @param name name of the queue
|
||||||
|
* @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
|
||||||
|
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
|
||||||
|
* @param purgeOnNoConsumers delete this queue when the last consumer disconnects
|
||||||
|
* @param exclusive if the queue should route exclusively to one consumer
|
||||||
|
* @param user the user associated with this queue
|
||||||
|
* @return
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
@Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
|
@Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
|
||||||
String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
|
String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
|
||||||
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
|
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
|
||||||
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
|
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
|
||||||
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
|
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
|
||||||
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive) throws Exception;
|
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
|
||||||
|
@Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deploy a durable queue.
|
* Deploy a durable queue.
|
||||||
|
|
|
@ -825,6 +825,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
@Override
|
@Override
|
||||||
public String updateQueue(String name,
|
public String updateQueue(String name,
|
||||||
String routingType,
|
String routingType,
|
||||||
|
@ -833,18 +834,29 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, null);
|
return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
@Override
|
@Override
|
||||||
public String updateQueue(String name,
|
public String updateQueue(String name,
|
||||||
String routingType,
|
String routingType,
|
||||||
Integer maxConsumers,
|
Integer maxConsumers,
|
||||||
Boolean purgeOnNoConsumers,
|
Boolean purgeOnNoConsumers,
|
||||||
Boolean exclusive) throws Exception {
|
Boolean exclusive) throws Exception {
|
||||||
|
return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String updateQueue(String name,
|
||||||
|
String routingType,
|
||||||
|
Integer maxConsumers,
|
||||||
|
Boolean purgeOnNoConsumers,
|
||||||
|
Boolean exclusive,
|
||||||
|
String user) throws Exception {
|
||||||
checkStarted();
|
checkStarted();
|
||||||
|
|
||||||
clearIO();
|
clearIO();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, purgeOnNoConsumers, exclusive);
|
final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, purgeOnNoConsumers, exclusive, user);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name));
|
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name));
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,7 +68,8 @@ public interface PostOffice extends ActiveMQComponent {
|
||||||
RoutingType routingType,
|
RoutingType routingType,
|
||||||
Integer maxConsumers,
|
Integer maxConsumers,
|
||||||
Boolean purgeOnNoConsumers,
|
Boolean purgeOnNoConsumers,
|
||||||
Boolean exclusive) throws Exception;
|
Boolean exclusive,
|
||||||
|
String user) throws Exception;
|
||||||
|
|
||||||
List<Queue> listQueuesForAddress(SimpleString address) throws Exception;
|
List<Queue> listQueuesForAddress(SimpleString address) throws Exception;
|
||||||
|
|
||||||
|
|
|
@ -467,7 +467,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
RoutingType routingType,
|
RoutingType routingType,
|
||||||
Integer maxConsumers,
|
Integer maxConsumers,
|
||||||
Boolean purgeOnNoConsumers,
|
Boolean purgeOnNoConsumers,
|
||||||
Boolean exclusive) throws Exception {
|
Boolean exclusive,
|
||||||
|
String user) throws Exception {
|
||||||
synchronized (addressLock) {
|
synchronized (addressLock) {
|
||||||
final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name);
|
final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name);
|
||||||
if (queueBinding == null) {
|
if (queueBinding == null) {
|
||||||
|
@ -511,6 +512,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
changed = true;
|
changed = true;
|
||||||
queue.setExclusive(exclusive);
|
queue.setExclusive(exclusive);
|
||||||
}
|
}
|
||||||
|
if ((user != null && !user.equals(queue.getUser()) || (user == null && queue.getUser() != null))) {
|
||||||
|
changed = true;
|
||||||
|
queue.setUser(SimpleString.toSimpleString(user));
|
||||||
|
}
|
||||||
|
|
||||||
if (changed) {
|
if (changed) {
|
||||||
final long txID = storageManager.generateID();
|
final long txID = storageManager.generateID();
|
||||||
|
|
|
@ -445,6 +445,13 @@ public interface ActiveMQServer extends ServiceComponent {
|
||||||
Boolean purgeOnNoConsumers,
|
Boolean purgeOnNoConsumers,
|
||||||
Boolean exclusive) throws Exception;
|
Boolean exclusive) throws Exception;
|
||||||
|
|
||||||
|
Queue updateQueue(String name,
|
||||||
|
RoutingType routingType,
|
||||||
|
Integer maxConsumers,
|
||||||
|
Boolean purgeOnNoConsumers,
|
||||||
|
Boolean exclusive,
|
||||||
|
String user) throws Exception;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
|
* add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
|
||||||
* replace any factories with the same protocol
|
* replace any factories with the same protocol
|
||||||
|
|
|
@ -352,10 +352,15 @@ public interface Queue extends Bindable,CriticalComponent {
|
||||||
float getRate();
|
float getRate();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the user who created this queue
|
* @return the user associated with this queue
|
||||||
*/
|
*/
|
||||||
SimpleString getUser();
|
SimpleString getUser();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param user the user associated with this queue
|
||||||
|
*/
|
||||||
|
void setUser(SimpleString user);
|
||||||
|
|
||||||
/** This is to perform a check on the counter again */
|
/** This is to perform a check on the counter again */
|
||||||
void recheckRefCount(OperationContext context);
|
void recheckRefCount(OperationContext context);
|
||||||
|
|
||||||
|
|
|
@ -2546,7 +2546,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
int maxConsumer = (config.isMaxConsumerConfigured()) ? maxConsumerQueueConfig : maxConsumerAddressSetting;
|
int maxConsumer = (config.isMaxConsumerConfigured()) ? maxConsumerQueueConfig : maxConsumerAddressSetting;
|
||||||
if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
|
if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
|
||||||
updateQueue(config.getName(), config.getRoutingType(), maxConsumer, config.getPurgeOnNoConsumers(),
|
updateQueue(config.getName(), config.getRoutingType(), maxConsumer, config.getPurgeOnNoConsumers(),
|
||||||
config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive());
|
config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive(),
|
||||||
|
config.getUser());
|
||||||
} else {
|
} else {
|
||||||
// if the address::queue doesn't exist then create it
|
// if the address::queue doesn't exist then create it
|
||||||
try {
|
try {
|
||||||
|
@ -2943,6 +2944,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
@Override
|
@Override
|
||||||
public Queue updateQueue(String name,
|
public Queue updateQueue(String name,
|
||||||
RoutingType routingType,
|
RoutingType routingType,
|
||||||
|
@ -2951,13 +2953,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, null);
|
return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
@Override
|
@Override
|
||||||
public Queue updateQueue(String name,
|
public Queue updateQueue(String name,
|
||||||
RoutingType routingType,
|
RoutingType routingType,
|
||||||
Integer maxConsumers,
|
Integer maxConsumers,
|
||||||
Boolean purgeOnNoConsumers,
|
Boolean purgeOnNoConsumers,
|
||||||
Boolean exclusive) throws Exception {
|
Boolean exclusive) throws Exception {
|
||||||
final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive);
|
return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Queue updateQueue(String name,
|
||||||
|
RoutingType routingType,
|
||||||
|
Integer maxConsumers,
|
||||||
|
Boolean purgeOnNoConsumers,
|
||||||
|
Boolean exclusive,
|
||||||
|
String user) throws Exception {
|
||||||
|
final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive, user);
|
||||||
if (queueBinding != null) {
|
if (queueBinding != null) {
|
||||||
final Queue queue = queueBinding.getQueue();
|
final Queue queue = queueBinding.getQueue();
|
||||||
return queue;
|
return queue;
|
||||||
|
|
|
@ -136,7 +136,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
private final SimpleString name;
|
private final SimpleString name;
|
||||||
|
|
||||||
private final SimpleString user;
|
private SimpleString user;
|
||||||
|
|
||||||
private volatile Filter filter;
|
private volatile Filter filter;
|
||||||
|
|
||||||
|
@ -491,6 +491,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
return user;
|
return user;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUser(SimpleString user) {
|
||||||
|
this.user = user;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isExclusive() {
|
public boolean isExclusive() {
|
||||||
return exclusive;
|
return exclusive;
|
||||||
|
|
|
@ -1393,6 +1393,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
public SimpleString getUser() {
|
public SimpleString getUser() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUser(SimpleString user) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isLastValue() {
|
public boolean isLastValue() {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -49,7 +49,11 @@ public class UpdateQueueTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
SimpleString ADDRESS = SimpleString.toSimpleString("queue.0");
|
SimpleString ADDRESS = SimpleString.toSimpleString("queue.0");
|
||||||
|
|
||||||
long originalID = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, null, true, false).getID();
|
Queue queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, null, true, false);
|
||||||
|
|
||||||
|
long originalID = queue.getID();
|
||||||
|
|
||||||
|
Assert.assertNull(queue.getUser());
|
||||||
|
|
||||||
Connection conn = factory.createConnection();
|
Connection conn = factory.createConnection();
|
||||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
@ -59,7 +63,7 @@ public class UpdateQueueTest extends ActiveMQTestBase {
|
||||||
prod.send(session.createTextMessage("message " + i));
|
prod.send(session.createTextMessage("message " + i));
|
||||||
}
|
}
|
||||||
|
|
||||||
server.updateQueue(ADDRESS.toString(), RoutingType.ANYCAST, 1, false, false);
|
server.updateQueue(ADDRESS.toString(), RoutingType.ANYCAST, 1, false, false, "newUser");
|
||||||
|
|
||||||
conn.close();
|
conn.close();
|
||||||
factory.close();
|
factory.close();
|
||||||
|
@ -69,10 +73,12 @@ public class UpdateQueueTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
validateBindingRecords(server, JournalRecordIds.QUEUE_BINDING_RECORD, 2);
|
validateBindingRecords(server, JournalRecordIds.QUEUE_BINDING_RECORD, 2);
|
||||||
|
|
||||||
Queue queue = server.locateQueue(ADDRESS);
|
queue = server.locateQueue(ADDRESS);
|
||||||
|
|
||||||
Assert.assertNotNull("queue not found", queue);
|
Assert.assertNotNull("queue not found", queue);
|
||||||
|
|
||||||
|
Assert.assertEquals("newUser", queue.getUser().toString());
|
||||||
|
|
||||||
factory = new ActiveMQConnectionFactory();
|
factory = new ActiveMQConnectionFactory();
|
||||||
|
|
||||||
conn = factory.createConnection();
|
conn = factory.createConnection();
|
||||||
|
|
|
@ -144,6 +144,17 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
||||||
return (String) proxy.invokeOperation("updateQueue", name, routingType, maxConsumers, purgeOnNoConsumers, exclusive);
|
return (String) proxy.invokeOperation("updateQueue", name, routingType, maxConsumers, purgeOnNoConsumers, exclusive);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
|
||||||
|
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
|
||||||
|
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
|
||||||
|
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
|
||||||
|
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
|
||||||
|
@Parameter(name = "user", desc = "The user associated with this queue") String user)
|
||||||
|
throws Exception {
|
||||||
|
return (String) proxy.invokeOperation("updateQueue", name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, user);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception {
|
public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception {
|
||||||
proxy.invokeOperation("deleteAddress", name);
|
proxy.invokeOperation("deleteAddress", name);
|
||||||
|
|
|
@ -738,6 +738,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setUser(SimpleString user) {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getDeliveringSize() {
|
public long getDeliveringSize() {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -48,7 +48,8 @@ public class FakePostOffice implements PostOffice {
|
||||||
RoutingType routingType,
|
RoutingType routingType,
|
||||||
Integer maxConsumers,
|
Integer maxConsumers,
|
||||||
Boolean purgeOnNoConsumers,
|
Boolean purgeOnNoConsumers,
|
||||||
Boolean exclusive) throws Exception {
|
Boolean exclusive,
|
||||||
|
String user) throws Exception {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue