ARTEMIS-4212 fix sending msgs to address w/mismatching routing types

When sending, for example, to a predefined anycast address and queue
from a multicast (JMS topic) producer, the routed count on the address
is incremented, but the message count on the matching queue is not. No
indication is given at the client end that the messages failed to get
routed - the messages are just silently dropped.

Fixing this problem requires a slight semantic change. The broker is now
more strict in what it allows specifically with regards to
auto-creation. If, for example, a JMS application attempts to send a
message to a topic and the corresponding multicast address doesn't exist
already or the broker cannot automatically create it or update it then
sending the message will fail.

Also, part of this commit moves a chunk of auto-create logic into
ServerSession and adds an enum for auto-create results. Aside from
helping fix this specific issue this can serve as a foundation for
de-duplicating the auto-create logic spread across many of the protocol
implementations.
This commit is contained in:
Justin Bertram 2023-03-17 16:44:05 -05:00
parent 101eabdda8
commit 74fa4ca758
47 changed files with 735 additions and 232 deletions

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
public enum AutoCreateResult {
EXISTED, CREATED, UPDATED, NOT_FOUND;
}

View File

@ -76,6 +76,7 @@ public class QueueConfiguration implements Serializable {
public static final String INTERNAL = "internal";
public static final String TRANSIENT = "transient";
public static final String AUTO_CREATED = "auto-created";
public static final String FQQN = "fqqn";
private Long id; // internal use
private SimpleString name;
@ -108,10 +109,46 @@ public class QueueConfiguration implements Serializable {
private Boolean internal;
private Boolean _transient;
private Boolean autoCreated;
private Boolean fqqn;
public QueueConfiguration() {
}
public QueueConfiguration(QueueConfiguration o) {
id = o.id;
name = o.name;
address = o.address;
routingType = o.routingType;
filterString = o.filterString;
durable = o.durable;
user = o.user;
maxConsumers = o.maxConsumers;
exclusive = o.exclusive;
groupRebalance = o.groupRebalance;
groupRebalancePauseDispatch = o.groupRebalancePauseDispatch;
groupBuckets = o.groupBuckets;
groupFirstKey = o.groupFirstKey;
lastValue = o.lastValue;
lastValueKey = o.lastValueKey;
nonDestructive = o.nonDestructive;
purgeOnNoConsumers = o.purgeOnNoConsumers;
enabled = o.enabled;
consumersBeforeDispatch = o.consumersBeforeDispatch;
delayBeforeDispatch = o.delayBeforeDispatch;
consumerPriority = o.consumerPriority;
autoDelete = o.autoDelete;
autoDeleteDelay = o.autoDeleteDelay;
autoDeleteMessageCount = o.autoDeleteMessageCount;
ringSize = o.ringSize;
configurationManaged = o.configurationManaged;
temporary = o.temporary;
autoCreateAddress = o.autoCreateAddress;
internal = o.internal;
_transient = o._transient;
autoCreated = o.autoCreated;
fqqn = o.fqqn;
}
/**
* Instantiate this object and invoke {@link #setName(SimpleString)}
*
@ -261,7 +298,7 @@ public class QueueConfiguration implements Serializable {
}
/**
* Set the name. If the fully-qualified queue name is used then it will be parsed and the corresponding values for
* Set the address. If the fully-qualified queue name is used then it will be parsed and the corresponding values for
* {@code address} and {@code name} will be set automatically. For example if "myAddress::myQueue" is passed then the
* resulting value for {@code address} will be "myAddress" and the value for {@code name} will be "myQueue".
*
@ -272,6 +309,7 @@ public class QueueConfiguration implements Serializable {
if (CompositeAddress.isFullyQualified(address)) {
this.name = CompositeAddress.extractQueueName(address);
this.address = CompositeAddress.extractAddressName(address);
this.fqqn = Boolean.TRUE;
} else {
this.address = address;
}
@ -301,6 +339,7 @@ public class QueueConfiguration implements Serializable {
if (CompositeAddress.isFullyQualified(name)) {
this.name = CompositeAddress.extractQueueName(name);
this.address = CompositeAddress.extractAddressName(name);
this.fqqn = Boolean.TRUE;
} else {
this.name = name;
}
@ -607,6 +646,16 @@ public class QueueConfiguration implements Serializable {
return this;
}
/**
* Based on if the name or address uses FQQN when set
*
* defaults to {@code false}
* @return
*/
public Boolean isFqqn() {
return fqqn == null ? Boolean.FALSE : fqqn;
}
/**
* This method returns a JSON-formatted {@code String} representation of this {@code QueueConfiguration}. It is a
* simple collection of key/value pairs. The keys used are referenced in {@link #set(String, String)}.
@ -709,6 +758,9 @@ public class QueueConfiguration implements Serializable {
if (isAutoCreated() != null) {
builder.add(AUTO_CREATED, isAutoCreated());
}
if (isFqqn() != null) {
builder.add(FQQN, isFqqn());
}
return builder.build().toString();
}
@ -807,6 +859,8 @@ public class QueueConfiguration implements Serializable {
return false;
if (!Objects.equals(autoCreated, that.autoCreated))
return false;
if (!Objects.equals(fqqn, that.fqqn))
return false;
return true;
}
@ -844,6 +898,7 @@ public class QueueConfiguration implements Serializable {
result = 31 * result + Objects.hashCode(internal);
result = 31 * result + Objects.hashCode(_transient);
result = 31 * result + Objects.hashCode(autoCreated);
result = 31 * result + Objects.hashCode(fqqn);
return result;
}
@ -880,6 +935,7 @@ public class QueueConfiguration implements Serializable {
+ ", autoCreateAddress=" + autoCreateAddress
+ ", internal=" + internal
+ ", transient=" + _transient
+ ", autoCreated=" + autoCreated + ']';
+ ", autoCreated=" + autoCreated
+ ", fqqn=" + fqqn + ']';
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Test;
@ -39,4 +41,34 @@ public class QueueConfigurationTest {
queueConfiguration.set(QueueConfiguration.GROUP_REBALANCE_PAUSE_DISPATCH, Boolean.toString(false));
Assert.assertEquals(false, queueConfiguration.isGroupRebalancePauseDispatch());
}
@Test
public void testFqqn() {
final SimpleString ADDRESS = RandomUtil.randomSimpleString();
final SimpleString QUEUE = RandomUtil.randomSimpleString();
QueueConfiguration queueConfiguration = new QueueConfiguration(CompositeAddress.toFullyQualified(ADDRESS, QUEUE));
Assert.assertEquals(ADDRESS, queueConfiguration.getAddress());
Assert.assertEquals(QUEUE, queueConfiguration.getName());
Assert.assertTrue(queueConfiguration.isFqqn());
}
@Test
public void testFqqnNegative() {
final SimpleString ADDRESS = RandomUtil.randomSimpleString();
final SimpleString QUEUE = RandomUtil.randomSimpleString();
QueueConfiguration queueConfiguration = new QueueConfiguration(QUEUE).setAddress(ADDRESS);
Assert.assertEquals(ADDRESS, queueConfiguration.getAddress());
Assert.assertEquals(QUEUE, queueConfiguration.getName());
Assert.assertFalse(queueConfiguration.isFqqn());
}
@Test
public void testFqqnViaAddress() {
final SimpleString ADDRESS = RandomUtil.randomSimpleString();
final SimpleString QUEUE = RandomUtil.randomSimpleString();
QueueConfiguration queueConfiguration = new QueueConfiguration(RandomUtil.randomSimpleString()).setAddress(CompositeAddress.toFullyQualified(ADDRESS, QUEUE));
Assert.assertEquals(ADDRESS, queueConfiguration.getAddress());
Assert.assertEquals(QUEUE, queueConfiguration.getName());
Assert.assertTrue(queueConfiguration.isFqqn());
}
}

View File

@ -94,6 +94,10 @@ public interface ClientSession extends XAResource, AutoCloseable {
Integer getDefaultConsumersBeforeDispatch();
Long getDefaultDelayBeforeDispatch();
boolean isSupportsMulticast();
boolean isSupportsAnycast();
}
/**

View File

@ -48,6 +48,10 @@ public class AddressQueryImpl implements ClientSession.AddressQuery {
private final Long defaultDelayBeforeDispatch;
private final boolean supportsMulticast;
private final boolean supportsAnycast;
public AddressQueryImpl(final boolean exists,
final List<SimpleString> queueNames,
final boolean autoCreateQueues,
@ -59,7 +63,9 @@ public class AddressQueryImpl implements ClientSession.AddressQuery {
final SimpleString defaultLastValueKey,
final Boolean defaultNonDestructive,
final Integer defaultConsumersBeforeDispatch,
final Long defaultDelayBeforeDispatch) {
final Long defaultDelayBeforeDispatch,
final boolean supportsMulticast,
final boolean supportsAnycast) {
this.exists = exists;
this.queueNames = new ArrayList<>(queueNames);
this.autoCreateQueues = autoCreateQueues;
@ -72,6 +78,8 @@ public class AddressQueryImpl implements ClientSession.AddressQuery {
this.defaultNonDestructive = defaultNonDestructive;
this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch;
this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch;
this.supportsMulticast = supportsMulticast;
this.supportsAnycast = supportsAnycast;
}
@Override
@ -133,4 +141,14 @@ public class AddressQueryImpl implements ClientSession.AddressQuery {
public Long getDefaultDelayBeforeDispatch() {
return defaultDelayBeforeDispatch;
}
@Override
public boolean isSupportsMulticast() {
return supportsMulticast;
}
@Override
public boolean isSupportsAnycast() {
return supportsAnycast;
}
}

View File

@ -87,6 +87,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V5;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2;
@ -429,22 +430,26 @@ public class ActiveMQSessionContext extends SessionContext {
@Override
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4, getServerVersion())) {
if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V5, getServerVersion())) {
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V5);
SessionBindingQueryResponseMessage_V5 response = (SessionBindingQueryResponseMessage_V5) packet;
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), response.isDefaultPurgeOnNoConsumers(), response.getDefaultMaxConsumers(), response.isDefaultExclusive(), response.isDefaultLastValue(), response.getDefaultLastValueKey(), response.isDefaultNonDestructive(), response.getDefaultConsumersBeforeDispatch(), response.getDefaultDelayBeforeDispatch(), response.isSupportsMulticast(), response.isSupportsAnycast());
} else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4, getServerVersion())) {
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V4);
SessionBindingQueryResponseMessage_V4 response = (SessionBindingQueryResponseMessage_V4) packet;
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), response.isDefaultPurgeOnNoConsumers(), response.getDefaultMaxConsumers(), response.isDefaultExclusive(), response.isDefaultLastValue(), response.getDefaultLastValueKey(), response.isDefaultNonDestructive(), response.getDefaultConsumersBeforeDispatch(), response.getDefaultDelayBeforeDispatch());
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), response.isDefaultPurgeOnNoConsumers(), response.getDefaultMaxConsumers(), response.isDefaultExclusive(), response.isDefaultLastValue(), response.getDefaultLastValueKey(), response.isDefaultNonDestructive(), response.getDefaultConsumersBeforeDispatch(), response.getDefaultDelayBeforeDispatch(), true, true);
} else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3, getServerVersion())) {
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V3);
SessionBindingQueryResponseMessage_V3 response = (SessionBindingQueryResponseMessage_V3) packet;
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null);
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), response.isAutoCreateAddresses(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null, true, true);
} else if (sessionChannel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2, getServerVersion())) {
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2);
SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) packet;
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null);
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateQueues(), false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null, true, true);
} else {
Packet packet = sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) packet;
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null);
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), null, null, null, null, null, null, true, true);
}
}

View File

@ -193,6 +193,8 @@ public final class ChannelImpl implements Channel {
case PacketImpl.CREATESESSION_V2:
case PacketImpl.DISCONNECT_V3:
return version >= PacketImpl.ARTEMIS_2_18_0_VERSION;
case PacketImpl.SESS_BINDINGQUERY_RESP_V5:
return version >= PacketImpl.ARTEMIS_2_29_0_VERSION;
default:
return true;
}

View File

@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V5;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2;
@ -132,6 +133,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V3;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V4;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP_V5;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
@ -325,6 +327,10 @@ public abstract class PacketDecoder implements Serializable {
packet = new SessionBindingQueryResponseMessage_V4();
break;
}
case SESS_BINDINGQUERY_RESP_V5: {
packet = new SessionBindingQueryResponseMessage_V5();
break;
}
case SESS_XA_START: {
packet = new SessionXAStartMessage();
break;

View File

@ -50,6 +50,9 @@ public class PacketImpl implements Packet {
// 2.28.0
public static final int ARTEMIS_2_28_0_VERSION = 134;
// 2.29.0
public static final int ARTEMIS_2_29_0_VERSION = 135;
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue.");
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
@ -300,6 +303,8 @@ public class PacketImpl implements Packet {
public static final byte REMOVE_PRODUCER = -21;
public static final byte SESS_BINDINGQUERY_RESP_V5 = -22;
public PacketImpl(final byte type) {
this.type = type;
}

View File

@ -24,21 +24,21 @@ import org.apache.activemq.artemis.utils.BufferHelper;
public class SessionBindingQueryResponseMessage_V4 extends SessionBindingQueryResponseMessage_V3 {
private boolean defaultPurgeOnNoConsumers;
protected boolean defaultPurgeOnNoConsumers;
private int defaultMaxConsumers;
protected int defaultMaxConsumers;
private Boolean defaultExclusive;
protected Boolean defaultExclusive;
private Boolean defaultLastValue;
protected Boolean defaultLastValue;
private SimpleString defaultLastValueKey;
protected SimpleString defaultLastValueKey;
private Boolean defaultNonDestructive;
protected Boolean defaultNonDestructive;
private Integer defaultConsumersBeforeDispatch;
protected Integer defaultConsumersBeforeDispatch;
private Long defaultDelayBeforeDispatch;
protected Long defaultDelayBeforeDispatch;
public SessionBindingQueryResponseMessage_V4(final boolean exists,
final List<SimpleString> queueNames,
@ -83,6 +83,10 @@ public class SessionBindingQueryResponseMessage_V4 extends SessionBindingQueryRe
super(SESS_BINDINGQUERY_RESP_V4);
}
public SessionBindingQueryResponseMessage_V4(byte v) {
super(v);
}
public boolean isDefaultPurgeOnNoConsumers() {
return defaultPurgeOnNoConsumers;
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
public class SessionBindingQueryResponseMessage_V5 extends SessionBindingQueryResponseMessage_V4 {
protected boolean supportsMulticast;
protected boolean supportsAnycast;
public SessionBindingQueryResponseMessage_V5(final boolean exists,
final List<SimpleString> queueNames,
final boolean autoCreateQueues,
final boolean autoCreateAddresses,
final boolean defaultPurgeOnNoConsumers,
final int defaultMaxConsumers,
final Boolean defaultExclusive,
final Boolean defaultLastValue,
final SimpleString defaultLastValueKey,
final Boolean defaultNonDestructive,
final Integer defaultConsumersBeforeDispatch,
final Long defaultDelayBeforeDispatch,
final boolean supportsMulticast,
final boolean supportsAnycast) {
super(SESS_BINDINGQUERY_RESP_V5);
this.exists = exists;
this.queueNames = queueNames;
this.autoCreateQueues = autoCreateQueues;
this.autoCreateAddresses = autoCreateAddresses;
this.defaultPurgeOnNoConsumers = defaultPurgeOnNoConsumers;
this.defaultMaxConsumers = defaultMaxConsumers;
this.defaultExclusive = defaultExclusive;
this.defaultLastValue = defaultLastValue;
this.defaultLastValueKey = defaultLastValueKey;
this.defaultNonDestructive = defaultNonDestructive;
this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch;
this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch;
this.supportsMulticast = supportsMulticast;
this.supportsAnycast = supportsAnycast;
}
public SessionBindingQueryResponseMessage_V5() {
super(SESS_BINDINGQUERY_RESP_V5);
}
public Boolean isSupportsMulticast() {
return supportsMulticast;
}
public Boolean isSupportsAnycast() {
return supportsAnycast;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeBoolean(supportsMulticast);
buffer.writeBoolean(supportsAnycast);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
if (buffer.readableBytes() > 0) {
supportsMulticast = buffer.readBoolean();
supportsAnycast = buffer.readBoolean();
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (supportsMulticast ? 1231 : 1237);
result = prime * result + (supportsAnycast ? 1231 : 1237);
return result;
}
@Override
protected String getPacketString() {
StringBuffer buff = new StringBuffer(super.getPacketString());
buff.append(", supportsMulticast=" + supportsMulticast);
buff.append(", supportsAnycast=" + supportsAnycast);
return buff.toString();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!super.equals(obj)) {
return false;
}
if (!(obj instanceof SessionBindingQueryResponseMessage_V5)) {
return false;
}
SessionBindingQueryResponseMessage_V5 other = (SessionBindingQueryResponseMessage_V5) obj;
if (supportsMulticast != other.supportsMulticast) {
return false;
}
if (supportsAnycast != other.supportsAnycast) {
return false;
}
return true;
}
}

View File

@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionTag=${activemq.version.versionTag}
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132,133,134
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132,133,134,135

View File

@ -404,6 +404,10 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new InvalidDestinationException("Destination " + address + " does not exist, autoCreateAddresses=" + addressQuery.isAutoCreateAddresses());
}
}
} else {
if ((!addressQuery.isSupportsMulticast() && !destination.isQueue()) || (!addressQuery.isSupportsAnycast() && destination.isQueue())) {
throw new InvalidDestinationException("Destination " + address + " exists, but does not support " + (destination.isQueue() ? RoutingType.ANYCAST.name() : RoutingType.MULTICAST.name()) + " routing");
}
}
// Second we create the queue, the address would have existed or successfully created.

View File

@ -45,6 +45,9 @@ public class EmbeddedActiveMQResourceCustomConfigurationTest {
@Rule
public RuleChain rulechain = RuleChain.outerRule(server);
public EmbeddedActiveMQResourceCustomConfigurationTest() throws Exception {
}
@After
public void tear() {
server.stop();

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.AutoCreateResult;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -329,7 +330,8 @@ public class AMQPSessionCallback implements SessionCallback {
public boolean checkAddressAndAutocreateIfPossible(SimpleString address, RoutingType routingType) throws Exception {
return serverSession.checkAutoCreate(address, routingType);
AutoCreateResult autoCreateResult = serverSession.checkAutoCreate(new QueueConfiguration(address).setRoutingType(routingType));
return autoCreateResult != AutoCreateResult.NOT_FOUND;
}
public AddressQueryResult addressQuery(SimpleString addressName,
@ -469,7 +471,7 @@ public class AMQPSessionCallback implements SessionCallback {
//here check queue-autocreation
if (!checkAddressAndAutocreateIfPossible(address, routingType)) {
ActiveMQException e = ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
ActiveMQException e = ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString());
if (transaction != null) {
transaction.markAsRollbackOnly(e);
}
@ -767,7 +769,6 @@ public class AMQPSessionCallback implements SessionCallback {
return this.transactionHandler;
}
class AddressQueryCache<T> {
SimpleString address;
T result;

View File

@ -39,8 +39,8 @@ public interface ActiveMQAMQPProtocolMessageBundle {
@Message(id = 119001, value = "error creating temporary queue, {}")
ActiveMQAMQPInternalErrorException errorCreatingTemporaryQueue(String message);
@Message(id = 119002, value = "target address does not exist")
ActiveMQAMQPNotFoundException addressDoesntExist();
@Message(id = 119002, value = "target address {} does not exist")
ActiveMQAMQPNotFoundException addressDoesntExist(String address);
@Message(id = 119003, value = "error finding temporary queue, {}")
ActiveMQAMQPNotFoundException errorFindingTemporaryQueue(String message);

View File

@ -119,7 +119,7 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver {
defRoutingType = getRoutingType(target.getCapabilities(), address);
try {
if (!sessionSPI.checkAddressAndAutocreateIfPossible(address, defRoutingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString());
}
} catch (ActiveMQAMQPNotFoundException e) {
throw e;
@ -157,6 +157,30 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver {
}
private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) {
RoutingType explicitRoutingType = getExplicitRoutingType(symbols);
if (explicitRoutingType != null) {
return explicitRoutingType;
} else {
final AddressInfo addressInfo = sessionSPI.getAddress(address);
/*
* If we're dealing with an *existing* address that has just one routing-type simply use that.
* This allows "bare" AMQP clients (which have no built-in routing semantics) to send messages
* wherever they want in this case because the routing ambiguity is eliminated.
*/
if (addressInfo != null && addressInfo.getRoutingTypes().size() == 1) {
return addressInfo.getRoutingType();
} else {
return getDefaultRoutingType(address);
}
}
}
private RoutingType getDefaultRoutingType(SimpleString address) {
RoutingType defaultRoutingType = sessionSPI.getDefaultRoutingType(address);
return defaultRoutingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : defaultRoutingType;
}
private RoutingType getExplicitRoutingType(Symbol[] symbols) {
if (symbols != null) {
for (Symbol symbol : symbols) {
if (AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(symbol) || AmqpSupport.TOPIC_CAPABILITY.equals(symbol)) {
@ -166,15 +190,7 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver {
}
}
}
final AddressInfo addressInfo = sessionSPI.getAddress(address);
if (addressInfo != null && !addressInfo.getRoutingTypes().isEmpty()) {
if (addressInfo.getRoutingTypes().size() == 1 && addressInfo.getRoutingType() == RoutingType.MULTICAST) {
return RoutingType.MULTICAST;
}
}
RoutingType defaultRoutingType = sessionSPI.getDefaultRoutingType(address);
defaultRoutingType = defaultRoutingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : defaultRoutingType;
return defaultRoutingType;
return null;
}
@Override

View File

@ -71,7 +71,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultLastValue(), ActiveMQDefaultConfiguration.getDefaultLastValueKey(), ActiveMQDefaultConfiguration.getDefaultNonDestructive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultLastValue(), ActiveMQDefaultConfiguration.getDefaultLastValueKey(), ActiveMQDefaultConfiguration.getDefaultNonDestructive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), true, true);
}
@Override

