This commit is contained in:
Justin Bertram 2021-07-06 14:06:39 -05:00
commit 478a28c196
10 changed files with 114 additions and 15 deletions

View File

@ -177,6 +177,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
Long getRingSize();
Boolean isEnabled();
Boolean isConfigurationManaged();
}
// Lifecycle operations ------------------------------------------

View File

@ -78,6 +78,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
private final Boolean enabled;
private final Boolean configurationManaged;
private final Integer defaultConsumerWindowSize;
@ -164,7 +166,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Long autoDeleteDelay,
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize) {
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null);
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null, null);
}
public QueueQueryImpl(final boolean durable,
@ -195,7 +197,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize,
final Long ringSize,
final Boolean enabled) {
final Boolean enabled,
final Boolean configurationManaged) {
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
@ -225,6 +228,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
this.ringSize = ringSize;
this.enabled = enabled;
this.configurationManaged = configurationManaged;
}
@Override
@ -371,5 +375,10 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
public Boolean isEnabled() {
return enabled;
}
@Override
public Boolean isConfigurationManaged() {
return configurationManaged;
}
}

View File

@ -66,12 +66,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
private Boolean enabled;
private Boolean configurationManaged;
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize(), result.getRingSize(), result.isEnabled());
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize(), result.getRingSize(), result.isEnabled(), result.isConfigurationManaged());
}
public SessionQueueQueryResponseMessage_V3() {
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null);
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
}
private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@ -102,7 +104,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize,
final Long ringSize,
final Boolean enabled) {
final Boolean enabled,
final Boolean configurationManaged) {
super(SESS_QUEUEQUERY_RESP_V3);
this.durable = durable;
@ -162,6 +165,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.ringSize = ringSize;
this.enabled = enabled;
this.configurationManaged = configurationManaged;
}
public boolean isAutoCreated() {
@ -312,6 +317,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.enabled = enabled;
}
public Boolean isConfigurationManaged() {
return configurationManaged;
}
public void setConfigurationManaged(Boolean configurationManaged) {
this.configurationManaged = configurationManaged;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
@ -335,6 +348,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
BufferHelper.writeNullableLong(buffer, ringSize);
BufferHelper.writeNullableBoolean(buffer, enabled);
BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch);
BufferHelper.writeNullableBoolean(buffer, configurationManaged);
}
@ -375,6 +389,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
if (buffer.readableBytes() > 0) {
groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
}
if (buffer.readableBytes() > 0) {
configurationManaged = BufferHelper.readNullableBoolean(buffer);
}
}
@Override
@ -401,6 +418,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
result = prime * result + (enabled == null ? 0 : enabled ? 1231 : 1237);
result = prime * result + (configurationManaged == null ? 0 : configurationManaged ? 1231 : 1237);
return result;
}
@ -434,12 +452,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
buff.append(", ringSize=" + ringSize);
buff.append(", enabled=" + enabled);
buff.append(", configurationManaged=" + configurationManaged);
return buff.toString();
}
@Override
public ClientSession.QueueQuery toQueueQuery() {
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize(), isEnabled());
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize(), isEnabled(), isConfigurationManaged());
}
@Override
@ -542,6 +561,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
return false;
if (maxConsumers != other.maxConsumers)
return false;
if (configurationManaged == null) {
if (other.configurationManaged != null)
return false;
} else if (!configurationManaged.equals(other.configurationManaged))
return false;
return true;
}
}

View File

@ -79,6 +79,8 @@ public class QueueQueryResult {
private Boolean enabled;
private Boolean configurationManaged;
public QueueQueryResult(final SimpleString name,
final SimpleString address,
final boolean durable,
@ -107,7 +109,8 @@ public class QueueQueryResult {
final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize,
final Long ringSize,
final Boolean enabled) {
final Boolean enabled,
final Boolean configurationManaged) {
this.durable = durable;
this.temporary = temporary;
@ -165,6 +168,8 @@ public class QueueQueryResult {
this.ringSize = ringSize;
this.enabled = enabled;
this.configurationManaged = configurationManaged;
}
public boolean isExists() {
@ -286,4 +291,8 @@ public class QueueQueryResult {
public Boolean isEnabled() {
return enabled;
}
public Boolean isConfigurationManaged() {
return configurationManaged;
}
}

View File

@ -912,7 +912,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress());
if (selectorChanged || topicChanged) {
if ((selectorChanged || topicChanged) && !subResponse.isConfigurationManaged()) {
// Delete the old durable sub
session.deleteQueue(queueName);

View File

@ -1098,9 +1098,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
if (result.isExists()) {
// If a client reattaches to a durable subscription with a different no-local
// filter value, selector or address then we must recreate the queue (JMS semantics).
if (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
/*
* If a client reattaches to a durable subscription with a different filter or address then we must
* recreate the queue (JMS semantics). However, if the corresponding queue is managed via the
* configuration then we don't want to change it
*/
if (!result.isConfigurationManaged() && (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString())))) {
if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue);

View File

@ -199,7 +199,7 @@ public class AMQConsumer {
boolean topicChanged = !oldTopicName.equals(address);
if (selectorChanged || topicChanged) {
if ((selectorChanged || topicChanged) && !result.isConfigurationManaged()) {
// Delete the old durable sub
session.getCoreSession().deleteQueue(queueName);

View File

@ -1060,12 +1060,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled());
response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled(), queue.isConfigurationManaged());
} else if (realName.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null);
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null, null);
} else {
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled);
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled, false);
}
return response;

View File

@ -205,6 +205,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
}
protected Connection createCoreConnection(boolean start) throws JMSException {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, start);
}
private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
@ -257,6 +261,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true);
}
protected Connection createOpenWireConnection(boolean start) throws JMSException {
return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, false);
}
private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionString);

View File

@ -40,9 +40,12 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.DestinationUtil;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.junit.Assert;
@ -217,6 +220,47 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
}
}
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithCore() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> createCoreConnection(false));
}
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithOpenWire() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> createOpenWireConnection(false));
}
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithAMQP() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> JMSMessageConsumerTest.super.createConnection(false));
}
private void testDurableSubscriptionWithConfigurationManagedQueue(ConnectionSupplier connectionSupplier) throws Exception {
final String clientId = "bar";
final String subName = "foo";
final String queueName = DestinationUtil.createQueueNameForSubscription(true, clientId, subName).toString();
server.stop();
server.getConfiguration().addQueueConfiguration(new QueueConfiguration(queueName).setAddress("myTopic").setFilterString("color = 'BLUE'").setRoutingType(RoutingType.MULTICAST));
server.getConfiguration().setAmqpUseCoreSubscriptionNaming(true);
server.start();
try (Connection connection = connectionSupplier.createConnection()) {
connection.setClientID(clientId);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("myTopic");
MessageConsumer messageConsumer = session.createDurableSubscriber(destination, subName);
messageConsumer.close();
Queue queue = server.locateQueue(queueName);
assertNotNull(queue);
assertNotNull(queue.getFilter());
assertEquals("color = 'BLUE'", queue.getFilter().getFilterString().toString());
}
}
@Test(timeout = 30000)
public void testSelectorsWithJMSTypeOnTopic() throws Exception {
doTestSelectorsWithJMSType(true);