ARTEMIS-3374 config-managed queue can be deleted by durable subscriber

This commit is contained in:
Justin Bertram 2021-06-23 14:22:17 -05:00
parent 3e5795e965
commit 1d02d06eab
10 changed files with 114 additions and 15 deletions

View File

@ -177,6 +177,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
Long getRingSize(); Long getRingSize();
Boolean isEnabled(); Boolean isEnabled();
Boolean isConfigurationManaged();
} }
// Lifecycle operations ------------------------------------------ // Lifecycle operations ------------------------------------------

View File

@ -78,6 +78,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
private final Boolean enabled; private final Boolean enabled;
private final Boolean configurationManaged;
private final Integer defaultConsumerWindowSize; private final Integer defaultConsumerWindowSize;
@ -164,7 +166,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Long autoDeleteDelay, final Long autoDeleteDelay,
final Long autoDeleteMessageCount, final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize) { final Integer defaultConsumerWindowSize) {
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null); this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, null, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null, null, null);
} }
public QueueQueryImpl(final boolean durable, public QueueQueryImpl(final boolean durable,
@ -195,7 +197,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final Long autoDeleteMessageCount, final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize, final Integer defaultConsumerWindowSize,
final Long ringSize, final Long ringSize,
final Boolean enabled) { final Boolean enabled,
final Boolean configurationManaged) {
this.durable = durable; this.durable = durable;
this.temporary = temporary; this.temporary = temporary;
this.consumerCount = consumerCount; this.consumerCount = consumerCount;
@ -225,6 +228,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
this.defaultConsumerWindowSize = defaultConsumerWindowSize; this.defaultConsumerWindowSize = defaultConsumerWindowSize;
this.ringSize = ringSize; this.ringSize = ringSize;
this.enabled = enabled; this.enabled = enabled;
this.configurationManaged = configurationManaged;
} }
@Override @Override
@ -371,5 +375,10 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
public Boolean isEnabled() { public Boolean isEnabled() {
return enabled; return enabled;
} }
@Override
public Boolean isConfigurationManaged() {
return configurationManaged;
}
} }

View File

@ -66,12 +66,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
private Boolean enabled; private Boolean enabled;
private Boolean configurationManaged;
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) { public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize(), result.getRingSize(), result.isEnabled()); this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.isGroupRebalancePauseDispatch(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.isAutoDelete(), result.getAutoDeleteDelay(), result.getAutoDeleteMessageCount(), result.getDefaultConsumerWindowSize(), result.getRingSize(), result.isEnabled(), result.isConfigurationManaged());
} }
public SessionQueueQueryResponseMessage_V3() { public SessionQueueQueryResponseMessage_V3() {
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null); this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
} }
private SessionQueueQueryResponseMessage_V3(final SimpleString name, private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@ -102,7 +104,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
final Long autoDeleteMessageCount, final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize, final Integer defaultConsumerWindowSize,
final Long ringSize, final Long ringSize,
final Boolean enabled) { final Boolean enabled,
final Boolean configurationManaged) {
super(SESS_QUEUEQUERY_RESP_V3); super(SESS_QUEUEQUERY_RESP_V3);
this.durable = durable; this.durable = durable;
@ -162,6 +165,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.ringSize = ringSize; this.ringSize = ringSize;
this.enabled = enabled; this.enabled = enabled;
this.configurationManaged = configurationManaged;
} }
public boolean isAutoCreated() { public boolean isAutoCreated() {
@ -312,6 +317,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.enabled = enabled; this.enabled = enabled;
} }
public Boolean isConfigurationManaged() {
return configurationManaged;
}
public void setConfigurationManaged(Boolean configurationManaged) {
this.configurationManaged = configurationManaged;
}
@Override @Override
public void encodeRest(final ActiveMQBuffer buffer) { public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer); super.encodeRest(buffer);
@ -335,6 +348,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
BufferHelper.writeNullableLong(buffer, ringSize); BufferHelper.writeNullableLong(buffer, ringSize);
BufferHelper.writeNullableBoolean(buffer, enabled); BufferHelper.writeNullableBoolean(buffer, enabled);
BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch); BufferHelper.writeNullableBoolean(buffer, groupRebalancePauseDispatch);
BufferHelper.writeNullableBoolean(buffer, configurationManaged);
} }
@ -375,6 +389,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
if (buffer.readableBytes() > 0) { if (buffer.readableBytes() > 0) {
groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer); groupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
} }
if (buffer.readableBytes() > 0) {
configurationManaged = BufferHelper.readNullableBoolean(buffer);
}
} }
@Override @Override
@ -401,6 +418,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode()); result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
result = prime * result + (ringSize == null ? 0 : ringSize.hashCode()); result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
result = prime * result + (enabled == null ? 0 : enabled ? 1231 : 1237); result = prime * result + (enabled == null ? 0 : enabled ? 1231 : 1237);
result = prime * result + (configurationManaged == null ? 0 : configurationManaged ? 1231 : 1237);
return result; return result;
} }
@ -434,12 +452,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize); buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
buff.append(", ringSize=" + ringSize); buff.append(", ringSize=" + ringSize);
buff.append(", enabled=" + enabled); buff.append(", enabled=" + enabled);
buff.append(", configurationManaged=" + configurationManaged);
return buff.toString(); return buff.toString();
} }
@Override @Override
public ClientSession.QueueQuery toQueueQuery() { public ClientSession.QueueQuery toQueueQuery() {
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize(), isEnabled()); return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), isGroupRebalancePauseDispatch(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMessageCount(), getDefaultConsumerWindowSize(), getRingSize(), isEnabled(), isConfigurationManaged());
} }
@Override @Override
@ -542,6 +561,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
return false; return false;
if (maxConsumers != other.maxConsumers) if (maxConsumers != other.maxConsumers)
return false; return false;
if (configurationManaged == null) {
if (other.configurationManaged != null)
return false;
} else if (!configurationManaged.equals(other.configurationManaged))
return false;
return true; return true;
} }
} }

View File

@ -79,6 +79,8 @@ public class QueueQueryResult {
private Boolean enabled; private Boolean enabled;
private Boolean configurationManaged;
public QueueQueryResult(final SimpleString name, public QueueQueryResult(final SimpleString name,
final SimpleString address, final SimpleString address,
final boolean durable, final boolean durable,
@ -107,7 +109,8 @@ public class QueueQueryResult {
final Long autoDeleteMessageCount, final Long autoDeleteMessageCount,
final Integer defaultConsumerWindowSize, final Integer defaultConsumerWindowSize,
final Long ringSize, final Long ringSize,
final Boolean enabled) { final Boolean enabled,
final Boolean configurationManaged) {
this.durable = durable; this.durable = durable;
this.temporary = temporary; this.temporary = temporary;
@ -165,6 +168,8 @@ public class QueueQueryResult {
this.ringSize = ringSize; this.ringSize = ringSize;
this.enabled = enabled; this.enabled = enabled;
this.configurationManaged = configurationManaged;
} }
public boolean isExists() { public boolean isExists() {
@ -286,4 +291,8 @@ public class QueueQueryResult {
public Boolean isEnabled() { public Boolean isEnabled() {
return enabled; return enabled;
} }
public Boolean isConfigurationManaged() {
return configurationManaged;
}
} }

View File

@ -912,7 +912,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress()); boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress());
if (selectorChanged || topicChanged) { if ((selectorChanged || topicChanged) && !subResponse.isConfigurationManaged()) {
// Delete the old durable sub // Delete the old durable sub
session.deleteQueue(queueName); session.deleteQueue(queueName);

View File

@ -1098,9 +1098,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false); queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false); QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
if (result.isExists()) { if (result.isExists()) {
// If a client reattaches to a durable subscription with a different no-local /*
// filter value, selector or address then we must recreate the queue (JMS semantics). * If a client reattaches to a durable subscription with a different filter or address then we must
if (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { * recreate the queue (JMS semantics). However, if the corresponding queue is managed via the
* configuration then we don't want to change it
*/
if (!result.isConfigurationManaged() && (!Objects.equals(result.getFilterString(), simpleStringSelector) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString())))) {
if (result.getConsumerCount() == 0) { if (result.getConsumerCount() == 0) {
sessionSPI.deleteQueue(queue); sessionSPI.deleteQueue(queue);

View File

@ -199,7 +199,7 @@ public class AMQConsumer {
boolean topicChanged = !oldTopicName.equals(address); boolean topicChanged = !oldTopicName.equals(address);
if (selectorChanged || topicChanged) { if ((selectorChanged || topicChanged) && !result.isConfigurationManaged()) {
// Delete the old durable sub // Delete the old durable sub
session.getCoreSession().deleteQueue(queueName); session.getCoreSession().deleteQueue(queueName);

View File

@ -1060,12 +1060,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString(); SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled()); response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), defaultConsumerWindowSize, queue.getRingSize(), queue.isEnabled(), queue.isConfigurationManaged());
} else if (realName.equals(managementAddress)) { } else if (realName.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29) // make an exception for the management address (see HORNETQ-29)
response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null); response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null, null, null);
} else { } else {
response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled); response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupRebalancePauseDispatch, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessageCount, defaultConsumerWindowSize, defaultRingSize, defaultEnabled, false);
} }
return response; return response;

View File

@ -205,6 +205,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true); return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
} }
protected Connection createCoreConnection(boolean start) throws JMSException {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, start);
}
private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString); ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
@ -257,6 +261,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true); return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true);
} }
protected Connection createOpenWireConnection(boolean start) throws JMSException {
return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, false);
}
private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionString); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionString);

View File

@ -40,9 +40,12 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import javax.jms.Topic; import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.DestinationUtil;
import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.junit.Assert; import org.junit.Assert;
@ -217,6 +220,47 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
} }
} }
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithCore() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> createCoreConnection(false));
}
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithOpenWire() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> createOpenWireConnection(false));
}
@Test(timeout = 60000)
public void testDurableSubscriptionWithConfigurationManagedQueueWithAMQP() throws Exception {
testDurableSubscriptionWithConfigurationManagedQueue(() -> JMSMessageConsumerTest.super.createConnection(false));
}
private void testDurableSubscriptionWithConfigurationManagedQueue(ConnectionSupplier connectionSupplier) throws Exception {
final String clientId = "bar";
final String subName = "foo";
final String queueName = DestinationUtil.createQueueNameForSubscription(true, clientId, subName).toString();
server.stop();
server.getConfiguration().addQueueConfiguration(new QueueConfiguration(queueName).setAddress("myTopic").setFilterString("color = 'BLUE'").setRoutingType(RoutingType.MULTICAST));
server.getConfiguration().setAmqpUseCoreSubscriptionNaming(true);
server.start();
try (Connection connection = connectionSupplier.createConnection()) {
connection.setClientID(clientId);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("myTopic");
MessageConsumer messageConsumer = session.createDurableSubscriber(destination, subName);
messageConsumer.close();
Queue queue = server.locateQueue(queueName);
assertNotNull(queue);
assertNotNull(queue.getFilter());
assertEquals("color = 'BLUE'", queue.getFilter().getFilterString().toString());
}
}
@Test(timeout = 30000) @Test(timeout = 30000)
public void testSelectorsWithJMSTypeOnTopic() throws Exception { public void testSelectorsWithJMSTypeOnTopic() throws Exception {
doTestSelectorsWithJMSType(true); doTestSelectorsWithJMSType(true);