View File

@ -16,7 +16,15 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.IllegalStateException;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSSecurityException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
@ -29,23 +37,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import javax.jms.IllegalStateException;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSSecurityException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.AutoCreateResult;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -64,19 +63,14 @@ import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.TempQueueObserver;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
@ -136,7 +130,6 @@ import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.util.ByteSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
/**
* Represents an activemq connection.
@ -906,29 +899,18 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
SimpleString qName = SimpleString.toSimpleString(dest.getPhysicalName());
if (server.locateQueue(qName) == null) {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(dest.getPhysicalName());
if (dest.isQueue() && (addressSettings.isAutoCreateQueues() || dest.isTemporary())) {
try {
internalSession.createQueue(new QueueConfiguration(qName).setRoutingType(RoutingType.ANYCAST).setDurable(!dest.isTemporary()).setTemporary(dest.isTemporary()).setAutoCreated(!dest.isTemporary()));
created = true;
} catch (ActiveMQQueueExistsException exists) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
} else if (dest.isTopic() && (addressSettings.isAutoCreateAddresses() || dest.isTemporary())) {
try {
AddressInfo addressInfo = new AddressInfo(qName, RoutingType.MULTICAST);
if (AdvisorySupport.isAdvisoryTopic(dest) && protocolManager.isSuppressInternalManagementObjects()) {
addressInfo.setInternal(true);
}
if (internalSession.getAddress(addressInfo.getName()) == null) {
internalSession.createAddress(addressInfo, !dest.isTemporary());
created = true;
}
} catch (ActiveMQAddressExistsException exists) {
// The address may have been created by another thread in the mean time. Catch and do nothing.
}
AutoCreateResult autoCreateResult = internalSession.checkAutoCreate(new QueueConfiguration(qName)
.setRoutingType(dest.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST)
.setDurable(!dest.isTemporary())
.setTemporary(dest.isTemporary()));
if (autoCreateResult == AutoCreateResult.CREATED) {
created = true;
if (AdvisorySupport.isAdvisoryTopic(dest) && protocolManager.isSuppressInternalManagementObjects()) {
internalSession.getAddress(qName).setInternal(true);
}
} else if (autoCreateResult == AutoCreateResult.NOT_FOUND) {
throw new InvalidDestinationException(dest.getDestinationTypeAsString() + " " + dest.getPhysicalName() + " does not exist.");
}
if (dest.isTemporary()) {
@ -1172,23 +1154,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
/**
* Checks to see if this destination exists. If it does not throw an invalid destination exception.
*
* @param destination
*/
private void validateDestination(ActiveMQDestination destination) throws Exception {
if (destination.isQueue()) {
SimpleString physicalName = new SimpleString(destination.getPhysicalName());
QueueQueryResult queue = server.queueQuery(physicalName);
AddressQueryResult address = server.addressQuery(physicalName);
if (!address.isExists() && !queue.isAutoCreateQueues()) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
}
}
}
private void propagateLastSequenceId(SessionState sessionState, long lastDeliveredSequenceId) {
for (ConsumerState consumerState : sessionState.getConsumerStates()) {
consumerState.getInfo().setLastDeliveredSequenceId(lastDeliveredSequenceId);
@ -1270,11 +1235,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
ActiveMQDestination destination = info.getDestination();
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
if (destination.isQueue()) {
OpenWireConnection.this.validateDestination(destination);
}
DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
OpenWireConnection.this.addDestination(destInfo);
OpenWireConnection.this.addDestination(new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination));
}
ss.addProducer(info);

View File

@ -16,25 +16,23 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
import javax.jms.InvalidDestinationException;
import javax.jms.ResourceAllocationException;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidDestinationException;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.AutoCreateResult;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
@ -42,7 +40,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
@ -66,7 +63,8 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
public class AMQSession implements SessionCallback {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -230,39 +228,26 @@ public class AMQSession implements SessionCallback {
boolean hasQueue = true;
if (!connection.containsKnownDestination(queueName)) {
QueueQueryResult queueQuery = server.queueQuery(queueName);
try {
if (!queueQuery.isExists()) {
if (queueQuery.isAutoCreateQueues()) {
SimpleString queueNameToUse = queueName;
SimpleString addressToUse = queueName;
RoutingType routingTypeToUse = RoutingType.ANYCAST;
if (CompositeAddress.isFullyQualified(queueName.toString())) {
addressToUse = CompositeAddress.extractAddressName(queueName);
queueNameToUse = CompositeAddress.extractQueueName(queueName);
AddressInfo addressInfo = server.getAddressInfo(addressToUse);
if (addressInfo != null) {
routingTypeToUse = addressInfo.getRoutingType();
} else {
AddressSettings as = server.getAddressSettingsRepository().getMatch(addressToUse.toString());
routingTypeToUse = as.getDefaultAddressRoutingType();
}
}
coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true).setFilterString(filter));
connection.addKnownDestination(queueName);
} else {
if (server.getAddressInfo(queueName) == null) {
//Address does not exist and will not get autocreated
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
}
hasQueue = false;
}
RoutingType routingTypeToUse = RoutingType.ANYCAST;
if (CompositeAddress.isFullyQualified(queueName.toString())) {
SimpleString addressToUse = CompositeAddress.extractAddressName(queueName);
AddressInfo addressInfo = server.getAddressInfo(addressToUse);
if (addressInfo != null) {
routingTypeToUse = addressInfo.getRoutingType();
} else {
AddressSettings as = server.getAddressSettingsRepository().getMatch(addressToUse.toString());
routingTypeToUse = as.getDefaultAddressRoutingType();
}
} catch (ActiveMQQueueExistsException e) {
// In case another thread created the queue before us but after we did the binding query
hasQueue = true;
}
AutoCreateResult autoCreateResult = coreSession.checkAutoCreate(new QueueConfiguration(queueName)
.setAddress(queueName)
.setRoutingType(routingTypeToUse)
.setTemporary(isTemporary)
.setFilterString(filter));
if (autoCreateResult == AutoCreateResult.NOT_FOUND) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
}
connection.addKnownDestination(queueName);
}
return hasQueue;
}

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.api.core.AutoCreateResult;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -1252,14 +1253,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
private AddressInfo checkAddress(RoutingContext context, SimpleString address) throws Exception {
AddressInfo addressInfo = addressManager.getAddressInfo(address);
if (addressInfo == null && context.getServerSession() != null) {
if (context.getServerSession().checkAutoCreate(address, context.getRoutingType())) {
addressInfo = addressManager.getAddressInfo(address);
} else {
AutoCreateResult autoCreateResult = context.getServerSession().checkAutoCreate(new QueueConfiguration(address).setRoutingType(context.getRoutingType()));
if (autoCreateResult == AutoCreateResult.NOT_FOUND) {
ActiveMQException ex = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
if (context.getTransaction() != null) {
context.getTransaction().markAsRollbackOnly(ex);
}
throw ex;
} else {
addressInfo = addressManager.getAddressInfo(address);
}
}
return addressInfo;
@ -1268,7 +1270,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
Bindings simpleRoute(SimpleString address, RoutingContext context, Message message, AddressInfo addressInfo) throws Exception {
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
if (bindings == null && context.getServerSession() != null) {
if (!context.getServerSession().checkAutoCreate(address, context.getRoutingType())) {
AutoCreateResult autoCreateResult = context.getServerSession().checkAutoCreate(new QueueConfiguration(address).setRoutingType(context.getRoutingType()));
if (autoCreateResult == AutoCreateResult.NOT_FOUND) {
ActiveMQException e = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
Transaction tx = context.getTransaction();
if (tx != null) {

View File

@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V5;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
@ -470,7 +471,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
}
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V5)) {
response = new SessionBindingQueryResponseMessage_V5(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch(), result.getAddressInfo() == null ? false : result.getAddressInfo().getRoutingTypes().contains(RoutingType.MULTICAST), result.getAddressInfo() == null ? false : result.getAddressInfo().getRoutingTypes().contains(RoutingType.ANYCAST));
} else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch());
} else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses());

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.AutoCreateResult;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import javax.transaction.xa.Xid;
import java.util.Collection;
@ -111,7 +112,7 @@ public interface ServerSession extends SecurityAuth {
void addCloseable(Closeable closeable);
boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception;
AutoCreateResult checkAutoCreate(QueueConfiguration queueConfiguration) throws Exception;
ServerConsumer createConsumer(long consumerID,
SimpleString queueName,

View File

@ -16,15 +16,10 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.management.impl.view.ProducerField;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import org.apache.activemq.artemis.json.JsonObjectBuilder;
import java.security.cert.X509Certificate;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import java.lang.invoke.MethodHandles;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -39,10 +34,13 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.AutoCreateResult;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -54,6 +52,7 @@ import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.management.impl.view.ProducerField;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
@ -61,6 +60,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
@ -94,6 +94,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction.State;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import org.apache.activemq.artemis.json.JsonObjectBuilder;
import org.apache.activemq.artemis.json.JsonValue;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -105,7 +107,6 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
/**
* Server side Session implementation
@ -1746,49 +1747,83 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return tx;
}
@Override
public boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception {
boolean result;
SimpleString unPrefixedAddress = removePrefix(address);
public AutoCreateResult checkAutoCreate(final QueueConfiguration queueConfig) throws Exception {
AutoCreateResult result;
SimpleString unPrefixedAddress = removePrefix(queueConfig.getAddress());
SimpleString unPrefixedQueue = removePrefix(queueConfig.getName());
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(unPrefixedAddress.toString());
if (routingType == RoutingType.MULTICAST) {
if (server.getAddressInfo(unPrefixedAddress) == null) {
if (addressSettings.isAutoCreateAddresses()) {
try {
createAddress(address, routingType, true);
} catch (ActiveMQAddressExistsException e) {
// The address may have been created by another thread in the mean time. Catch and do nothing.
}
result = true;
} else {
result = false;
/*
* This is only here to maintain backwards compatibility with the previous implementation.
*
* TODO: figure out how to get rid of this
*/
if (queueConfig.getRoutingType() == null) {
return AutoCreateResult.EXISTED;
}
// No matter what routing-type is used the address must exist already or be automatically created.
AddressInfo addressInfo = server.getAddressInfo(unPrefixedAddress);
if (addressInfo == null) {
// The address doesn't exist.
if (addressSettings.isAutoCreateAddresses() || queueConfig.isTemporary()) {
// Try to create the address if possible.
try {
createAddress(queueConfig.getAddress(), queueConfig.getRoutingType(), true).setTemporary(queueConfig.isTemporary());
} catch (ActiveMQAddressExistsException e) {
// The address may have been created by another thread in the mean time. Catch and do nothing.
}
result = AutoCreateResult.CREATED;
} else {
result = true;
}
} else if (routingType == RoutingType.ANYCAST) {
if (server.locateQueue(unPrefixedAddress) == null) {
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(address);
if (bindings != null) {
// this means the address has another queue with a different name, which is fine, we just ignore it on this case
result = true;
} else if (addressSettings.isAutoCreateQueues()) {
try {
createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true));
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
result = true;
} else {
result = false;
}
} else {
result = true;
// If the address doesn't exist and can't be autocreated then return that result immediately.
return AutoCreateResult.NOT_FOUND;
}
} else {
result = true;
// The address exists.
if (addressInfo.getRoutingTypes().contains(queueConfig.getRoutingType())) {
// The existing address supports the requested routing-type.
result = AutoCreateResult.EXISTED;
} else {
// The existing address doesn't support the requested routing-type.
if (addressSettings.isAutoCreateAddresses() || queueConfig.isTemporary()) {
// Try to update the address with the new routing-type if possible.
try {
createAddress(addressInfo.addRoutingType(queueConfig.getRoutingType()), true);
} catch (ActiveMQAddressExistsException e) {
// The address may have been created by another thread in the mean-time. Catch and do nothing.
}
result = AutoCreateResult.UPDATED;
} else {
// If the address exists but doesn't support the requested routing-type and can't be updated with the new routing-type then return that result immediately.
return AutoCreateResult.NOT_FOUND;
}
}
}
if (queueConfig.getRoutingType() == RoutingType.ANYCAST || queueConfig.isFqqn()) {
if (server.locateQueue(unPrefixedQueue) == null) {
// The queue doesn't exist.
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(unPrefixedAddress);
if (bindings != null && !queueConfig.isFqqn()) {
// The address has another queue with a different name, which is fine. Just ignore it.
result = AutoCreateResult.EXISTED;
} else if (addressSettings.isAutoCreateQueues() || queueConfig.isTemporary()) {
// Try to create the queue.
try {
createQueue(new QueueConfiguration(queueConfig).setAutoCreated(true));
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean-time. Catch and do nothing.
}
result = AutoCreateResult.CREATED;
} else {
// The queue doesn't exist, and we can't auto-create it so return that result immediately.
return AutoCreateResult.NOT_FOUND;
}
} else {
// The queue exists.
result = AutoCreateResult.EXISTED;
}
}
return result;

View File

@ -182,7 +182,7 @@
<activemq.version.majorVersion>1</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion>
<activemq.version.incrementingVersion>134,133,132,131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.incrementingVersion>135,134,133,132,131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>

View File

@ -166,8 +166,8 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
Set<TransportConfiguration> acceptors = server.getConfiguration().getAcceptorConfigurations();
for (TransportConfiguration tc : acceptors) {
if (tc.getName().equals(NETTY_ACCEPTOR)) {
tc.getExtraParams().put("anycastPrefix", "anycast://");
tc.getExtraParams().put("multicastPrefix", "multicast://");
tc.getExtraParams().put("anycastPrefix", ANYCAST_PREFIX);
tc.getExtraParams().put("multicastPrefix", MULTICAST_PREFIX);
}
}
}

View File

@ -72,8 +72,8 @@ public class AmqpDurableReceiverReconnectWithMulticastPrefixTest extends JMSClie
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
params.put("anycastPrefix", "anycast://");
params.put("multicastPrefix", "multicast://");
params.put("anycastPrefix", ANYCAST_PREFIX);
params.put("multicastPrefix", MULTICAST_PREFIX);
}
@Override
@ -96,7 +96,7 @@ public class AmqpDurableReceiverReconnectWithMulticastPrefixTest extends JMSClie
@Test(timeout = 60000)
public void testReattachToDurableNodeAndTryAndReceiveNewlySentMessage() throws Exception {
final String addressName = "test-address";
final String prefixedName = "multicast://" + addressName;
final String prefixedName = MULTICAST_PREFIX + addressName;
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.createConnection());
@ -143,7 +143,7 @@ public class AmqpDurableReceiverReconnectWithMulticastPrefixTest extends JMSClie
@Test(timeout = 60000)
public void testReattachToDurableNodeAndTryAndReceivePreviouslySentMessage() throws Exception {
final String addressName = "test-address";
final String prefixedName = "multicast://" + addressName;
final String prefixedName = MULTICAST_PREFIX + addressName;
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.createConnection());

View File

@ -62,7 +62,7 @@ public class AmqpMessageRoutingTest extends JMSClientTestSupport {
serverControl.createQueue(new QueueConfiguration(queueB).setAddress(addressA).setRoutingType(RoutingType.ANYCAST).toJSON());
serverControl.createQueue(new QueueConfiguration(queueC).setAddress(addressA).setRoutingType(RoutingType.MULTICAST).toJSON());
sendMessages("anycast://" + addressA, 1);
sendMessages(ANYCAST_PREFIX + addressA, 1);
Wait.assertEquals(1, () -> (server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()));
Wait.assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC))::getMessageCount);
@ -100,7 +100,7 @@ public class AmqpMessageRoutingTest extends JMSClientTestSupport {
serverControl.createQueue(new QueueConfiguration(queueB).setAddress(addressA).setRoutingType(RoutingType.MULTICAST).toJSON());
serverControl.createQueue(new QueueConfiguration(queueC).setAddress(addressA).setRoutingType(RoutingType.MULTICAST).toJSON());
sendMessages("multicast://" + addressA, 1);
sendMessages(MULTICAST_PREFIX + addressA, 1);
Wait.assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA))::getMessageCount);
Wait.assertEquals(2, () -> (server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()));

View File

@ -302,7 +302,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().contains("amqp:not-found"));
assertTrue(expectedException.getMessage().contains("target address does not exist"));
assertTrue(expectedException.getMessage().contains("target address AnAddressThatDoesNotExist does not exist"));
} finally {
connection.close();
}

View File

@ -185,6 +185,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMessageDurableTrue() throws Exception {
assertNotNull(server.locateQueue(getQueueName()));
sendMessages(getQueueName(), 1, true);
AmqpClient client = createAmqpClient();
@ -1115,7 +1116,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().contains("amqp:not-found"));
assertTrue(expectedException.getMessage().contains("target address does not exist"));
assertTrue(expectedException.getMessage().contains("target address AnAddressThatDoesNotExist does not exist"));
connection.close();
}

View File

@ -44,6 +44,9 @@ public class AmqpTestSupport extends ActiveMQTestBase {
protected static final String BROKER_NAME = "localhost";
protected static final String NETTY_ACCEPTOR = "netty-acceptor";
protected static final String MULTICAST_PREFIX = "multicast://";
protected static final String ANYCAST_PREFIX = "anycast://";
protected String noprivUser = "noprivs";
protected String noprivPass = "noprivs";

View File

@ -74,8 +74,8 @@ public class AutoCreateWithDefaultRoutingTypesTest extends JMSClientTestSupport
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
params.put("anycastPrefix", "anycast://");
params.put("multicastPrefix", "multicast://");
params.put("anycastPrefix", ANYCAST_PREFIX);
params.put("multicastPrefix", MULTICAST_PREFIX);
}
@Override
@ -272,9 +272,9 @@ public class AutoCreateWithDefaultRoutingTypesTest extends JMSClientTestSupport
final String prefixedName;
if (routingType == RoutingType.ANYCAST) {
prefixedName = "anycast://" + addressName;
prefixedName = ANYCAST_PREFIX + addressName;
} else {
prefixedName = "multicast://" + addressName;
prefixedName = MULTICAST_PREFIX + addressName;
}
AmqpSender sender = session.createSender(prefixedName);
@ -320,9 +320,9 @@ public class AutoCreateWithDefaultRoutingTypesTest extends JMSClientTestSupport
final String prefixedName;
if (routingType == RoutingType.ANYCAST) {
prefixedName = "anycast://" + addressName;
prefixedName = ANYCAST_PREFIX + addressName;
} else {
prefixedName = "multicast://" + addressName;
prefixedName = MULTICAST_PREFIX + addressName;
}
final AmqpReceiver receiver = session.createReceiver(prefixedName);

View File

@ -40,6 +40,16 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport {
SimpleString queue1 = new SimpleString("queue1");
SimpleString queue2 = new SimpleString("queue2");
@Override
protected boolean isAutoCreateQueues() {
return false;
}
@Override
protected boolean isAutoCreateAddresses() {
return false;
}
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
@ -64,6 +74,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeWhenOnlyAnycast() throws Exception {
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(address).setAddress(address).setRoutingType(RoutingType.ANYCAST));
sendMessages(address.toString(), 1);

View File

@ -124,7 +124,7 @@ public class QueueAutoCreationTest extends JMSClientTestSupport {
Connection connection = factory.createConnection();
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
logger.debug("Address is {}", addressName);
clientSession.createAddress(addressName, RoutingType.ANYCAST, false);
clientSession.createAddress(addressName, RoutingType.MULTICAST, false);
Topic topic = new ActiveMQTopic(addressName.toString());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(topic);

View File

@ -389,7 +389,7 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase {
Connection connection = factory.createConnection();
SimpleString addressName = UUIDGenerator.getInstance().generateSimpleStringUUID();
logger.debug("Address is {}", addressName);
clientSession.createAddress(addressName, RoutingType.ANYCAST, false);
clientSession.createAddress(addressName, RoutingType.MULTICAST, false);
Topic topic = new ActiveMQTopic(addressName.toString());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(topic);

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Test;
public class JMSMismatchedRoutingTypeTest extends MultiprotocolJMSClientTestSupport {
protected final String ANYCAST_ADDRESS = RandomUtil.randomString();
protected final String MULTICAST_ADDRESS = RandomUtil.randomString();
@Override
protected boolean isAutoCreateAddresses() {
return false;
}
@Override
protected boolean isAutoCreateQueues() {
return false;
}
@Override
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(ANYCAST_ADDRESS), RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(RandomUtil.randomString()).setAddress(ANYCAST_ADDRESS).setRoutingType(RoutingType.ANYCAST));
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(MULTICAST_ADDRESS), RoutingType.MULTICAST));
server.createQueue(new QueueConfiguration(RandomUtil.randomString()).setAddress(MULTICAST_ADDRESS).setRoutingType(RoutingType.MULTICAST));
}
@Test
public void testSendingMulticastToAnycastAMQP() throws Exception {
internalTestSendingMulticastToAnycast(AMQPConnection);
}
@Test
public void testSendingMulticastToAnycastCore() throws Exception {
internalTestSendingMulticastToAnycast(CoreConnection);
}
@Test
public void testSendingMulticastToAnycastOpenWire() throws Exception {
internalTestSendingMulticastToAnycast(OpenWireConnection);
}
private void internalTestSendingMulticastToAnycast(ConnectionSupplier connectionSupplier) throws Exception {
Connection connection = null;
try {
connection = connectionSupplier.createConnection();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = s.createTopic(ANYCAST_ADDRESS);
MessageProducer producer = s.createProducer(topic);
producer.send(s.createMessage());
fail("Sending message here should fail!");
} catch (InvalidDestinationException e) {
// expected
} finally {
if (connection != null) {
connection.close();
}
}
}
@Test
public void testSendingAnycastToMulticastAMQP() throws Exception {
internalTestSendingAnycastToMulticast(AMQPConnection);
}
@Test
public void testSendingAnycastToMulticastCore() throws Exception {
internalTestSendingAnycastToMulticast(CoreConnection);
}
@Test
public void testSendingAnycastToMulticastOpenWire() throws Exception {
internalTestSendingAnycastToMulticast(OpenWireConnection);
}
private void internalTestSendingAnycastToMulticast(ConnectionSupplier connectionSupplier) throws Exception {
Connection connection = null;
try {
connection = connectionSupplier.createConnection();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q;
try {
// when using core this will fail because the client actually checks for the queue on the broker (which won't be there)
q = s.createQueue(MULTICAST_ADDRESS);
} catch (JMSException e) {
if (connection instanceof ActiveMQConnection) {
// expected
return;
} else {
throw e;
}
}
try {
MessageProducer producer = s.createProducer(q);
producer.send(s.createMessage());
fail("Sending message here should fail!");
} catch (InvalidDestinationException e) {
// expected
}
} finally {
if (connection != null) {
connection.close();
}
}
}
}

View File

@ -378,6 +378,12 @@ public class SecurityTest extends ActiveMQTestBase {
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//Add the corresponding addresses to the server
SimpleString address = SimpleString.toSimpleString("test.queue");
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
SimpleString address2 = SimpleString.toSimpleString("test.topic");
server.addAddressInfo(new AddressInfo(address2, RoutingType.MULTICAST));
//Test queue creation permission
try {
session.createConsumer(session.createQueue("test.queue"));
@ -394,14 +400,9 @@ public class SecurityTest extends ActiveMQTestBase {
assertTrue(e.getMessage().contains("User: test-user does not have permission='CREATE_NON_DURABLE_QUEUE'"));
}
//Add a test queue and topic to the server
SimpleString address = SimpleString.toSimpleString("test.queue");
server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
//Add a test queue to the server
server.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.ANYCAST));
SimpleString address2 = SimpleString.toSimpleString("test.topic");
server.addAddressInfo(new AddressInfo(address2, RoutingType.MULTICAST));
//Test queue produce permission
try {
MessageProducer producer = session.createProducer(session.createQueue("test.queue"));
@ -441,7 +442,7 @@ public class SecurityTest extends ActiveMQTestBase {
session.createTemporaryQueue();
Assert.fail("should throw exception here");
} catch (Exception e) {
assertTrue(e.getMessage().contains("User: test-user does not have permission='CREATE_NON_DURABLE_QUEUE'"));
assertTrue(e.getMessage().contains("User: test-user does not have permission='CREATE_ADDRESS'"));
}
//Test temp topic

View File

@ -299,7 +299,7 @@ public class RetroactiveAddressTest extends ActiveMQTestBase {
final int COUNT = 10;
final SimpleString divertQueue = ResourceNames.getRetroactiveResourceQueueName(internalNamingPrefix, delimiter, addressName, RoutingType.MULTICAST);
server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setRetroactiveMessageCount(COUNT));
server.addAddressInfo(new AddressInfo(addressName));
server.addAddressInfo(new AddressInfo(addressName).addRoutingType(RoutingType.MULTICAST));
ConnectionFactory cf = new ActiveMQConnectionFactory("vm://0");
Connection c = cf.createConnection();

View File

@ -48,9 +48,15 @@ under the License.
</acceptors>
<addresses>
<address name="source1"/>
<address name="source2"/>
<address name="target"/>
<address name="source1">
<anycast/>
</address>
<address name="source2">
<anycast/>
</address>
<address name="target">
<anycast/>
</address>
</addresses>
<diverts>

View File

@ -48,9 +48,15 @@ under the License.
</acceptors>
<addresses>
<address name="source1"/>
<address name="source2"/>
<address name="target"/>
<address name="source1">
<anycast/>
</address>
<address name="source2">
<anycast/>
</address>
<address name="target">
<anycast/>
</address>
</addresses>
<diverts>

View File

@ -48,9 +48,15 @@ under the License.
</acceptors>
<addresses>
<address name="source"/>
<address name="target1"/>
<address name="target2"/>
<address name="source">
<anycast/>
</address>
<address name="target1">
<anycast/>
</address>
<address name="target1">
<anycast/>
</address>
</addresses>
<diverts>

View File

@ -48,9 +48,15 @@ under the License.
</acceptors>
<addresses>
<address name="source"/>
<address name="target1"/>
<address name="target2"/>
<address name="source">
<anycast/>
</address>
<address name="target1">
<anycast/>
</address>
<address name="target1">
<anycast/>
</address>
</addresses>
<diverts>

View File

@ -48,8 +48,12 @@ under the License.
</acceptors>
<addresses>
<address name="source"/>
<address name="target"/>
<address name="source">
<anycast/>
</address>
<address name="target">
<anycast/>
</address>
</addresses>
<diverts>

View File

@ -48,8 +48,12 @@ under the License.
</acceptors>
<addresses>
<address name="source"/>
<address name="target"/>
<address name="source">
<anycast/>
</address>
<address name="target">
<anycast/>
</address>
</addresses>
<diverts>

View File

@ -48,8 +48,12 @@ under the License.
</acceptors>
<addresses>
<address name="source"/>
<address name="target"/>
<address name="source">
<anycast/>
</address>
<address name="target">
<anycast/>
</address>
</addresses>
<diverts>

View File

@ -48,8 +48,12 @@ under the License.
</acceptors>
<addresses>
<address name="source"/>
<address name="target"/>
<address name="source">
<anycast/>
</address>
<address name="target">
<anycast/>
</address>
</addresses>
<diverts>

View File

@ -48,8 +48,12 @@ under the License.
</acceptors>
<addresses>
<address name="source"/>
<address name="target"/>
<address name="source">
<anycast/>
</address>
<address name="target">
<anycast/>
</address>
</addresses>
<diverts>