This closes #1736 ARTEMIS-1569 - Queue - User Enhancement

This commit is contained in:
Michael Pearce 2017-12-21 15:38:18 +00:00
commit 30ae2db2b0
9 changed files with 82 additions and 2 deletions

View File

@ -56,6 +56,12 @@ public interface QueueControl {
@Attribute(desc = "whether this queue is durable") @Attribute(desc = "whether this queue is durable")
boolean isDurable(); boolean isDurable();
/**
* Returns the user that is associated with creating the queue.
*/
@Attribute(desc = "the user that created the queue")
String getUser();
/** /**
* The routing type of this queue. * The routing type of this queue.
*/ */

View File

@ -33,6 +33,8 @@ public class CoreQueueConfiguration implements Serializable {
private boolean durable = true; private boolean durable = true;
private String user = null;
private Integer maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); private Integer maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
private Boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(); private Boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
@ -58,6 +60,10 @@ public class CoreQueueConfiguration implements Serializable {
return durable; return durable;
} }
public String getUser() {
return user;
}
/** /**
* @param address the address to set * @param address the address to set
*/ */
@ -106,6 +112,14 @@ public class CoreQueueConfiguration implements Serializable {
return this; return this;
} }
/**
* @param user the use you want to associate with creating the queue
*/
public CoreQueueConfiguration setUser(String user) {
this.user = user;
return this;
}
public boolean getPurgeOnNoConsumers() { public boolean getPurgeOnNoConsumers() {
return purgeOnNoConsumers; return purgeOnNoConsumers;
} }

View File

@ -1076,6 +1076,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
boolean durable = true; boolean durable = true;
int maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); int maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(); boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
String user = null;
NamedNodeMap attributes = node.getAttributes(); NamedNodeMap attributes = node.getAttributes();
for (int i = 0; i < attributes.getLength(); i++) { for (int i = 0; i < attributes.getLength(); i++) {
@ -1098,10 +1099,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
filterString = getAttributeValue(child, "string"); filterString = getAttributeValue(child, "string");
} else if (child.getNodeName().equals("durable")) { } else if (child.getNodeName().equals("durable")) {
durable = XMLUtil.parseBoolean(child); durable = XMLUtil.parseBoolean(child);
} else if (child.getNodeName().equals("user")) {
user = getTrimmedTextContent(child);
} }
} }
return new CoreQueueConfiguration().setAddress(address).setName(name).setFilterString(filterString).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers); return new CoreQueueConfiguration().setAddress(address).setName(name).setFilterString(filterString).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers).setUser(user);
} }
protected CoreAddressConfiguration parseAddressConfiguration(final Node node) { protected CoreAddressConfiguration parseAddressConfiguration(final Node node) {

View File

@ -175,6 +175,20 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
} }
} }
@Override
public String getUser() {
checkStarted();
clearIO();
try {
SimpleString user = queue.getUser();
return user == null ? null : user.toString();
} finally {
blockOnIO();
}
}
@Override @Override
public String getRoutingType() { public String getRoutingType() {
checkStarted(); checkStarted();

View File

@ -2508,7 +2508,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// if the address::queue doesn't exist then create it // if the address::queue doesn't exist then create it
try { try {
createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
queueName, SimpleString.toSimpleString(config.getFilterString()),null, queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()),
config.isDurable(),false,false,false,false,config.getMaxConsumers(),config.getPurgeOnNoConsumers(),true); config.isDurable(),false,false,false,false,config.getMaxConsumers(),config.getPurgeOnNoConsumers(),true);
} catch (ActiveMQQueueExistsException e) { } catch (ActiveMQQueueExistsException e) {
// the queue may exist on a *different* address // the queue may exist on a *different* address

View File

@ -477,6 +477,13 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="user" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
user to associate for creating the queue
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element ref="filter" maxOccurs="1" minOccurs="0"/> <xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
<xsd:element name="durable" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0"> <xsd:element name="durable" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
@ -2987,6 +2994,7 @@
<xsd:all> <xsd:all>
<xsd:element ref="filter" maxOccurs="1" minOccurs="0"/> <xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
<xsd:element name="durable" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0" /> <xsd:element name="durable" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0" />
<xsd:element name="user" type="xsd:string" maxOccurs="1" minOccurs="0" />
</xsd:all> </xsd:all>
<xsd:attribute name="name" type="xsd:string" use="required"/> <xsd:attribute name="name" type="xsd:string" use="required"/>
<xsd:attribute name="max-consumers" type="xsd:integer" use="optional"/> <xsd:attribute name="max-consumers" type="xsd:integer" use="optional"/>

View File

@ -459,6 +459,13 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="user" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
user to associate for creating the queue
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element ref="filter" maxOccurs="1" minOccurs="0"/> <xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
<xsd:element name="durable" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0"> <xsd:element name="durable" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
@ -2678,6 +2685,7 @@
<xsd:all> <xsd:all>
<xsd:element ref="filter" maxOccurs="1" minOccurs="0"/> <xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
<xsd:element name="durable" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0" /> <xsd:element name="durable" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0" />
<xsd:element name="user" type="xsd:string" maxOccurs="1" minOccurs="0" />
</xsd:all> </xsd:all>
<xsd:attribute name="name" type="xsd:string" use="required"/> <xsd:attribute name="name" type="xsd:string" use="required"/>
<xsd:attribute name="max-consumers" type="xsd:integer" use="optional"/> <xsd:attribute name="max-consumers" type="xsd:integer" use="optional"/>

View File

@ -71,6 +71,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (String) proxy.retrieveAttributeValue("address"); return (String) proxy.retrieveAttributeValue("address");
} }
@Override
public String getUser() {
return (String) proxy.retrieveAttributeValue("user");
}
@Override @Override
public int getConsumerCount() { public int getConsumerCount() {
return (Integer) proxy.retrieveAttributeValue("consumerCount", Integer.class); return (Integer) proxy.retrieveAttributeValue("consumerCount", Integer.class);

View File

@ -60,6 +60,28 @@ public class QueueConfigRestartTest extends ActiveMQTestBase {
QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue); QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding2.getQueue().isPurgeOnNoConsumers()); Assert.assertTrue(queueBinding2.getQueue().isPurgeOnNoConsumers());
} }
@Test
public void testQueueConfigUserAndRestart() throws Exception {
ActiveMQServer server = createServer(true);
server.start();
SimpleString address = new SimpleString("test.address");
SimpleString queue = new SimpleString("test.queue");
server.createQueue(address, RoutingType.MULTICAST, queue, null, SimpleString.toSimpleString("bob"), true, false, false, 10, true, true);
QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertEquals(SimpleString.toSimpleString("bob"), queueBinding1.getQueue().getUser());
server.stop();
server.start();
QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
Assert.assertTrue(queueBinding2.getQueue().isPurgeOnNoConsumers());
}
// Package protected --------------------------------------------- // Package protected ---------------------------------------------
// Protected ----------------------------------------------------- // Protected -----------------------------------------------------