ARTEMIS-881 Support new address-settings for auto-create queue
This commit is contained in:
parent
af27714026
commit
fe52ca6d75
|
@ -62,16 +62,18 @@ public interface ClientSession extends XAResource, AutoCloseable {
|
|||
List<SimpleString> getQueueNames();
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if auto-creation for this address is enabled and if the address queried is for a JMS
|
||||
* queue, <code>false</code> else.
|
||||
* Returns <code>true</code> if auto-queue-creation for this address is enabled, <code>false</code> else.
|
||||
*/
|
||||
boolean isAutoCreateJmsQueues();
|
||||
boolean isAutoCreateQueues();
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if auto-creation for this address is enabled and if the address queried is for a JMS
|
||||
* topic, <code>false</code> else.
|
||||
* Returns <code>true</code> if auto-address-creation for this address is enabled, <code>false</code> else.
|
||||
*/
|
||||
boolean isAutoCreateJmsTopics();
|
||||
boolean isAutoCreateAddresses();
|
||||
|
||||
boolean isDefaultDeleteOnNoConsumers();
|
||||
|
||||
int getDefaultMaxConsumers();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -510,8 +512,23 @@ public interface ClientSession extends XAResource, AutoCloseable {
|
|||
* @throws ActiveMQException in an exception occurs while creating the queue
|
||||
*/
|
||||
void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
|
||||
boolean durable,
|
||||
boolean autoCreated) throws ActiveMQException;
|
||||
boolean durable, boolean autoCreated) throws ActiveMQException;
|
||||
|
||||
/**
|
||||
* Creates a <em>non-temporary</em> queue.
|
||||
*
|
||||
* @param address the queue will be bound to this address
|
||||
* @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
|
||||
* @param queueName the name of the queue
|
||||
* @param filter only messages which match this filter will be put in the queue
|
||||
* @param durable whether the queue is durable or not
|
||||
* @param autoCreated whether to mark this queue as autoCreated or not
|
||||
* @param maxConsumers how many concurrent consumers will be allowed on this queue
|
||||
* @param deleteOnNoConsumers whether to delete the queue when the last consumer disconnects
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
|
||||
boolean durable, boolean autoCreated, int maxConsumers, boolean deleteOnNoConsumers) throws ActiveMQException;
|
||||
|
||||
/**
|
||||
* Creates a <em>non-temporary</em>queue.
|
||||
|
@ -526,6 +543,22 @@ public interface ClientSession extends XAResource, AutoCloseable {
|
|||
*/
|
||||
void createQueue(String address, RoutingType routingType, String queueName, String filter, boolean durable, boolean autoCreated) throws ActiveMQException;
|
||||
|
||||
/**
|
||||
* Creates a <em>non-temporary</em>queue.
|
||||
*
|
||||
* @param address the queue will be bound to this address
|
||||
* @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
|
||||
* @param queueName the name of the queue
|
||||
* @param filter only messages which match this filter will be put in the queue
|
||||
* @param durable whether the queue is durable or not
|
||||
* @param autoCreated whether to mark this queue as autoCreated or not
|
||||
* @param maxConsumers how many concurrent consumers will be allowed on this queue
|
||||
* @param deleteOnNoConsumers whether to delete the queue when the last consumer disconnects
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
void createQueue(String address, RoutingType routingType, String queueName, String filter, boolean durable, boolean autoCreated,
|
||||
int maxConsumers, boolean deleteOnNoConsumers) throws ActiveMQException;
|
||||
|
||||
/**
|
||||
* Creates a <em>temporary</em> queue.
|
||||
*
|
||||
|
|
|
@ -28,18 +28,26 @@ public class AddressQueryImpl implements ClientSession.AddressQuery {
|
|||
|
||||
private final ArrayList<SimpleString> queueNames;
|
||||
|
||||
private final boolean autoCreateJmsQueues;
|
||||
private final boolean autoCreateQueues;
|
||||
|
||||
private final boolean autoCreateJmsTopics;
|
||||
private final boolean autoCreateAddresses;
|
||||
|
||||
private final boolean defaultDeleteOnNoConsumers;
|
||||
|
||||
private final int defaultMaxConsumers;
|
||||
|
||||
public AddressQueryImpl(final boolean exists,
|
||||
final List<SimpleString> queueNames,
|
||||
final boolean autoCreateJmsQueues,
|
||||
final boolean autoCreateJmsTopics) {
|
||||
final boolean autoCreateQueues,
|
||||
final boolean autoCreateAddresses,
|
||||
final boolean defaultDeleteOnNoConsumers,
|
||||
final int defaultMaxConsumers) {
|
||||
this.exists = exists;
|
||||
this.queueNames = new ArrayList<>(queueNames);
|
||||
this.autoCreateJmsQueues = autoCreateJmsQueues;
|
||||
this.autoCreateJmsTopics = autoCreateJmsTopics;
|
||||
this.autoCreateQueues = autoCreateQueues;
|
||||
this.autoCreateAddresses = autoCreateAddresses;
|
||||
this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
|
||||
this.defaultMaxConsumers = defaultMaxConsumers;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -53,12 +61,22 @@ public class AddressQueryImpl implements ClientSession.AddressQuery {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoCreateJmsQueues() {
|
||||
return autoCreateJmsQueues;
|
||||
public boolean isAutoCreateQueues() {
|
||||
return autoCreateQueues;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoCreateJmsTopics() {
|
||||
return autoCreateJmsTopics;
|
||||
public boolean isAutoCreateAddresses() {
|
||||
return autoCreateAddresses;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDefaultDeleteOnNoConsumers() {
|
||||
return defaultDeleteOnNoConsumers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getDefaultMaxConsumers() {
|
||||
return defaultMaxConsumers;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -239,14 +239,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
|
||||
@Override
|
||||
public void createQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException {
|
||||
createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName);
|
||||
createQueue(address, queueName, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createQueue(final SimpleString address,
|
||||
final SimpleString queueName,
|
||||
final boolean durable) throws ActiveMQException {
|
||||
createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, durable);
|
||||
createQueue(address, queueName, null, durable, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -260,7 +260,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
public void createSharedQueue(SimpleString address,
|
||||
SimpleString queueName,
|
||||
boolean durable) throws ActiveMQException {
|
||||
createSharedQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, null, durable);
|
||||
createSharedQueue(address, queueName, null, durable);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -295,8 +295,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
final SimpleString queueName,
|
||||
final SimpleString filterString,
|
||||
final boolean durable) throws ActiveMQException {
|
||||
createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString,
|
||||
durable);
|
||||
createQueue(address, queueName, filterString, durable, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -316,9 +315,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
final SimpleString filterString,
|
||||
final boolean durable,
|
||||
final boolean autoCreated) throws ActiveMQException {
|
||||
createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString,
|
||||
durable,
|
||||
autoCreated);
|
||||
createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, durable, autoCreated);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -332,7 +329,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
|
||||
@Override
|
||||
public void createTemporaryQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException {
|
||||
createTemporaryQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName);
|
||||
createTemporaryQueue(address, queueName, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -351,7 +348,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
public void createTemporaryQueue(final String address,
|
||||
final String queueName,
|
||||
final String filter) throws ActiveMQException {
|
||||
createTemporaryQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filter);
|
||||
createTemporaryQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter));
|
||||
}
|
||||
|
||||
|
||||
|
@ -377,8 +374,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
|
||||
@Override
|
||||
public void createQueue(final String address, final RoutingType routingType, final String queueName, final String filterString,
|
||||
final boolean durable,
|
||||
final boolean autoCreated) throws ActiveMQException {
|
||||
final boolean durable, final boolean autoCreated) throws ActiveMQException {
|
||||
createQueue(SimpleString.toSimpleString(address),
|
||||
SimpleString.toSimpleString(queueName),
|
||||
SimpleString.toSimpleString(filterString),
|
||||
|
@ -386,18 +382,38 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
autoCreated);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void createQueue(final SimpleString address, final RoutingType routingType, final SimpleString queueName, final SimpleString filterString,
|
||||
final boolean durable, final boolean autoCreated, final int maxConsumers, final boolean deleteOnNoConsumers) throws ActiveMQException {
|
||||
internalCreateQueue(address,
|
||||
queueName, routingType,
|
||||
filterString,
|
||||
durable,
|
||||
false,
|
||||
maxConsumers,
|
||||
deleteOnNoConsumers,
|
||||
autoCreated);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createQueue(final String address, final RoutingType routingType, final String queueName, final String filterString,
|
||||
final boolean durable, final boolean autoCreated, final int maxConsumers, final boolean deleteOnNoConsumers) throws ActiveMQException {
|
||||
createQueue(SimpleString.toSimpleString(address),
|
||||
routingType,
|
||||
SimpleString.toSimpleString(queueName),
|
||||
SimpleString.toSimpleString(filterString),
|
||||
durable,
|
||||
autoCreated,
|
||||
maxConsumers,
|
||||
deleteOnNoConsumers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createTemporaryQueue(final SimpleString address,
|
||||
final RoutingType routingType,
|
||||
final SimpleString queueName) throws ActiveMQException {
|
||||
internalCreateQueue(address,
|
||||
queueName, routingType,
|
||||
null,
|
||||
false,
|
||||
true,
|
||||
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
|
||||
ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
|
||||
false);
|
||||
createTemporaryQueue(address, routingType, queueName, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
||||
|
@ -306,18 +307,22 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
|
||||
@Override
|
||||
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
|
||||
if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3, getServerVersion())) {
|
||||
if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4, getServerVersion())) {
|
||||
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V4);
|
||||
SessionBindingQueryResponseMessage_V4 response = (SessionBindingQueryResponseMessage_V4) packet;
|
||||
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), response.isDefaultDeleteOnNoConsumers(), response.getDefaultMaxConsumers());
|
||||
} else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3, getServerVersion())) {
|
||||
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3);
|
||||
SessionBindingQueryResponseMessage_V3 response = (SessionBindingQueryResponseMessage_V3) packet;
|
||||
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues(), response.isAutoCreateJmsTopics());
|
||||
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
|
||||
} else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2, getServerVersion())) {
|
||||
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2);
|
||||
SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) packet;
|
||||
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues(), false);
|
||||
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), false, ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
|
||||
} else {
|
||||
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
|
||||
SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) packet;
|
||||
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false);
|
||||
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -174,6 +174,8 @@ public final class ChannelImpl implements Channel {
|
|||
return version >= 127;
|
||||
case PacketImpl.SESS_QUEUEQUERY_RESP_V3:
|
||||
return version >= 129;
|
||||
case PacketImpl.SESS_BINDINGQUERY_RESP_V4:
|
||||
return version >= 129;
|
||||
default:
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
||||
|
@ -114,6 +115,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
|
|||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V2;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V3;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V4;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
|
||||
|
@ -287,6 +289,10 @@ public abstract class PacketDecoder implements Serializable {
|
|||
packet = new SessionBindingQueryResponseMessage_V3();
|
||||
break;
|
||||
}
|
||||
case SESS_BINDINGQUERY_RESP_V4: {
|
||||
packet = new SessionBindingQueryResponseMessage_V4();
|
||||
break;
|
||||
}
|
||||
case SESS_XA_START: {
|
||||
packet = new SessionXAStartMessage();
|
||||
break;
|
||||
|
|
|
@ -257,6 +257,8 @@ public class PacketImpl implements Packet {
|
|||
|
||||
public static final byte SESS_QUEUEQUERY_RESP_V3 = -14;
|
||||
|
||||
public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
public PacketImpl(final byte type) {
|
||||
|
|
|
@ -92,9 +92,15 @@ public class SessionBindingQueryResponseMessage extends PacketImpl {
|
|||
@Override
|
||||
public String toString() {
|
||||
StringBuffer buff = new StringBuffer(getParentString());
|
||||
buff.append("]");
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getParentString() {
|
||||
StringBuffer buff = new StringBuffer(super.getParentString());
|
||||
buff.append(", exists=" + exists);
|
||||
buff.append(", queueNames=" + queueNames);
|
||||
buff.append("]");
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
|
|
|
@ -23,18 +23,18 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||
|
||||
public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryResponseMessage {
|
||||
|
||||
protected boolean autoCreateJmsQueues;
|
||||
protected boolean autoCreateQueues;
|
||||
|
||||
public SessionBindingQueryResponseMessage_V2(final boolean exists,
|
||||
final List<SimpleString> queueNames,
|
||||
final boolean autoCreateJmsQueues) {
|
||||
final boolean autoCreateQueues) {
|
||||
super(SESS_BINDINGQUERY_RESP_V2);
|
||||
|
||||
this.exists = exists;
|
||||
|
||||
this.queueNames = queueNames;
|
||||
|
||||
this.autoCreateJmsQueues = autoCreateJmsQueues;
|
||||
this.autoCreateQueues = autoCreateQueues;
|
||||
}
|
||||
|
||||
public SessionBindingQueryResponseMessage_V2() {
|
||||
|
@ -45,40 +45,44 @@ public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryRe
|
|||
super(v);
|
||||
}
|
||||
|
||||
public boolean isAutoCreateJmsQueues() {
|
||||
return autoCreateJmsQueues;
|
||||
public boolean isAutoCreateQueues() {
|
||||
return autoCreateQueues;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||
super.encodeRest(buffer);
|
||||
buffer.writeBoolean(autoCreateJmsQueues);
|
||||
buffer.writeBoolean(autoCreateQueues);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
super.decodeRest(buffer);
|
||||
autoCreateJmsQueues = buffer.readBoolean();
|
||||
autoCreateQueues = buffer.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + (autoCreateJmsQueues ? 1231 : 1237);
|
||||
result = prime * result + (autoCreateQueues ? 1231 : 1237);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer buff = new StringBuffer(getParentString());
|
||||
buff.append(", exists=" + exists);
|
||||
buff.append(", queueNames=" + queueNames);
|
||||
buff.append(", autoCreateJmsQueues=" + autoCreateJmsQueues);
|
||||
buff.append("]");
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getParentString() {
|
||||
StringBuffer buff = new StringBuffer(super.getParentString());
|
||||
buff.append(", autoCreateQueues=" + autoCreateQueues);
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
|
@ -88,7 +92,7 @@ public class SessionBindingQueryResponseMessage_V2 extends SessionBindingQueryRe
|
|||
if (!(obj instanceof SessionBindingQueryResponseMessage_V2))
|
||||
return false;
|
||||
SessionBindingQueryResponseMessage_V2 other = (SessionBindingQueryResponseMessage_V2) obj;
|
||||
if (autoCreateJmsQueues != other.autoCreateJmsQueues)
|
||||
if (autoCreateQueues != other.autoCreateQueues)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -23,62 +23,69 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||
|
||||
public class SessionBindingQueryResponseMessage_V3 extends SessionBindingQueryResponseMessage_V2 {
|
||||
|
||||
private boolean autoCreateJmsTopics;
|
||||
protected boolean autoCreateAddresses;
|
||||
|
||||
public SessionBindingQueryResponseMessage_V3(final boolean exists,
|
||||
final List<SimpleString> queueNames,
|
||||
final boolean autoCreateJmsQueues,
|
||||
final boolean autoCreateJmsTopics) {
|
||||
final boolean autoCreateQueues,
|
||||
final boolean autoCreateAddresses) {
|
||||
super(SESS_BINDINGQUERY_RESP_V3);
|
||||
|
||||
this.exists = exists;
|
||||
|
||||
this.queueNames = queueNames;
|
||||
|
||||
this.autoCreateJmsQueues = autoCreateJmsQueues;
|
||||
this.autoCreateQueues = autoCreateQueues;
|
||||
|
||||
this.autoCreateJmsTopics = autoCreateJmsTopics;
|
||||
this.autoCreateAddresses = autoCreateAddresses;
|
||||
}
|
||||
|
||||
public SessionBindingQueryResponseMessage_V3() {
|
||||
super(SESS_BINDINGQUERY_RESP_V3);
|
||||
}
|
||||
|
||||
public boolean isAutoCreateJmsTopics() {
|
||||
return autoCreateJmsTopics;
|
||||
public SessionBindingQueryResponseMessage_V3(byte v) {
|
||||
super(v);
|
||||
}
|
||||
|
||||
public boolean isAutoCreateAddresses() {
|
||||
return autoCreateAddresses;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||
super.encodeRest(buffer);
|
||||
buffer.writeBoolean(autoCreateJmsTopics);
|
||||
buffer.writeBoolean(autoCreateAddresses);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
super.decodeRest(buffer);
|
||||
autoCreateJmsTopics = buffer.readBoolean();
|
||||
autoCreateAddresses = buffer.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + (autoCreateJmsTopics ? 1231 : 1237);
|
||||
result = prime * result + (autoCreateAddresses ? 1231 : 1237);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer buff = new StringBuffer(getParentString());
|
||||
buff.append(", exists=" + exists);
|
||||
buff.append(", queueNames=" + queueNames);
|
||||
buff.append(", autoCreateJmsQueues=" + autoCreateJmsQueues);
|
||||
buff.append(", autoCreateJmsTopics=" + autoCreateJmsTopics);
|
||||
buff.append("]");
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getParentString() {
|
||||
StringBuffer buff = new StringBuffer(super.getParentString());
|
||||
buff.append(", autoCreateAddresses=" + autoCreateAddresses);
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
|
@ -88,7 +95,7 @@ public class SessionBindingQueryResponseMessage_V3 extends SessionBindingQueryRe
|
|||
if (!(obj instanceof SessionBindingQueryResponseMessage_V3))
|
||||
return false;
|
||||
SessionBindingQueryResponseMessage_V3 other = (SessionBindingQueryResponseMessage_V3) obj;
|
||||
if (autoCreateJmsTopics != other.autoCreateJmsTopics)
|
||||
if (autoCreateAddresses != other.autoCreateAddresses)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
||||
public class SessionBindingQueryResponseMessage_V4 extends SessionBindingQueryResponseMessage_V3 {
|
||||
|
||||
private boolean defaultDeleteOnNoConsumers;
|
||||
|
||||
private int defaultMaxConsumers;
|
||||
|
||||
public SessionBindingQueryResponseMessage_V4(final boolean exists,
|
||||
final List<SimpleString> queueNames,
|
||||
final boolean autoCreateQueues,
|
||||
final boolean autoCreateAddresses,
|
||||
final boolean defaultDeleteOnNoConsumers,
|
||||
final int defaultMaxConsumers) {
|
||||
super(SESS_BINDINGQUERY_RESP_V4);
|
||||
|
||||
this.exists = exists;
|
||||
|
||||
this.queueNames = queueNames;
|
||||
|
||||
this.autoCreateQueues = autoCreateQueues;
|
||||
|
||||
this.autoCreateAddresses = autoCreateAddresses;
|
||||
|
||||
this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
|
||||
|
||||
this.defaultMaxConsumers = defaultMaxConsumers;
|
||||
}
|
||||
|
||||
public SessionBindingQueryResponseMessage_V4() {
|
||||
super(SESS_BINDINGQUERY_RESP_V4);
|
||||
}
|
||||
|
||||
public boolean isDefaultDeleteOnNoConsumers() {
|
||||
return defaultDeleteOnNoConsumers;
|
||||
}
|
||||
|
||||
public int getDefaultMaxConsumers() {
|
||||
return defaultMaxConsumers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||
super.encodeRest(buffer);
|
||||
buffer.writeBoolean(defaultDeleteOnNoConsumers);
|
||||
buffer.writeInt(defaultMaxConsumers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||
super.decodeRest(buffer);
|
||||
defaultDeleteOnNoConsumers = buffer.readBoolean();
|
||||
defaultMaxConsumers = buffer.readInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + (defaultDeleteOnNoConsumers ? 1231 : 1237);
|
||||
result = prime * result + defaultMaxConsumers;
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer buff = new StringBuffer(getParentString());
|
||||
buff.append("]");
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getParentString() {
|
||||
StringBuffer buff = new StringBuffer(super.getParentString());
|
||||
buff.append(", defaultDeleteOnNoConsumers=" + defaultDeleteOnNoConsumers);
|
||||
buff.append(", defaultMaxConsumers=" + defaultMaxConsumers);
|
||||
return buff.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (!super.equals(obj))
|
||||
return false;
|
||||
if (!(obj instanceof SessionBindingQueryResponseMessage_V4))
|
||||
return false;
|
||||
SessionBindingQueryResponseMessage_V4 other = (SessionBindingQueryResponseMessage_V4) obj;
|
||||
if (defaultDeleteOnNoConsumers != other.defaultDeleteOnNoConsumers)
|
||||
return false;
|
||||
if (defaultMaxConsumers != other.defaultMaxConsumers)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -28,8 +28,10 @@ public class AddressQueryResult {
|
|||
private final boolean autoCreated;
|
||||
private final boolean exists;
|
||||
private final boolean autoCreateAddresses;
|
||||
private final boolean defaultDeleteOnNoConsumers;
|
||||
private final int defaultMaxConsumers;
|
||||
|
||||
public AddressQueryResult(SimpleString name, Set<RoutingType> routingTypes, long id, boolean autoCreated, boolean exists, boolean autoCreateAddresses) {
|
||||
public AddressQueryResult(SimpleString name, Set<RoutingType> routingTypes, long id, boolean autoCreated, boolean exists, boolean autoCreateAddresses, boolean defaultDeleteOnNoConsumers, int defaultMaxConsumers) {
|
||||
|
||||
this.name = name;
|
||||
this.routingTypes = routingTypes;
|
||||
|
@ -38,6 +40,8 @@ public class AddressQueryResult {
|
|||
this.autoCreated = autoCreated;
|
||||
this.exists = exists;
|
||||
this.autoCreateAddresses = autoCreateAddresses;
|
||||
this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
|
||||
this.defaultMaxConsumers = defaultMaxConsumers;
|
||||
}
|
||||
|
||||
public SimpleString getName() {
|
||||
|
@ -63,4 +67,12 @@ public class AddressQueryResult {
|
|||
public boolean isAutoCreateAddresses() {
|
||||
return autoCreateAddresses;
|
||||
}
|
||||
|
||||
public boolean isDefaultDeleteOnNoConsumers() {
|
||||
return defaultDeleteOnNoConsumers;
|
||||
}
|
||||
|
||||
public int getDefaultMaxConsumers() {
|
||||
return defaultMaxConsumers;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -406,17 +406,17 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
|
|||
ClientSession.AddressQuery query = clientSession.addressQuery(address);
|
||||
|
||||
if (!query.isExists()) {
|
||||
if (destination.isQueue() && query.isAutoCreateJmsQueues()) {
|
||||
if (destination.isQueue() && query.isAutoCreateQueues()) {
|
||||
clientSession.createAddress(address, RoutingType.ANYCAST, true);
|
||||
if (destination.isTemporary()) {
|
||||
// TODO is it right to use the address for the queue name here?
|
||||
clientSession.createTemporaryQueue(address, RoutingType.ANYCAST, address);
|
||||
} else {
|
||||
clientSession.createQueue(address, RoutingType.ANYCAST, address, null, true, true);
|
||||
clientSession.createQueue(address, RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), query.isDefaultDeleteOnNoConsumers());
|
||||
}
|
||||
} else if (!destination.isQueue() && query.isAutoCreateJmsTopics()) {
|
||||
} else if (!destination.isQueue() && query.isAutoCreateAddresses()) {
|
||||
clientSession.createAddress(address, RoutingType.MULTICAST, true);
|
||||
} else if ((destination.isQueue() && !query.isAutoCreateJmsQueues()) || (!destination.isQueue() && !query.isAutoCreateJmsTopics())) {
|
||||
} else if ((destination.isQueue() && !query.isAutoCreateQueues()) || (!destination.isQueue() && !query.isAutoCreateAddresses())) {
|
||||
throw new InvalidDestinationException("Destination " + address + " does not exist");
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -302,11 +302,11 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
|
||||
if (!response.isExists()) {
|
||||
try {
|
||||
if (jbd.isQueue() && response.isAutoCreateJmsQueues()) {
|
||||
if (jbd.isQueue() && response.isAutoCreateQueues()) {
|
||||
// perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers)
|
||||
session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true);
|
||||
session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true);
|
||||
} else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) {
|
||||
session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultDeleteOnNoConsumers());
|
||||
} else if (!jbd.isQueue() && response.isAutoCreateAddresses()) {
|
||||
session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true);
|
||||
} else {
|
||||
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
|
||||
|
@ -571,7 +571,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
|
||||
AddressQuery response = session.addressQuery(dest.getSimpleAddress());
|
||||
|
||||
if (!response.isExists() && !response.isAutoCreateJmsTopics()) {
|
||||
if (!response.isExists() && !response.isAutoCreateAddresses()) {
|
||||
throw ActiveMQJMSClientBundle.BUNDLE.destinationDoesNotExist(dest.getSimpleAddress());
|
||||
}
|
||||
|
||||
|
@ -651,9 +651,9 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
* not a LOCAL binding for the address exists. If no LOCAL binding exists then it should be created here.
|
||||
*/
|
||||
if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) {
|
||||
if (response.isAutoCreateJmsQueues()) {
|
||||
if (response.isAutoCreateQueues()) {
|
||||
try {
|
||||
session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true);
|
||||
session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultDeleteOnNoConsumers());
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// The queue was created by another client/admin between the query check and send create queue packet
|
||||
}
|
||||
|
@ -669,7 +669,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
AddressQuery response = session.addressQuery(dest.getSimpleAddress());
|
||||
|
||||
if (!response.isExists()) {
|
||||
if (response.isAutoCreateJmsTopics()) {
|
||||
if (response.isAutoCreateAddresses()) {
|
||||
session.createAddress(dest.getSimpleAddress(), RoutingType.MULTICAST, true);
|
||||
} else {
|
||||
throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist");
|
||||
|
@ -709,7 +709,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
QueueQuery subResponse = session.queueQuery(queueName);
|
||||
|
||||
if (!subResponse.isExists()) {
|
||||
session.createQueue(dest.getSimpleAddress(), RoutingType.MULTICAST, queueName, coreFilterString, true);
|
||||
// durable subscription queues are not technically considered to be auto-created
|
||||
session.createQueue(dest.getSimpleAddress(), RoutingType.MULTICAST, queueName, coreFilterString, true, false, response.getDefaultMaxConsumers(), response.isDefaultDeleteOnNoConsumers());
|
||||
} else {
|
||||
// Already exists
|
||||
if (subResponse.getConsumerCount() > 0) {
|
||||
|
@ -718,12 +719,10 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
|
||||
// From javax.jms.Session Javadoc (and also JMS 1.1 6.11.1):
|
||||
// A client can change an existing durable subscription by
|
||||
// creating a durable
|
||||
// TopicSubscriber with the same name and a new topic and/or
|
||||
// message selector.
|
||||
// Changing a durable subscriber is equivalent to
|
||||
// unsubscribing (deleting) the old
|
||||
// one and creating a new one.
|
||||
// creating a durable TopicSubscriber with the same name and
|
||||
// a new topic and/or message selector.
|
||||
// Changing a durable subscriber is equivalent to unsubscribing
|
||||
// (deleting) the old one and creating a new one.
|
||||
|
||||
SimpleString oldFilterString = subResponse.getFilterString();
|
||||
|
||||
|
@ -742,7 +741,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
session.deleteQueue(queueName);
|
||||
|
||||
// Create the new one
|
||||
session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, true);
|
||||
session.createQueue(dest.getSimpleAddress(), RoutingType.MULTICAST, queueName, coreFilterString, true, false, response.getDefaultMaxConsumers(), response.isDefaultDeleteOnNoConsumers());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -803,8 +802,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
try {
|
||||
AddressQuery response = session.addressQuery(new SimpleString(activeMQDestination.getAddress()));
|
||||
if (!response.isExists()) {
|
||||
if (response.isAutoCreateJmsQueues()) {
|
||||
session.createQueue(activeMQDestination.getSimpleAddress(), activeMQDestination.getSimpleAddress(), null, true, true);
|
||||
if (response.isAutoCreateQueues()) {
|
||||
session.createQueue(activeMQDestination.getSimpleAddress(), RoutingType.ANYCAST, activeMQDestination.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultDeleteOnNoConsumers());
|
||||
} else {
|
||||
throw new InvalidDestinationException("Destination " + activeMQDestination.getName() + " does not exist");
|
||||
}
|
||||
|
@ -1102,7 +1101,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
|
||||
AddressQuery query = session.addressQuery(topic.getSimpleAddress());
|
||||
|
||||
if (!query.isExists() && !query.isAutoCreateJmsTopics()) {
|
||||
if (!query.isExists() && !query.isAutoCreateAddresses()) {
|
||||
return null;
|
||||
} else {
|
||||
return topic;
|
||||
|
|
|
@ -1618,9 +1618,9 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
|||
// @Override
|
||||
// public boolean create(SimpleString address) throws Exception {
|
||||
// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
|
||||
// if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoCreateJmsQueues()) {
|
||||
// if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoCreateQueues()) {
|
||||
// return internalCreateJMSQueue(false, address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), null, true, true);
|
||||
// } else if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics()) {
|
||||
// } else if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateAddresses()) {
|
||||
// return createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
|
||||
// } else {
|
||||
// return false;
|
||||
|
@ -1667,7 +1667,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
|
|||
// * for that dummy subscription is created we don't want to call createTopic again. Therefore we make sure the
|
||||
// * queue name doesn't start with the topic prefix.
|
||||
// */
|
||||
// if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics() && !queueName.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) {
|
||||
// if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateAddresses() && !queueName.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) {
|
||||
// createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
|
||||
// }
|
||||
// }
|
||||
|
|
|
@ -20,7 +20,6 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
|
@ -362,7 +361,7 @@ public class EmbeddedActiveMQResource extends ExternalResource {
|
|||
boolean temporary = false;
|
||||
Queue queue = null;
|
||||
try {
|
||||
queue = server.getActiveMQServer().createQueue(address, RoutingType.MULTICAST, name, filter, isUseDurableQueue(), temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true);
|
||||
queue = server.getActiveMQServer().createQueue(address, RoutingType.MULTICAST, name, filter, isUseDurableQueue(), temporary, server.getActiveMQServer().getAddressSettingsRepository().getMatch(address.toString()).getDefaultMaxConsumers(), server.getActiveMQServer().getAddressSettingsRepository().getMatch(address.toString()).isDefaultDeleteOnNoConsumers(), true);
|
||||
} catch (Exception ex) {
|
||||
throw new EmbeddedActiveMQResourceException(String.format("Failed to create queue: queueName = %s, name = %s", address.toString(), name.toString()), ex);
|
||||
}
|
||||
|
|
|
@ -235,7 +235,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
public boolean bindingQuery(String address) throws Exception {
|
||||
BindingQueryResult bindingQueryResult = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
|
||||
if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateJmsQueues()) {
|
||||
if (!bindingQueryResult.isExists() && bindingQueryResult.isAutoCreateQueues()) {
|
||||
try {
|
||||
serverSession.createQueue(new SimpleString(address), new SimpleString(address), null, false, true);
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.hornetq.client;
|
|||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
|
@ -71,7 +72,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
|
|||
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
|
||||
SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
|
||||
|
||||
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false);
|
||||
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -913,7 +913,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
if (destination.isQueue()) {
|
||||
SimpleString physicalName = new SimpleString(destination.getPhysicalName());
|
||||
BindingQueryResult result = server.bindingQuery(physicalName);
|
||||
if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
|
||||
if (!result.isExists() && !result.isAutoCreateQueues()) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -173,7 +173,7 @@ public class AMQSession implements SessionCallback {
|
|||
BindingQueryResult bindingQuery = server.bindingQuery(queueName);
|
||||
QueueQueryResult queueBinding = server.queueQuery(queueName);
|
||||
|
||||
boolean isAutoCreate = bindingQuery.isExists() ? true : bindingQuery.isAutoCreateJmsQueues();
|
||||
boolean isAutoCreate = bindingQuery.isExists() ? true : bindingQuery.isAutoCreateQueues();
|
||||
|
||||
if (!queueBinding.isExists()) {
|
||||
if (isAutoCreate) {
|
||||
|
|
|
@ -166,6 +166,17 @@ public final class Validators {
|
|||
};
|
||||
|
||||
public static final Validator ROUTING_TYPE = new Validator() {
|
||||
@Override
|
||||
public void validate(final String name, final Object value) {
|
||||
String val = (String) value;
|
||||
if (val == null || !val.equals(RoutingType.ANYCAST.toString()) &&
|
||||
!val.equals(RoutingType.MULTICAST.toString())) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
public static final Validator DIVERT_ROUTING_TYPE = new Validator() {
|
||||
@Override
|
||||
public void validate(final String name, final Object value) {
|
||||
String val = (String) value;
|
||||
|
|
|
@ -183,6 +183,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String AUTO_DELETE_ADDRESSES = "auto-delete-addresses";
|
||||
|
||||
private static final String DEFAULT_DELETE_ON_NO_CONSUMERS = "default-delete-on-no-consumers";
|
||||
|
||||
private static final String DEFAULT_MAX_CONSUMERS = "default-max-consumers";
|
||||
|
||||
private static final String DEFAULT_QUEUE_ROUTING_TYPE = "default-queue-routing-type";
|
||||
|
||||
private static final String DEFAULT_ADDRESS_ROUTING_TYPE = "default-address-routing-type";
|
||||
|
||||
private static final String MANAGEMENT_BROWSE_PAGE_SIZE = "management-browse-page-size";
|
||||
|
||||
private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections";
|
||||
|
@ -871,6 +879,20 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
addressSettings.setAutoDeleteAddresses(XMLUtil.parseBoolean(child));
|
||||
} else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) {
|
||||
addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child));
|
||||
} else if (DEFAULT_DELETE_ON_NO_CONSUMERS.equalsIgnoreCase(name)) {
|
||||
addressSettings.setDefaultDeleteOnNoConsumers(XMLUtil.parseBoolean(child));
|
||||
} else if (DEFAULT_MAX_CONSUMERS.equalsIgnoreCase(name)) {
|
||||
addressSettings.setDefaultMaxConsumers(XMLUtil.parseInt(child));
|
||||
} else if (DEFAULT_QUEUE_ROUTING_TYPE.equalsIgnoreCase(name)) {
|
||||
String value = getTrimmedTextContent(child);
|
||||
Validators.ROUTING_TYPE.validate(DEFAULT_QUEUE_ROUTING_TYPE, value);
|
||||
RoutingType routingType = RoutingType.valueOf(value);
|
||||
addressSettings.setDefaultQueueRoutingType(routingType);
|
||||
} else if (DEFAULT_ADDRESS_ROUTING_TYPE.equalsIgnoreCase(name)) {
|
||||
String value = getTrimmedTextContent(child);
|
||||
Validators.ROUTING_TYPE.validate(DEFAULT_ADDRESS_ROUTING_TYPE, value);
|
||||
RoutingType routingType = RoutingType.valueOf(value);
|
||||
addressSettings.setDefaultAddressRoutingType(routingType);
|
||||
}
|
||||
}
|
||||
return setting;
|
||||
|
@ -1570,7 +1592,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK);
|
||||
|
||||
RoutingType routingType = RoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.ROUTING_TYPE));
|
||||
RoutingType routingType = RoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.DIVERT_ROUTING_TYPE));
|
||||
|
||||
String filterString = null;
|
||||
|
||||
|
|
|
@ -619,14 +619,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
@Deprecated
|
||||
@Override
|
||||
public void deployQueue(final String address, final String name, final String filterString) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
server.deployQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), new SimpleString(filterString), true, false);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
deployQueue(address, name, filterString, true);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
|
@ -640,7 +633,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
|
||||
clearIO();
|
||||
try {
|
||||
server.deployQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false);
|
||||
server.deployQueue(SimpleString.toSimpleString(address), server.getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType(), new SimpleString(name), filter, durable, false);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
|
@ -648,92 +641,34 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
|
||||
@Override
|
||||
public void createQueue(final String address, final String name) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), null, true, false);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
createQueue(address, name, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createQueue(final String address, final String name, final String routingType) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, true, false);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
createQueue(address, name, routingType, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createQueue(final String address, final String name, final boolean durable) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), null, durable, false);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
createQueue(address, name, null, durable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createQueue(final String address, final String name, final boolean durable, final String routingType) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, durable, false);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
createQueue(address, name, null, durable, routingType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createQueue(final String address,
|
||||
final String name,
|
||||
final String filterStr,
|
||||
final boolean durable) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
SimpleString filter = null;
|
||||
if (filterStr != null && !filterStr.trim().equals("")) {
|
||||
filter = new SimpleString(filterStr);
|
||||
}
|
||||
|
||||
server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
public void createQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception {
|
||||
createQueue(address, name, filterStr, durable, server.getAddressSettingsRepository().getMatch(address).getDefaultQueueRoutingType().toString());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void createQueue(final String address,
|
||||
final String name,
|
||||
final String filterStr,
|
||||
final boolean durable,
|
||||
final String routingType) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
SimpleString filter = null;
|
||||
if (filterStr != null && !filterStr.trim().equals("")) {
|
||||
filter = new SimpleString(filterStr);
|
||||
}
|
||||
|
||||
server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
public void createQueue(final String address, final String name, final String filterStr, final boolean durable, final String routingType) throws Exception {
|
||||
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(address);
|
||||
createQueue(address, routingType, name, filterStr, durable, addressSettings.getDefaultMaxConsumers(), addressSettings.isDefaultDeleteOnNoConsumers(), addressSettings.isAutoCreateAddresses());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -225,6 +225,7 @@ public class SimpleAddressManager implements AddressManager {
|
|||
return addressInfoMap.computeIfPresent(addressName, remappingFunction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
||||
boolean isNew = addAddressInfo(addressInfo);
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
|
||||
|
@ -300,10 +301,12 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
requiresResponse = true;
|
||||
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
|
||||
BindingQueryResult result = session.executeBindingQuery(request.getAddress());
|
||||
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
|
||||
response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues(), result.isAutoCreateJmsTopics());
|
||||
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
|
||||
response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultDeleteOnNoConsumers(), result.getDefaultMaxConsumers());
|
||||
} else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
|
||||
response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses());
|
||||
} else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) {
|
||||
response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateJmsQueues());
|
||||
response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues());
|
||||
} else {
|
||||
response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
|
||||
}
|
||||
|
|
|
@ -26,36 +26,54 @@ public class BindingQueryResult {
|
|||
|
||||
private List<SimpleString> queueNames;
|
||||
|
||||
private boolean autoCreateJmsQueues;
|
||||
private boolean autoCreateQueues;
|
||||
|
||||
private boolean autoCreateJmsTopics;
|
||||
private boolean autoCreateAddresses;
|
||||
|
||||
private boolean defaultDeleteOnNoConsumers;
|
||||
|
||||
private int defaultMaxConsumers;
|
||||
|
||||
public BindingQueryResult(final boolean exists,
|
||||
final List<SimpleString> queueNames,
|
||||
final boolean autoCreateJmsQueues,
|
||||
final boolean autoCreateJmsTopics) {
|
||||
final boolean autoCreateQueues,
|
||||
final boolean autoCreateAddresses,
|
||||
final boolean defaultDeleteOnNoConsumers,
|
||||
final int defaultMaxConsumers) {
|
||||
this.exists = exists;
|
||||
|
||||
this.queueNames = queueNames;
|
||||
|
||||
this.autoCreateJmsQueues = autoCreateJmsQueues;
|
||||
this.autoCreateQueues = autoCreateQueues;
|
||||
|
||||
this.autoCreateJmsTopics = autoCreateJmsTopics;
|
||||
this.autoCreateAddresses = autoCreateAddresses;
|
||||
|
||||
this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
|
||||
|
||||
this.defaultMaxConsumers = defaultMaxConsumers;
|
||||
}
|
||||
|
||||
public boolean isExists() {
|
||||
return exists;
|
||||
}
|
||||
|
||||
public boolean isAutoCreateJmsQueues() {
|
||||
return autoCreateJmsQueues;
|
||||
public boolean isAutoCreateQueues() {
|
||||
return autoCreateQueues;
|
||||
}
|
||||
|
||||
public boolean isAutoCreateJmsTopics() {
|
||||
return autoCreateJmsTopics;
|
||||
public boolean isAutoCreateAddresses() {
|
||||
return autoCreateAddresses;
|
||||
}
|
||||
|
||||
public List<SimpleString> getQueueNames() {
|
||||
return queueNames;
|
||||
}
|
||||
|
||||
public boolean isDefaultDeleteOnNoConsumers() {
|
||||
return defaultDeleteOnNoConsumers;
|
||||
}
|
||||
|
||||
public int getDefaultMaxConsumers() {
|
||||
return defaultMaxConsumers;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -691,8 +691,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
|
||||
}
|
||||
|
||||
boolean autoCreateJmsQueues = getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateQueues();
|
||||
boolean autoCreateJmsTopics = getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateAddresses();
|
||||
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(address.toString());
|
||||
|
||||
boolean autoCreateQeueus = addressSettings.isAutoCreateQueues();
|
||||
boolean autoCreateAddresses = addressSettings.isAutoCreateAddresses();
|
||||
boolean defaultDeleteOnNoConsumers = addressSettings.isDefaultDeleteOnNoConsumers();
|
||||
int defaultMaxConsumers = addressSettings.getDefaultMaxConsumers();
|
||||
|
||||
List<SimpleString> names = new ArrayList<>();
|
||||
|
||||
|
@ -700,7 +704,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
ManagementService managementService = getManagementService();
|
||||
if (managementService != null) {
|
||||
if (address.equals(managementService.getManagementAddress())) {
|
||||
return new BindingQueryResult(true, names, autoCreateJmsQueues, autoCreateJmsTopics);
|
||||
return new BindingQueryResult(true, names, autoCreateQeueus, autoCreateAddresses, defaultDeleteOnNoConsumers, defaultMaxConsumers);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -712,7 +716,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
}
|
||||
|
||||
return new BindingQueryResult(getAddressInfo(address) != null, names, autoCreateJmsQueues, autoCreateJmsTopics);
|
||||
return new BindingQueryResult(getAddressInfo(address) != null, names, autoCreateQeueus, autoCreateAddresses, defaultDeleteOnNoConsumers, defaultMaxConsumers);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -722,6 +726,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
|
||||
boolean autoCreateQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues();
|
||||
boolean defaultDeleteOnNoConsumers = getAddressSettingsRepository().getMatch(name.toString()).isDefaultDeleteOnNoConsumers();
|
||||
int defaultMaxConsumers = getAddressSettingsRepository().getMatch(name.toString()).getDefaultMaxConsumers();
|
||||
|
||||
QueueQueryResult response;
|
||||
|
||||
|
@ -741,7 +747,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
// make an exception for the management address (see HORNETQ-29)
|
||||
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1);
|
||||
} else if (autoCreateQueues) {
|
||||
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, false, RoutingType.MULTICAST, 0);
|
||||
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultDeleteOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers);
|
||||
} else {
|
||||
response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0);
|
||||
}
|
||||
|
@ -755,14 +761,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
|
||||
}
|
||||
|
||||
boolean autoCreateAddresses = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateAddresses();
|
||||
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(name.toString());
|
||||
|
||||
boolean autoCreateAddresses = addressSettings.isAutoCreateAddresses();
|
||||
boolean defaultDeleteOnNoConsumers = addressSettings.isDefaultDeleteOnNoConsumers();
|
||||
int defaultMaxConsumers = addressSettings.getDefaultMaxConsumers();
|
||||
|
||||
AddressInfo addressInfo = postOffice.getAddressInfo(name);
|
||||
AddressQueryResult response;
|
||||
if (addressInfo != null) {
|
||||
response = new AddressQueryResult(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.getId(), addressInfo.isAutoCreated(), true, autoCreateAddresses);
|
||||
response = new AddressQueryResult(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.getId(), addressInfo.isAutoCreated(), true, autoCreateAddresses, defaultDeleteOnNoConsumers, defaultMaxConsumers);
|
||||
} else {
|
||||
response = new AddressQueryResult(name, null, -1, false, false, autoCreateAddresses);
|
||||
response = new AddressQueryResult(name, null, -1, false, false, autoCreateAddresses, defaultDeleteOnNoConsumers, defaultMaxConsumers);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
@ -1490,7 +1500,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
final SimpleString filterString,
|
||||
final boolean durable,
|
||||
final boolean temporary) throws Exception {
|
||||
return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false);
|
||||
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
|
||||
return createQueue(address, routingType, queueName, filterString, null, durable, temporary, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1499,7 +1510,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
final SimpleString filterString,
|
||||
final boolean durable,
|
||||
final boolean temporary) throws Exception {
|
||||
return createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, durable, temporary);
|
||||
return createQueue(address, getAddressSettingsRepository().getMatch(address.toString()).getDefaultQueueRoutingType(), queueName, filterString, durable, temporary);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1523,7 +1534,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
final SimpleString user,
|
||||
final boolean durable,
|
||||
final boolean temporary) throws Exception {
|
||||
return createQueue(address, routingType, queueName, filterString, user, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true);
|
||||
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
|
||||
return createQueue(address, routingType, queueName, filterString, user, durable, temporary, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1546,9 +1558,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean autoCreated) throws Exception {
|
||||
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
|
||||
return createQueue(address, routingType, queueName, filterString, user, durable, temporary,
|
||||
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
|
||||
ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), autoCreated);
|
||||
as.getDefaultMaxConsumers(),
|
||||
as.isDefaultDeleteOnNoConsumers(), autoCreated);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1577,7 +1590,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
if (routingType == null) {
|
||||
AddressInfo addressInfo = getAddressInfo(address);
|
||||
routingType = addressInfo.getRoutingTypes().size() == 1 ? addressInfo.getRoutingType() : ActiveMQDefaultConfiguration.getDefaultRoutingType();
|
||||
routingType = addressInfo.getRoutingTypes().size() == 1 ? addressInfo.getRoutingType() : getAddressSettingsRepository().getMatch(address.toString()).getDefaultQueueRoutingType();
|
||||
if (routingType == null) {
|
||||
// TODO (mtaylor) throw exception Can not determine routing type info from address
|
||||
}
|
||||
|
@ -1634,7 +1647,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
final SimpleString filterString,
|
||||
final boolean durable,
|
||||
final boolean temporary) throws Exception {
|
||||
return deployQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), resourceName, filterString, durable, temporary, false);
|
||||
return deployQueue(address, getAddressSettingsRepository().getMatch(address.toString()).getDefaultQueueRoutingType(), resourceName, filterString, durable, temporary);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1664,7 +1677,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
final boolean durable,
|
||||
final boolean temporary,
|
||||
final boolean autoCreated) throws Exception {
|
||||
return deployQueue(address, routingType, queueName, filterString, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true);
|
||||
AddressSettings as = getAddressSettingsRepository().getMatch(address.toString());
|
||||
return deployQueue(address, routingType, queueName, filterString, durable, temporary, autoCreated, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), as.isAutoCreateAddresses());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2398,6 +2412,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
postOffice.removeRoutingType(addressName,routingType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean createAddressInfo(AddressInfo addressInfo) throws Exception {
|
||||
boolean result = postOffice.addAddressInfo(addressInfo);
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||
|
@ -79,6 +78,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
|
|||
import org.apache.activemq.artemis.core.server.TempQueueObserver;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.ResourceManager;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction.State;
|
||||
|
@ -503,7 +503,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
final SimpleString filterString,
|
||||
final boolean temporary,
|
||||
final boolean durable) throws Exception {
|
||||
return createQueue(address, name, ActiveMQDefaultConfiguration.getDefaultRoutingType(), filterString, temporary, durable, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false);
|
||||
AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
|
||||
return createQueue(address, name, as.getDefaultQueueRoutingType(), filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -513,7 +514,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
final SimpleString filterString,
|
||||
final boolean temporary,
|
||||
final boolean durable) throws Exception {
|
||||
return createQueue(address, name, routingType, filterString, temporary, durable, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false);
|
||||
AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
|
||||
return createQueue(address, name, routingType, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -576,7 +578,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
boolean temporary,
|
||||
boolean durable,
|
||||
boolean autoCreated) throws Exception {
|
||||
return createQueue(address, name, routingType, filterString, temporary, durable, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), autoCreated);
|
||||
AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
|
||||
return createQueue(address, name, routingType, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultDeleteOnNoConsumers(), autoCreated);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,11 +18,14 @@ package org.apache.activemq.artemis.core.settings.impl;
|
|||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.settings.Mergeable;
|
||||
import org.apache.activemq.artemis.utils.BufferHelper;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
/**
|
||||
* Configuration settings that are applied on the address level
|
||||
|
@ -129,12 +132,16 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
private SlowConsumerPolicy slowConsumerPolicy = null;
|
||||
|
||||
@Deprecated
|
||||
private Boolean autoCreateJmsQueues = null;
|
||||
|
||||
@Deprecated
|
||||
private Boolean autoDeleteJmsQueues = null;
|
||||
|
||||
@Deprecated
|
||||
private Boolean autoCreateJmsTopics = null;
|
||||
|
||||
@Deprecated
|
||||
private Boolean autoDeleteJmsTopics = null;
|
||||
|
||||
private Boolean autoCreateQueues = null;
|
||||
|
@ -149,6 +156,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
private Long maxSizeBytesRejectThreshold = null;
|
||||
|
||||
private Integer defaultMaxConsumers = null;
|
||||
|
||||
private Boolean defaultDeleteOnNoConsumers = null;
|
||||
|
||||
private RoutingType defaultQueueRoutingType = null;
|
||||
|
||||
private RoutingType defaultAddressRoutingType = null;
|
||||
|
||||
//from amq5
|
||||
//make it transient
|
||||
private transient Integer queuePrefetch = null;
|
||||
|
@ -184,6 +199,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
this.managementBrowsePageSize = other.managementBrowsePageSize;
|
||||
this.queuePrefetch = other.queuePrefetch;
|
||||
this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold;
|
||||
this.defaultMaxConsumers = other.defaultMaxConsumers;
|
||||
this.defaultDeleteOnNoConsumers = other.defaultDeleteOnNoConsumers;
|
||||
this.defaultQueueRoutingType = other.defaultQueueRoutingType;
|
||||
this.defaultAddressRoutingType = other.defaultAddressRoutingType;
|
||||
}
|
||||
|
||||
public AddressSettings() {
|
||||
|
@ -269,6 +288,42 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
public int getDefaultMaxConsumers() {
|
||||
return defaultMaxConsumers != null ? defaultMaxConsumers : ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
|
||||
}
|
||||
|
||||
public AddressSettings setDefaultMaxConsumers(Integer defaultMaxConsumers) {
|
||||
this.defaultMaxConsumers = defaultMaxConsumers;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isDefaultDeleteOnNoConsumers() {
|
||||
return defaultDeleteOnNoConsumers != null ? defaultDeleteOnNoConsumers : ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers();
|
||||
}
|
||||
|
||||
public AddressSettings setDefaultDeleteOnNoConsumers(Boolean defaultDeleteOnNoConsumers) {
|
||||
this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RoutingType getDefaultQueueRoutingType() {
|
||||
return defaultQueueRoutingType != null ? defaultQueueRoutingType : ActiveMQDefaultConfiguration.getDefaultRoutingType();
|
||||
}
|
||||
|
||||
public AddressSettings setDefaultQueueRoutingType(RoutingType defaultQueueRoutingType) {
|
||||
this.defaultQueueRoutingType = defaultQueueRoutingType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RoutingType getDefaultAddressRoutingType() {
|
||||
return defaultAddressRoutingType != null ? defaultAddressRoutingType : ActiveMQDefaultConfiguration.getDefaultRoutingType();
|
||||
}
|
||||
|
||||
public AddressSettings setDefaultAddressRoutingType(RoutingType defaultAddressRoutingType) {
|
||||
this.defaultAddressRoutingType = defaultAddressRoutingType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isLastValueQueue() {
|
||||
return lastValueQueue != null ? lastValueQueue : AddressSettings.DEFAULT_LAST_VALUE_QUEUE;
|
||||
}
|
||||
|
@ -554,6 +609,18 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (maxSizeBytesRejectThreshold == null) {
|
||||
maxSizeBytesRejectThreshold = merged.maxSizeBytesRejectThreshold;
|
||||
}
|
||||
if (defaultMaxConsumers == null) {
|
||||
defaultMaxConsumers = merged.defaultMaxConsumers;
|
||||
}
|
||||
if (defaultDeleteOnNoConsumers == null) {
|
||||
defaultDeleteOnNoConsumers = merged.defaultDeleteOnNoConsumers;
|
||||
}
|
||||
if (defaultQueueRoutingType == null) {
|
||||
defaultQueueRoutingType = merged.defaultQueueRoutingType;
|
||||
}
|
||||
if (defaultAddressRoutingType == null) {
|
||||
defaultAddressRoutingType = merged.defaultAddressRoutingType;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -627,6 +694,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
managementBrowsePageSize = BufferHelper.readNullableInteger(buffer);
|
||||
|
||||
maxSizeBytesRejectThreshold = BufferHelper.readNullableLong(buffer);
|
||||
|
||||
defaultMaxConsumers = BufferHelper.readNullableInteger(buffer);
|
||||
|
||||
defaultDeleteOnNoConsumers = BufferHelper.readNullableBoolean(buffer);
|
||||
|
||||
defaultQueueRoutingType = RoutingType.getType(buffer.readByte());
|
||||
|
||||
defaultAddressRoutingType = RoutingType.getType(buffer.readByte());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -660,7 +735,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
BufferHelper.sizeOfNullableBoolean(autoCreateAddresses) +
|
||||
BufferHelper.sizeOfNullableBoolean(autoDeleteAddresses) +
|
||||
BufferHelper.sizeOfNullableInteger(managementBrowsePageSize) +
|
||||
BufferHelper.sizeOfNullableLong(maxSizeBytesRejectThreshold);
|
||||
BufferHelper.sizeOfNullableLong(maxSizeBytesRejectThreshold) +
|
||||
BufferHelper.sizeOfNullableInteger(defaultMaxConsumers) +
|
||||
BufferHelper.sizeOfNullableBoolean(defaultDeleteOnNoConsumers) +
|
||||
DataConstants.SIZE_BYTE +
|
||||
DataConstants.SIZE_BYTE;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -722,6 +801,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize);
|
||||
|
||||
BufferHelper.writeNullableLong(buffer, maxSizeBytesRejectThreshold);
|
||||
|
||||
BufferHelper.writeNullableInteger(buffer, defaultMaxConsumers);
|
||||
|
||||
BufferHelper.writeNullableBoolean(buffer, defaultDeleteOnNoConsumers);
|
||||
|
||||
buffer.writeByte(defaultQueueRoutingType == null ? -1 : defaultQueueRoutingType.getType());
|
||||
|
||||
buffer.writeByte(defaultAddressRoutingType == null ? -1 : defaultAddressRoutingType.getType());
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
@ -760,7 +847,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
result = prime * result + ((autoDeleteAddresses == null) ? 0 : autoDeleteAddresses.hashCode());
|
||||
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
|
||||
result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
|
||||
result = prime * result + ((maxSizeBytesRejectThreshold == null) ? 0 : queuePrefetch.hashCode());
|
||||
result = prime * result + ((maxSizeBytesRejectThreshold == null) ? 0 : maxSizeBytesRejectThreshold.hashCode());
|
||||
result = prime * result + ((defaultMaxConsumers == null) ? 0 : defaultMaxConsumers.hashCode());
|
||||
result = prime * result + ((defaultDeleteOnNoConsumers == null) ? 0 : defaultDeleteOnNoConsumers.hashCode());
|
||||
result = prime * result + ((defaultQueueRoutingType == null) ? 0 : defaultQueueRoutingType.hashCode());
|
||||
result = prime * result + ((defaultAddressRoutingType == null) ? 0 : defaultAddressRoutingType.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -927,6 +1018,30 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return false;
|
||||
} else if (!maxSizeBytesRejectThreshold.equals(other.maxSizeBytesRejectThreshold))
|
||||
return false;
|
||||
|
||||
if (defaultMaxConsumers == null) {
|
||||
if (other.defaultMaxConsumers != null)
|
||||
return false;
|
||||
} else if (!defaultMaxConsumers.equals(other.defaultMaxConsumers))
|
||||
return false;
|
||||
|
||||
if (defaultDeleteOnNoConsumers == null) {
|
||||
if (other.defaultDeleteOnNoConsumers != null)
|
||||
return false;
|
||||
} else if (!defaultDeleteOnNoConsumers.equals(other.defaultDeleteOnNoConsumers))
|
||||
return false;
|
||||
|
||||
if (defaultQueueRoutingType == null) {
|
||||
if (other.defaultQueueRoutingType != null)
|
||||
return false;
|
||||
} else if (!defaultQueueRoutingType.equals(other.defaultQueueRoutingType))
|
||||
return false;
|
||||
|
||||
if (defaultAddressRoutingType == null) {
|
||||
if (other.defaultAddressRoutingType != null)
|
||||
return false;
|
||||
} else if (!defaultAddressRoutingType.equals(other.defaultAddressRoutingType))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -992,6 +1107,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
autoDeleteAddresses +
|
||||
", managementBrowsePageSize=" +
|
||||
managementBrowsePageSize +
|
||||
", defaultMaxConsumers=" +
|
||||
defaultMaxConsumers +
|
||||
", defaultDeleteOnNoConsumers=" +
|
||||
defaultDeleteOnNoConsumers +
|
||||
", defaultQueueRoutingType=" +
|
||||
defaultQueueRoutingType +
|
||||
", defaultAddressRoutingType=" +
|
||||
defaultAddressRoutingType +
|
||||
"]";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1577,20 +1577,12 @@
|
|||
|
||||
<xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
|
||||
|
||||
<xsd:element name="routing-type" default="STRIP" maxOccurs="1" minOccurs="0">
|
||||
<xsd:element name="routing-type" type="divert-routing-type" default="STRIP" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
how should the routing-type on the diverted messages be set?
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:simpleType>
|
||||
<xsd:restriction base="xsd:string">
|
||||
<xsd:enumeration value="ANYCAST"/>
|
||||
<xsd:enumeration value="MULTICAST"/>
|
||||
<xsd:enumeration value="STRIP"/>
|
||||
<xsd:enumeration value="PASS"/>
|
||||
</xsd:restriction>
|
||||
</xsd:simpleType>
|
||||
</xsd:element>
|
||||
</xsd:all>
|
||||
|
||||
|
@ -2489,8 +2481,7 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="management-browse-page-size" type="xsd:int" default="200" maxOccurs="1"
|
||||
minOccurs="0">
|
||||
<xsd:element name="management-browse-page-size" type="xsd:int" default="200" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
how many message a management resource can browse
|
||||
|
@ -2498,6 +2489,38 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="default-delete-on-no-consumers" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Purge and pause this queue when the last consumer disconnects
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="default-max-consumers" type="xsd:int" default="200" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
the maximum number of consumers allowed on this queue at any one time
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="default-queue-routing-type" type="routing-type" default="MULTICAST" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
the routing-type used on auto-created queues
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="default-address-routing-type" type="routing-type" default="MULTICAST" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
the routing-type used on auto-created addresses
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
</xsd:all>
|
||||
|
||||
<xsd:attribute name="match" type="xsd:string" use="required">
|
||||
|
@ -2594,6 +2617,22 @@
|
|||
|
||||
<!-- 2.0 Addressing configuration -->
|
||||
|
||||
<xsd:simpleType name="routing-type">
|
||||
<xsd:restriction base="xsd:string">
|
||||
<xsd:enumeration value="ANYCAST"/>
|
||||
<xsd:enumeration value="MULTICAST"/>
|
||||
</xsd:restriction>
|
||||
</xsd:simpleType>
|
||||
|
||||
<xsd:simpleType name="divert-routing-type">
|
||||
<xsd:restriction base="xsd:string">
|
||||
<xsd:enumeration value="ANYCAST"/>
|
||||
<xsd:enumeration value="MULTICAST"/>
|
||||
<xsd:enumeration value="STRIP"/>
|
||||
<xsd:enumeration value="PASS"/>
|
||||
</xsd:restriction>
|
||||
</xsd:simpleType>
|
||||
|
||||
<xsd:complexType name="queueType">
|
||||
<xsd:all>
|
||||
<xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
|
||||
|
|
|
@ -305,6 +305,10 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsQueues());
|
||||
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoCreateJmsTopics());
|
||||
assertEquals(true, conf.getAddressesSettings().get("a1").isAutoDeleteJmsTopics());
|
||||
assertEquals(false, conf.getAddressesSettings().get("a1").isDefaultDeleteOnNoConsumers());
|
||||
assertEquals(5, conf.getAddressesSettings().get("a1").getDefaultMaxConsumers());
|
||||
assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a1").getDefaultQueueRoutingType());
|
||||
assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a1").getDefaultAddressRoutingType());
|
||||
|
||||
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
|
||||
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
|
||||
|
@ -320,6 +324,10 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsQueues());
|
||||
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoCreateJmsTopics());
|
||||
assertEquals(false, conf.getAddressesSettings().get("a2").isAutoDeleteJmsTopics());
|
||||
assertEquals(true, conf.getAddressesSettings().get("a2").isDefaultDeleteOnNoConsumers());
|
||||
assertEquals(15, conf.getAddressesSettings().get("a2").getDefaultMaxConsumers());
|
||||
assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a2").getDefaultQueueRoutingType());
|
||||
assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a2").getDefaultAddressRoutingType());
|
||||
|
||||
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
|
||||
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.settings;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
@ -47,6 +48,8 @@ public class AddressSettingsTest extends ActiveMQTestBase {
|
|||
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_QUEUES, addressSettings.isAutoDeleteQueues());
|
||||
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_ADDRESSES, addressSettings.isAutoCreateAddresses());
|
||||
Assert.assertEquals(AddressSettings.DEFAULT_AUTO_DELETE_ADDRESSES, addressSettings.isAutoDeleteAddresses());
|
||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), addressSettings.isDefaultDeleteOnNoConsumers());
|
||||
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), addressSettings.getDefaultMaxConsumers());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -272,6 +272,10 @@
|
|||
<auto-delete-queues>true</auto-delete-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-delete-addresses>true</auto-delete-addresses>
|
||||
<default-delete-on-no-consumers>false</default-delete-on-no-consumers>
|
||||
<default-max-consumers>5</default-max-consumers>
|
||||
<default-queue-routing-type>ANYCAST</default-queue-routing-type>
|
||||
<default-address-routing-type>MULTICAST</default-address-routing-type>
|
||||
</address-setting>
|
||||
<address-setting match="a2">
|
||||
<dead-letter-address>a2.1</dead-letter-address>
|
||||
|
@ -292,6 +296,10 @@
|
|||
<auto-delete-queues>false</auto-delete-queues>
|
||||
<auto-create-addresses>false</auto-create-addresses>
|
||||
<auto-delete-addresses>false</auto-delete-addresses>
|
||||
<default-delete-on-no-consumers>true</default-delete-on-no-consumers>
|
||||
<default-max-consumers>15</default-max-consumers>
|
||||
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
|
||||
<default-address-routing-type>ANYCAST</default-address-routing-type>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
<resource-limit-settings>
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -86,7 +87,7 @@ public class SimpleTest extends ActiveMQTestBase {
|
|||
final String addressName = "simpleAddress";
|
||||
|
||||
// Create a queue bound to a particular address where the test will send to & consume from.
|
||||
session.createQueue(addressName, queueName);
|
||||
session.createQueue(addressName, RoutingType.ANYCAST, queueName);
|
||||
|
||||
// Create a producer to send a message to the previously created address.
|
||||
ClientProducer producer = session.createProducer(addressName);
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.UUID;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.SingleServerTestBase;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -43,7 +44,7 @@ public class SingleServerSimpleTest extends SingleServerTestBase {
|
|||
final String addressName = "simpleAddress";
|
||||
|
||||
// Create a queue bound to a particular address where the test will send to & consume from.
|
||||
session.createQueue(addressName, queueName);
|
||||
session.createQueue(addressName, RoutingType.ANYCAST, queueName);
|
||||
|
||||
// Create a producer to send a message to the previously created address.
|
||||
ClientProducer producer = session.createProducer(addressName);
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -58,7 +59,7 @@ public class BlockingSendTest extends ActiveMQTestBase {
|
|||
|
||||
session = factory.createSession();
|
||||
|
||||
session.createQueue("address", "queue");
|
||||
session.createQueue("address", RoutingType.ANYCAST, "queue");
|
||||
|
||||
ClientProducer prod = session.createProducer("address");
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
|
@ -53,7 +54,7 @@ public class ConsumerFilterTest extends ActiveMQTestBase {
|
|||
session = sf.createSession();
|
||||
|
||||
session.start();
|
||||
session.createQueue("foo", "foo");
|
||||
session.createQueue("foo", RoutingType.ANYCAST, "foo");
|
||||
|
||||
producer = session.createProducer("foo");
|
||||
consumer = session.createConsumer("foo", "animal='giraffe'");
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
|
@ -227,7 +228,7 @@ public class InVMNonPersistentMessageBufferTest extends ActiveMQTestBase {
|
|||
|
||||
session = cf.createSession();
|
||||
|
||||
session.createQueue(InVMNonPersistentMessageBufferTest.address, InVMNonPersistentMessageBufferTest.queueName);
|
||||
session.createQueue(InVMNonPersistentMessageBufferTest.address, RoutingType.ANYCAST, InVMNonPersistentMessageBufferTest.queueName);
|
||||
|
||||
producer = session.createProducer(InVMNonPersistentMessageBufferTest.address);
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -55,7 +56,7 @@ public class MessageBufferTest extends ActiveMQTestBase {
|
|||
final String queueName = "simpleQueue";
|
||||
final String addressName = "simpleAddress";
|
||||
|
||||
session.createQueue(addressName, queueName);
|
||||
session.createQueue(addressName, RoutingType.ANYCAST, queueName);
|
||||
ClientProducer producer = session.createProducer(addressName);
|
||||
|
||||
ClientMessageImpl message = (ClientMessageImpl) session.createMessage(true);
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
|
|||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
@ -248,7 +249,7 @@ public class NIOvsOIOTest extends ActiveMQTestBase {
|
|||
|
||||
queueName = UUIDGenerator.getInstance().generateStringUUID();
|
||||
|
||||
session.createQueue(dest, queueName);
|
||||
session.createQueue(dest, RoutingType.ANYCAST, queueName);
|
||||
|
||||
consumer = session.createConsumer(queueName);
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
|||
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Assert;
|
||||
|
@ -43,7 +44,7 @@ public class SessionClosedOnRemotingConnectionFailureTest extends ActiveMQTestBa
|
|||
public void testSessionClosedOnRemotingConnectionFailure() throws Exception {
|
||||
ClientSession session = addClientSession(sf.createSession());
|
||||
|
||||
session.createQueue("fooaddress", "fooqueue");
|
||||
session.createQueue("fooaddress", RoutingType.ANYCAST, "fooqueue");
|
||||
|
||||
ClientProducer prod = session.createProducer("fooaddress");
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.junit.Assert;
|
||||
|
@ -91,9 +92,9 @@ public class SimpleSendMultipleQueuesTest extends ActiveMQTestBase {
|
|||
|
||||
session = cf.createSession();
|
||||
|
||||
session.createQueue(SimpleSendMultipleQueuesTest.address, "queue1");
|
||||
session.createQueue(SimpleSendMultipleQueuesTest.address, "queue2");
|
||||
session.createQueue(SimpleSendMultipleQueuesTest.address, "queue3");
|
||||
session.createQueue(SimpleSendMultipleQueuesTest.address, RoutingType.MULTICAST, "queue1");
|
||||
session.createQueue(SimpleSendMultipleQueuesTest.address, RoutingType.MULTICAST, "queue2");
|
||||
session.createQueue(SimpleSendMultipleQueuesTest.address, RoutingType.MULTICAST, "queue3");
|
||||
|
||||
producer = session.createProducer(SimpleSendMultipleQueuesTest.address);
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
|
||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
|
@ -201,9 +202,9 @@ public class TemporaryQueueTest extends SingleServerTestBase {
|
|||
|
||||
@Test
|
||||
public void testQueueWithWildcard() throws Exception {
|
||||
session.createQueue("a.b", "queue1");
|
||||
session.createTemporaryQueue("a.#", "queue2");
|
||||
session.createTemporaryQueue("a.#", "queue3");
|
||||
session.createQueue("a.b", RoutingType.MULTICAST, "queue1");
|
||||
session.createTemporaryQueue("a.#", RoutingType.MULTICAST, "queue2");
|
||||
session.createTemporaryQueue("a.#", RoutingType.MULTICAST, "queue3");
|
||||
|
||||
ClientProducer producer = session.createProducer("a.b");
|
||||
producer.send(session.createMessage(false));
|
||||
|
@ -240,9 +241,9 @@ public class TemporaryQueueTest extends SingleServerTestBase {
|
|||
|
||||
@Test
|
||||
public void testQueueWithWildcard2() throws Exception {
|
||||
session.createQueue("a.b", "queue1");
|
||||
session.createTemporaryQueue("a.#", "queue2");
|
||||
session.createTemporaryQueue("a.#", "queue3");
|
||||
session.createQueue("a.b", RoutingType.MULTICAST, "queue1");
|
||||
session.createTemporaryQueue("a.#", RoutingType.MULTICAST, "queue2");
|
||||
session.createTemporaryQueue("a.#", RoutingType.MULTICAST, "queue3");
|
||||
|
||||
ClientProducer producer = session.createProducer("a.b");
|
||||
producer.send(session.createMessage(false));
|
||||
|
@ -279,9 +280,9 @@ public class TemporaryQueueTest extends SingleServerTestBase {
|
|||
|
||||
@Test
|
||||
public void testQueueWithWildcard3() throws Exception {
|
||||
session.createQueue("a.b", "queue1");
|
||||
session.createTemporaryQueue("a.#", "queue2");
|
||||
session.createTemporaryQueue("a.#", "queue2.1");
|
||||
session.createQueue("a.b", RoutingType.MULTICAST, "queue1");
|
||||
session.createTemporaryQueue("a.#", RoutingType.MULTICAST, "queue2");
|
||||
session.createTemporaryQueue("a.#", RoutingType.MULTICAST, "queue2.1");
|
||||
|
||||
session.deleteQueue("queue2");
|
||||
}
|
||||
|
|
|
@ -67,6 +67,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Bridge;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
|
@ -762,7 +763,7 @@ public class BridgeTest extends ActiveMQTestBase {
|
|||
ClientSession session1 = sf1.createSession(false, true, true);
|
||||
|
||||
try {
|
||||
session1.createQueue(forwardAddress, queueName1);
|
||||
session1.createQueue(forwardAddress, RoutingType.ANYCAST, queueName1);
|
||||
} catch (Throwable ignored) {
|
||||
ignored.printStackTrace();
|
||||
}
|
||||
|
@ -893,7 +894,7 @@ public class BridgeTest extends ActiveMQTestBase {
|
|||
ClientSession session1 = sf1.createSession(false, true, true);
|
||||
|
||||
try {
|
||||
session1.createQueue(forwardAddress, queueName1);
|
||||
session1.createQueue(forwardAddress, RoutingType.ANYCAST, queueName1);
|
||||
} catch (Throwable ignored) {
|
||||
ignored.printStackTrace();
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
|
@ -656,7 +657,7 @@ public class ReattachTest extends ActiveMQTestBase {
|
|||
t.start();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
session.createQueue("address", "queue" + i);
|
||||
session.createQueue("address", RoutingType.ANYCAST, "queue" + i);
|
||||
}
|
||||
|
||||
//
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
|||
import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
|
@ -148,7 +149,7 @@ public abstract class TopologyClusterTestBase extends ClusterTestBase {
|
|||
protected ClientSession checkSessionOrReconnect(ClientSession session, ServerLocator locator) throws Exception {
|
||||
try {
|
||||
String rand = RandomUtil.randomString();
|
||||
session.createQueue(rand, rand);
|
||||
session.createQueue(rand, RoutingType.MULTICAST, rand);
|
||||
session.deleteQueue(rand);
|
||||
return session;
|
||||
} catch (ActiveMQObjectClosedException oce) {
|
||||
|
|
|
@ -16,18 +16,21 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.jms;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSConsumer;
|
||||
import javax.jms.JMSContext;
|
||||
import javax.jms.JMSProducer;
|
||||
import javax.jms.MessageFormatRuntimeException;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQJMSContext;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
|
||||
import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
|
||||
|
@ -133,6 +136,42 @@ public class JmsProducerTest extends JMSTestBase {
|
|||
context.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void defaultAutoCreatedQueueConfigTest() throws Exception {
|
||||
final String queueName = "q1";
|
||||
|
||||
server.getAddressSettingsRepository().addMatch(queueName, new AddressSettings().setDefaultMaxConsumers(5).setDefaultDeleteOnNoConsumers(true));
|
||||
|
||||
Queue q1 = context.createQueue(queueName);
|
||||
|
||||
context.createProducer().setProperty("prop1", 1).setProperty("prop2", 2).send(q1, "Text1");
|
||||
|
||||
org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(SimpleString.toSimpleString(queueName));
|
||||
|
||||
assertEquals(5, queue.getMaxConsumers());
|
||||
assertEquals(true, queue.isDeleteOnNoConsumers());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void defaultAutoCreatedQueueConfigTest2() throws Exception {
|
||||
final String queueName = "q1";
|
||||
|
||||
server.getAddressSettingsRepository().addMatch(queueName, new AddressSettings().setDefaultMaxConsumers(5).setDefaultDeleteOnNoConsumers(true));
|
||||
|
||||
Connection connection = cf.createConnection();
|
||||
|
||||
Session session = connection.createSession();
|
||||
|
||||
session.createProducer(session.createQueue(queueName));
|
||||
|
||||
org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(SimpleString.toSimpleString(queueName));
|
||||
|
||||
assertEquals(5, queue.getMaxConsumers());
|
||||
assertEquals(true, queue.isDeleteOnNoConsumers());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeliveryMode() {
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
|||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
|
||||
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||
|
@ -45,7 +46,7 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ConsumerTest extends JMSTestBase {
|
||||
public class JmsConsumerTest extends JMSTestBase {
|
||||
|
||||
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
||||
|
@ -67,7 +68,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
topic = ActiveMQJMSClient.createTopic(T_NAME);
|
||||
topic2 = ActiveMQJMSClient.createTopic(T2_NAME);
|
||||
|
||||
jmsServer.createQueue(false, ConsumerTest.Q_NAME, null, true, ConsumerTest.Q_NAME);
|
||||
jmsServer.createQueue(false, JmsConsumerTest.Q_NAME, null, true, JmsConsumerTest.Q_NAME);
|
||||
jmsServer.createTopic(true, T_NAME, "/topic/" + T_NAME);
|
||||
jmsServer.createTopic(true, T2_NAME, "/topic/" + T2_NAME);
|
||||
cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
|
||||
|
@ -117,7 +118,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
public void testPreCommitAcks() throws Exception {
|
||||
conn = cf.createConnection();
|
||||
Session session = conn.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
|
||||
MessageProducer producer = session.createProducer(jBossQueue);
|
||||
MessageConsumer consumer = session.createConsumer(jBossQueue);
|
||||
int noOfMessages = 100;
|
||||
|
@ -131,7 +132,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
Assert.assertNotNull(m);
|
||||
}
|
||||
|
||||
SimpleString queueName = new SimpleString(ConsumerTest.Q_NAME);
|
||||
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
|
||||
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
|
||||
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
|
||||
}
|
||||
|
@ -140,7 +141,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
public void testIndividualACK() throws Exception {
|
||||
Connection conn = cf.createConnection();
|
||||
Session session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
|
||||
MessageProducer producer = session.createProducer(jBossQueue);
|
||||
MessageConsumer consumer = session.createConsumer(jBossQueue);
|
||||
int noOfMessages = 100;
|
||||
|
@ -177,7 +178,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
Assert.assertEquals("m" + i, m.getText());
|
||||
}
|
||||
|
||||
SimpleString queueName = new SimpleString(ConsumerTest.Q_NAME);
|
||||
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
|
||||
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
|
||||
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
|
||||
conn.close();
|
||||
|
@ -187,7 +188,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
public void testIndividualACKMessageConsumer() throws Exception {
|
||||
Connection conn = cf.createConnection();
|
||||
Session session = conn.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
|
||||
MessageProducer producer = session.createProducer(jBossQueue);
|
||||
MessageConsumer consumer = session.createConsumer(jBossQueue);
|
||||
int noOfMessages = 100;
|
||||
|
@ -251,7 +252,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
Assert.assertEquals("m" + i, m.getText());
|
||||
}
|
||||
|
||||
SimpleString queueName = new SimpleString(ConsumerTest.Q_NAME);
|
||||
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
|
||||
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
|
||||
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
|
||||
conn.close();
|
||||
|
@ -263,7 +264,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
conn = cf.createConnection();
|
||||
|
||||
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
|
||||
MessageProducer producer = session.createProducer(jBossQueue);
|
||||
MessageConsumer consumer = session.createConsumer(jBossQueue);
|
||||
int noOfMessages = 100;
|
||||
|
@ -278,18 +279,18 @@ public class ConsumerTest extends JMSTestBase {
|
|||
}
|
||||
|
||||
// Messages should all have been acked since we set pre ack on the cf
|
||||
SimpleString queueName = new SimpleString(ConsumerTest.Q_NAME);
|
||||
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
|
||||
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
|
||||
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreCommitAcksWithMessageExpiry() throws Exception {
|
||||
ConsumerTest.log.info("starting test");
|
||||
JmsConsumerTest.log.info("starting test");
|
||||
|
||||
conn = cf.createConnection();
|
||||
Session session = conn.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
|
||||
MessageProducer producer = session.createProducer(jBossQueue);
|
||||
MessageConsumer consumer = session.createConsumer(jBossQueue);
|
||||
int noOfMessages = 1000;
|
||||
|
@ -317,7 +318,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
((ActiveMQConnectionFactory) cf).setPreAcknowledge(true);
|
||||
conn = cf.createConnection();
|
||||
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
|
||||
MessageProducer producer = session.createProducer(jBossQueue);
|
||||
MessageConsumer consumer = session.createConsumer(jBossQueue);
|
||||
int noOfMessages = 1000;
|
||||
|
@ -344,7 +345,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
conn = cf.createConnection();
|
||||
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
|
||||
MessageProducer producer = session.createProducer(jBossQueue);
|
||||
|
||||
QueueBrowser browser = session.createBrowser(jBossQueue);
|
||||
|
@ -390,7 +391,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
Session sessionConsumer = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
|
||||
MessageProducer producer = session.createProducer(jBossQueue);
|
||||
MessageConsumer consumer = sessionConsumer.createConsumer(jBossQueue);
|
||||
int noOfMessages = 1000;
|
||||
|
@ -428,7 +429,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
conn = cf.createConnection();
|
||||
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
|
||||
MessageProducer producer = session.createProducer(jBossQueue);
|
||||
int noOfMessages = 10;
|
||||
for (int i = 0; i < noOfMessages; i++) {
|
||||
|
@ -461,7 +462,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
public void testClearExceptionListener() throws Exception {
|
||||
conn = cf.createConnection();
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
|
||||
MessageConsumer consumer = session.createConsumer(jBossQueue);
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
@Override
|
||||
|
@ -477,7 +478,7 @@ public class ConsumerTest extends JMSTestBase {
|
|||
public void testCantReceiveWhenListenerIsSet() throws Exception {
|
||||
conn = cf.createConnection();
|
||||
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(ConsumerTest.Q_NAME);
|
||||
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
|
||||
MessageConsumer consumer = session.createConsumer(jBossQueue);
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
@Override
|
||||
|
@ -739,4 +740,27 @@ public class ConsumerTest extends JMSTestBase {
|
|||
conn.unsubscribe("c1");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void defaultAutoCreatedQueueConfigTest() throws Exception {
|
||||
final String queueName = "q1";
|
||||
|
||||
server.getAddressSettingsRepository()
|
||||
.addMatch(queueName, new AddressSettings()
|
||||
.setDefaultMaxConsumers(5)
|
||||
.setDefaultDeleteOnNoConsumers(true));
|
||||
|
||||
Connection connection = cf.createConnection();
|
||||
|
||||
Session session = connection.createSession();
|
||||
|
||||
session.createConsumer(session.createQueue(queueName));
|
||||
|
||||
org.apache.activemq.artemis.core.server.Queue queue = server.locateQueue(SimpleString.toSimpleString(queueName));
|
||||
|
||||
assertEquals(5, queue.getMaxConsumers());
|
||||
assertEquals(true, queue.isDeleteOnNoConsumers());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
}
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
|||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -69,7 +70,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
|
|||
|
||||
fileMessage.releaseResources();
|
||||
|
||||
session.createQueue("A", "A");
|
||||
session.createQueue("A", RoutingType.ANYCAST, "A");
|
||||
|
||||
ClientProducer prod = session.createProducer("A");
|
||||
|
||||
|
|
|
@ -584,7 +584,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
|
||||
assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues());
|
||||
assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues());
|
||||
// assertEquals(autoCreateJmsTopics, info.isAutoCreateJmsTopics());
|
||||
// assertEquals(autoCreateJmsTopics, info.isAutoCreateAddresses());
|
||||
assertEquals(autoDeleteJmsTopics, info.isAutoDeleteJmsTopics());
|
||||
|
||||
serverControl.addAddressSettings(addressMatch, DLA, expiryAddress, expiryDelay, lastValueQueue, deliveryAttempts, -1, 1000, pageMaxCacheSize, redeliveryDelay, redeliveryMultiplier, maxRedeliveryDelay, redistributionDelay, sendToDLAOnNoRoute, addressFullMessagePolicy, slowConsumerThreshold, slowConsumerCheckPeriod, slowConsumerPolicy, autoCreateJmsQueues, autoDeleteJmsQueues, autoCreateJmsTopics, autoDeleteJmsTopics);
|
||||
|
@ -610,7 +610,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
|
||||
assertEquals(autoCreateJmsQueues, info.isAutoCreateJmsQueues());
|
||||
assertEquals(autoDeleteJmsQueues, info.isAutoDeleteJmsQueues());
|
||||
// assertEquals(autoCreateJmsTopics, info.isAutoCreateJmsTopics());
|
||||
// assertEquals(autoCreateJmsTopics, info.isAutoCreateAddresses());
|
||||
assertEquals(autoDeleteJmsTopics, info.isAutoDeleteJmsTopics());
|
||||
|
||||
ex = false;
|
||||
|
@ -675,8 +675,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
|
||||
String divertQueue = RandomUtil.randomString();
|
||||
String queue = RandomUtil.randomString();
|
||||
session.createQueue(forwardingAddress, divertQueue);
|
||||
session.createQueue(address, queue);
|
||||
session.createQueue(forwardingAddress, RoutingType.ANYCAST, divertQueue);
|
||||
session.createQueue(address, RoutingType.ANYCAST, queue);
|
||||
|
||||
ClientProducer producer = session.createProducer(address);
|
||||
ClientMessage message = session.createMessage(false);
|
||||
|
@ -737,8 +737,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
ClientSessionFactory csf = createSessionFactory(locator);
|
||||
ClientSession session = csf.createSession();
|
||||
|
||||
session.createQueue(sourceAddress, sourceQueue);
|
||||
session.createQueue(targetAddress, targetQueue);
|
||||
session.createQueue(sourceAddress, RoutingType.ANYCAST, sourceQueue);
|
||||
session.createQueue(targetAddress, RoutingType.ANYCAST, targetQueue);
|
||||
|
||||
serverControl.createBridge(name, sourceQueue, targetAddress, null, // forwardingAddress
|
||||
null, // filterString
|
||||
|
@ -985,8 +985,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
ClientSessionFactory csf = createSessionFactory(locator);
|
||||
ClientSession session = csf.createSession();
|
||||
|
||||
session.createQueue(random1, random1);
|
||||
session.createQueue(random2, random2);
|
||||
session.createQueue(random1, RoutingType.ANYCAST, random1);
|
||||
session.createQueue(random2, RoutingType.ANYCAST, random2);
|
||||
|
||||
ClientProducer producer1 = session.createProducer(random1);
|
||||
ClientProducer producer2 = session.createProducer(random2);
|
||||
|
@ -1034,8 +1034,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
ClientSessionFactory csf = createSessionFactory(locator);
|
||||
ClientSession session = csf.createSession();
|
||||
|
||||
session.createQueue(random1, random1);
|
||||
session.createQueue(random2, random2);
|
||||
session.createQueue(random1, RoutingType.ANYCAST, random1);
|
||||
session.createQueue(random2, RoutingType.ANYCAST, random2);
|
||||
|
||||
ClientProducer producer1 = session.createProducer(random1);
|
||||
ClientProducer producer2 = session.createProducer(random2);
|
||||
|
@ -1080,8 +1080,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
ClientSessionFactory csf = createSessionFactory(locator);
|
||||
ClientSession session = csf.createSession();
|
||||
|
||||
session.createQueue(random1, random1);
|
||||
session.createQueue(random2, random2);
|
||||
session.createQueue(random1, RoutingType.ANYCAST, random1);
|
||||
session.createQueue(random2, RoutingType.ANYCAST, random2);
|
||||
|
||||
ClientProducer producer1 = session.createProducer(random1);
|
||||
ClientProducer producer2 = session.createProducer(random2);
|
||||
|
@ -1121,15 +1121,15 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
String random2 = RandomUtil.randomString();
|
||||
|
||||
ActiveMQServerControl serverControl = createManagementControl();
|
||||
QueueControl queueControl1 = ManagementControlHelper.createQueueControl(SimpleString.toSimpleString(random1), SimpleString.toSimpleString(random1), mbeanServer);
|
||||
QueueControl queueControl2 = ManagementControlHelper.createQueueControl(SimpleString.toSimpleString(random2), SimpleString.toSimpleString(random2), mbeanServer);
|
||||
QueueControl queueControl1 = ManagementControlHelper.createQueueControl(SimpleString.toSimpleString(random1), SimpleString.toSimpleString(random1), RoutingType.ANYCAST, mbeanServer);
|
||||
QueueControl queueControl2 = ManagementControlHelper.createQueueControl(SimpleString.toSimpleString(random2), SimpleString.toSimpleString(random2), RoutingType.ANYCAST, mbeanServer);
|
||||
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory csf = createSessionFactory(locator);
|
||||
ClientSession session = csf.createSession();
|
||||
|
||||
session.createQueue(random1, random1);
|
||||
session.createQueue(random2, random2);
|
||||
session.createQueue(random1, RoutingType.ANYCAST, random1);
|
||||
session.createQueue(random2, RoutingType.ANYCAST, random2);
|
||||
|
||||
ClientConsumer consumer1 = session.createConsumer(random1);
|
||||
ClientConsumer consumer2 = session.createConsumer(random2);
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
@ -241,7 +242,7 @@ public class PagingWithFailoverAndCountersTest extends ActiveMQTestBase {
|
|||
ClientSessionFactory factory = locator.createSessionFactory();
|
||||
ClientSession session = factory.createSession();
|
||||
|
||||
session.createQueue("new-queue", "new-queue");
|
||||
session.createQueue("new-queue", RoutingType.ANYCAST, "new-queue");
|
||||
|
||||
System.out.println("created queue");
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
|||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -73,7 +74,7 @@ public class BatchDelayTest extends ActiveMQTestBase {
|
|||
|
||||
final String foo = "foo";
|
||||
|
||||
session.createQueue(foo, foo);
|
||||
session.createQueue(foo, RoutingType.ANYCAST, foo);
|
||||
|
||||
ClientProducer prod = session.createProducer(foo);
|
||||
|
||||
|
@ -93,7 +94,7 @@ public class BatchDelayTest extends ActiveMQTestBase {
|
|||
|
||||
final String foo = "foo";
|
||||
|
||||
session.createQueue(foo, foo);
|
||||
session.createQueue(foo, RoutingType.ANYCAST, foo);
|
||||
|
||||
ClientProducer prod = session.createProducer(foo);
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
|
|||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Before;
|
||||
|
@ -70,7 +71,7 @@ public class DirectDeliverTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = sf.createSession();
|
||||
|
||||
session.createQueue(foo, foo);
|
||||
session.createQueue(foo, RoutingType.ANYCAST, foo);
|
||||
|
||||
Binding binding = server.getPostOffice().getBinding(new SimpleString(foo));
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
|||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -64,7 +65,7 @@ public class GracefulShutdownTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
// confirm we can still do work on the original connection even though the server is stopping
|
||||
session.createQueue("testAddress", "testQueue");
|
||||
session.createQueue("testAddress", RoutingType.ANYCAST, "testQueue");
|
||||
ClientProducer producer = session.createProducer("testAddress");
|
||||
producer.send(session.createMessage(true));
|
||||
session.start();
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
|
|||
import org.apache.activemq.artemis.core.security.Role;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
|
@ -97,20 +98,20 @@ public class ResourceLimitTest extends ActiveMQTestBase {
|
|||
ServerLocator locator = addServerLocator(createNonHALocator(false));
|
||||
ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
|
||||
ClientSession clientSession = clientSessionFactory.createSession("myUser", "password", false, true, true, false, 0);
|
||||
clientSession.createQueue("address", "queue");
|
||||
clientSession.createQueue("address", RoutingType.ANYCAST, "queue");
|
||||
|
||||
try {
|
||||
clientSession.createQueue("address", "anotherQueue");
|
||||
clientSession.createQueue("address", RoutingType.ANYCAST, "anotherQueue");
|
||||
} catch (Exception e) {
|
||||
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||
}
|
||||
|
||||
clientSession.deleteQueue("queue");
|
||||
|
||||
clientSession.createQueue("address", "queue");
|
||||
clientSession.createQueue("address", RoutingType.ANYCAST, "queue");
|
||||
|
||||
try {
|
||||
clientSession.createQueue("address", "anotherQueue");
|
||||
clientSession.createQueue("address", RoutingType.ANYCAST, "anotherQueue");
|
||||
} catch (Exception e) {
|
||||
assertTrue(e instanceof ActiveMQSessionCreationException);
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
|||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
import org.apache.activemq.artemis.ra.ActiveMQRAXAResource;
|
||||
|
@ -116,7 +117,7 @@ public class BasicXaTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = addClientSession(factory.createSession(true, false, false));
|
||||
|
||||
session.createQueue("Test", "Test");
|
||||
session.createQueue("Test", RoutingType.ANYCAST, "Test");
|
||||
|
||||
ClientProducer prod = session.createProducer("Test");
|
||||
|
||||
|
@ -137,7 +138,7 @@ public class BasicXaTest extends ActiveMQTestBase {
|
|||
|
||||
ClientSession session = addClientSession(factory.createSession(false, true, true));
|
||||
|
||||
session.createQueue("Test", "Test");
|
||||
session.createQueue("Test", RoutingType.ANYCAST, "Test");
|
||||
|
||||
ClientProducer prod = session.createProducer("Test");
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
|
||||
|
@ -381,7 +382,7 @@ public class XaTimeoutTest extends ActiveMQTestBase {
|
|||
simpleTXSession.commit();
|
||||
|
||||
// This test needs 2 queues
|
||||
simpleTXSession.createQueue(outQueue, outQueue);
|
||||
simpleTXSession.createQueue(outQueue, RoutingType.MULTICAST, outQueue);
|
||||
|
||||
simpleTXSession.close();
|
||||
}
|
||||
|
|
|
@ -1005,6 +1005,18 @@ public class MessageHeaderTest extends MessageHeaderTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createQueue(SimpleString address,
|
||||
RoutingType routingType,
|
||||
SimpleString queueName,
|
||||
SimpleString filter,
|
||||
boolean durable,
|
||||
boolean autoCreated,
|
||||
int maxConsumers,
|
||||
boolean deleteOnNoConsumers) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a <em>non-temporary</em>queue.
|
||||
*
|
||||
|
@ -1023,6 +1035,18 @@ public class MessageHeaderTest extends MessageHeaderTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createQueue(String address,
|
||||
RoutingType routingType,
|
||||
String queueName,
|
||||
String filter,
|
||||
boolean durable,
|
||||
boolean autoCreated,
|
||||
int maxConsumers,
|
||||
boolean deleteOnNoConsumers) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a <em>temporary</em> queue.
|
||||
*
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -67,7 +68,7 @@ public class SendStressTest extends ActiveMQTestBase {
|
|||
|
||||
session = sf.createSession(false, false);
|
||||
|
||||
session.createQueue("address", "queue");
|
||||
session.createQueue("address", RoutingType.MULTICAST, "queue");
|
||||
|
||||
ClientProducer producer = session.createProducer("address");
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.junit.Test;
|
||||
|
@ -66,7 +67,7 @@ public class JournalRestartStressTest extends ActiveMQTestBase {
|
|||
ClientSession session = sf.createSession(true, true);
|
||||
|
||||
try {
|
||||
session.createQueue("slow-queue", "slow-queue");
|
||||
session.createQueue("slow-queue", RoutingType.MULTICAST, "slow-queue");
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue