ARTEMIS-799 Fix issues with the AMQP Durable Topic Subscription model

Fixes several issues found in the handling of durable topic
subscriptions (test cases added).
This commit is contained in:
Timothy Bish 2016-10-12 17:33:08 -04:00 committed by Clebert Suconic
parent 9743043fb8
commit 226f28abf5
7 changed files with 483 additions and 17 deletions

View File

@ -24,7 +24,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@ -46,6 +45,8 @@ import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
public class AMQPConnectionContext extends ProtonInitializable { public class AMQPConnectionContext extends ProtonInitializable {
private static final Logger log = Logger.getLogger(AMQPConnectionContext.class); private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
@ -181,7 +182,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
protected void remoteLinkOpened(Link link) throws Exception { protected void remoteLinkOpened(Link link) throws Exception {
AMQPSessionContext protonSession = (AMQPSessionContext) getSessionExtension(link.getSession()); AMQPSessionContext protonSession = getSessionExtension(link.getSession());
link.setSource(link.getRemoteSource()); link.setSource(link.getRemoteSource());
link.setTarget(link.getRemoteTarget()); link.setTarget(link.getRemoteTarget());
@ -321,6 +322,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
public void onRemoteClose(Connection connection) { public void onRemoteClose(Connection connection) {
synchronized (getLock()) { synchronized (getLock()) {
connection.close(); connection.close();
connection.free();
for (AMQPSessionContext protonSession : sessions.values()) { for (AMQPSessionContext protonSession : sessions.values()) {
protonSession.close(); protonSession.close();
} }
@ -352,6 +354,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
public void onRemoteClose(Session session) throws Exception { public void onRemoteClose(Session session) throws Exception {
synchronized (getLock()) { synchronized (getLock()) {
session.close(); session.close();
session.free();
} }
AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext(); AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext();
@ -375,6 +378,7 @@ public class AMQPConnectionContext extends ProtonInitializable {
@Override @Override
public void onRemoteClose(Link link) throws Exception { public void onRemoteClose(Link link) throws Exception {
link.close(); link.close();
link.free();
ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
if (linkContext != null) { if (linkContext != null) {
linkContext.close(true); linkContext.close(true);
@ -384,10 +388,11 @@ public class AMQPConnectionContext extends ProtonInitializable {
@Override @Override
public void onRemoteDetach(Link link) throws Exception { public void onRemoteDetach(Link link) throws Exception {
link.detach(); link.detach();
link.free();
} }
@Override @Override
public void onDetach(Link link) throws Exception { public void onLocalDetach(Link link) throws Exception {
Object context = link.getContext(); Object context = link.getContext();
if (context instanceof ProtonServerSenderContext) { if (context instanceof ProtonServerSenderContext) {
ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context; ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
@ -402,10 +407,8 @@ public class AMQPConnectionContext extends ProtonInitializable {
handler.onMessage(delivery); handler.onMessage(delivery);
} else { } else {
// TODO: logs // TODO: logs
System.err.println("Handler is null, can't delivery " + delivery); System.err.println("Handler is null, can't delivery " + delivery);
} }
} }
} }
} }

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import org.apache.qpid.proton.amqp.DescribedType;
/**
* A Described Type wrapper for JMS selector values.
*/
public class AmqpJmsSelectorFilter implements DescribedType {
private final String selector;
public AmqpJmsSelectorFilter(String selector) {
this.selector = selector;
}
@Override
public Object getDescriptor() {
return AmqpSupport.JMS_SELECTOR_CODE;
}
@Override
public Object getDescribed() {
return this.selector;
}
@Override
public String toString() {
return "AmqpJmsSelectorType{" + selector + "}";
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import org.apache.qpid.proton.amqp.DescribedType;
/**
* A Described Type wrapper for JMS no local option for MessageConsumer.
*/
public class AmqpNoLocalFilter implements DescribedType {
public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter();
private final String noLocal;
public AmqpNoLocalFilter() {
this.noLocal = "NoLocalFilter{}";
}
@Override
public Object getDescriptor() {
return AmqpSupport.NO_LOCAL_CODE;
}
@Override
public Object getDescribed() {
return this.noLocal;
}
}

View File

@ -170,21 +170,46 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
String clientId = connection.getRemoteContainer(); String clientId = connection.getRemoteContainer();
String pubId = sender.getName(); String pubId = sender.getName();
queue = createQueueName(clientId, pubId); queue = createQueueName(clientId, pubId);
boolean exists = sessionSPI.queueQuery(queue, false).isExists(); QueueQueryResult result = sessionSPI.queueQuery(queue, false);
// Once confirmed that the address exists we need to return a Source that reflects // Once confirmed that the address exists we need to return a Source that reflects
// the lifetime policy and capabilities of the new subscription. // the lifetime policy and capabilities of the new subscription.
// if (result.isExists()) {
// TODO we are not applying selector or noLocal filters to the source we just
// looked up which would violate expectations if the client checked that they
// are present on subscription recovery (JMS Durable Re-subscribe) etc
if (exists) {
source = new org.apache.qpid.proton.amqp.messaging.Source(); source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress(queue); source.setAddress(queue);
source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
source.setDistributionMode(COPY); source.setDistributionMode(COPY);
source.setCapabilities(TOPIC); source.setCapabilities(TOPIC);
SimpleString filterString = result.getFilterString();
if (filterString != null) {
selector = filterString.toString();
boolean noLocal = false;
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
if (selector.endsWith(noLocalFilter)) {
if (selector.length() > noLocalFilter.length()) {
noLocalFilter = " AND " + noLocalFilter;
selector = selector.substring(0, selector.length() - noLocalFilter.length());
} else {
selector = null;
}
noLocal = true;
}
if (noLocal) {
supportedFilters.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
}
if (selector != null && !selector.trim().isEmpty()) {
supportedFilters.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(selector));
}
}
sender.setSource(source); sender.setSource(source);
} else { } else {
throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName()); throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName());
@ -228,7 +253,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else { } else {
sessionSPI.createDurableQueue(source.getAddress(), queue, selector); sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
} }
source.setAddress(queue);
} else { } else {
// otherwise we are a volatile subscription // otherwise we are a volatile subscription
queue = java.util.UUID.randomUUID().toString(); queue = java.util.UUID.randomUUID().toString();
@ -237,7 +261,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} catch (Exception e) { } catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
} }
source.setAddress(queue);
} }
} else { } else {
queue = source.getAddress(); queue = source.getAddress();
@ -308,7 +331,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// any durable resources for say pub subs // any durable resources for say pub subs
if (remoteLinkClose) { if (remoteLinkClose) {
Source source = (Source) sender.getSource(); Source source = (Source) sender.getSource();
if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) { if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || isPubSub(source))) {
String queueName = source.getAddress(); String queueName = source.getAddress();
QueueQueryResult result = sessionSPI.queueQuery(queueName, false); QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
if (result.isExists() && source.getDynamic()) { if (result.isExists() && source.getDynamic()) {

View File

@ -69,7 +69,7 @@ public interface EventHandler {
void onRemoteDetach(Link link) throws Exception; void onRemoteDetach(Link link) throws Exception;
void onDetach(Link link) throws Exception; void onLocalDetach(Link link) throws Exception;
void onDelivery(Delivery delivery) throws Exception; void onDelivery(Delivery delivery) throws Exception;

View File

@ -85,7 +85,7 @@ public final class Events {
handler.onFinal(event.getLink()); handler.onFinal(event.getLink());
break; break;
case LINK_LOCAL_DETACH: case LINK_LOCAL_DETACH:
handler.onDetach(event.getLink()); handler.onLocalDetach(event.getLink());
break; break;
case LINK_REMOTE_DETACH: case LINK_REMOTE_DETACH:
handler.onRemoteDetach(event.getLink()); handler.onRemoteDetach(event.getLink());
@ -96,7 +96,8 @@ public final class Events {
case DELIVERY: case DELIVERY:
handler.onDelivery(event.getDelivery()); handler.onDelivery(event.getDelivery());
break; break;
default:
break;
} }
} }
} }

View File

@ -0,0 +1,350 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.apache.activemq.transport.amqp.AmqpSupport.COPY;
import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME;
import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for broker side support of the Durable Subscription mapping for JMS.
*/
public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(AmqpDurableReceiverTest.class);
private final String SELECTOR_STRING = "color = red";
@Override
public void setUp() throws Exception {
super.setUp();
server.createQueue(new SimpleString(getTopicName()), new SimpleString(getTopicName()), null, true, false);
}
@Test(timeout = 60000)
public void testCreateDurableReceiver() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.createConnection());
connection.setContainerId(getContainerID());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
receiver.flow(1);
assertEquals(getTopicName(), lookupSubscription());
AmqpSender sender = session.createSender(getTopicName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("message:1");
sender.send(message);
message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
connection.close();
assertEquals(getTopicName(), lookupSubscription());
}
@Test(timeout = 60000)
public void testDetachedDurableReceiverRemainsActive() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.createConnection());
connection.setContainerId(getContainerID());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
assertEquals(getTopicName(), lookupSubscription());
receiver.detach();
assertEquals(getTopicName(), lookupSubscription());
connection.close();
}
@Test(timeout = 60000)
public void testCloseDurableReceiverRemovesSubscription() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.createConnection());
connection.setContainerId(getContainerID());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
assertEquals(getTopicName(), lookupSubscription());
receiver.close();
assertNull(lookupSubscription());
connection.close();
}
@Test(timeout = 60000)
public void testReattachToDurableNode() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.createConnection());
connection.setContainerId(getContainerID());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
receiver.detach();
receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testLookupExistingSubscription() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.createConnection());
connection.setContainerId(getContainerID());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName());
receiver.detach();
receiver = session.lookupSubscription(getSubscriptionName());
assertNotNull(receiver);
Receiver protonReceiver = receiver.getReceiver();
assertNotNull(protonReceiver.getRemoteSource());
Source remoteSource = (Source) protonReceiver.getRemoteSource();
if (remoteSource.getFilter() != null) {
assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
}
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
assertEquals(COPY, remoteSource.getDistributionMode());
receiver.close();
try {
receiver = session.lookupSubscription(getSubscriptionName());
fail("Should not be able to lookup the subscription");
} catch (Exception e) {
}
connection.close();
}
@Test(timeout = 60000)
public void testLookupExistingSubscriptionWithSelector() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.createConnection());
connection.setContainerId(getContainerID());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), SELECTOR_STRING, false);
receiver.detach();
receiver = session.lookupSubscription(getSubscriptionName());
assertNotNull(receiver);
Receiver protonReceiver = receiver.getReceiver();
assertNotNull(protonReceiver.getRemoteSource());
Source remoteSource = (Source) protonReceiver.getRemoteSource();
assertNotNull(remoteSource.getFilter());
assertFalse(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
String selector = (String) ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed();
assertEquals(SELECTOR_STRING, selector);
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
assertEquals(COPY, remoteSource.getDistributionMode());
receiver.close();
try {
receiver = session.lookupSubscription(getSubscriptionName());
fail("Should not be able to lookup the subscription");
} catch (Exception e) {
}
connection.close();
}
@Test(timeout = 60000)
public void testLookupExistingSubscriptionWithNoLocal() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.createConnection());
connection.setContainerId(getContainerID());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), null, true);
receiver.detach();
receiver = session.lookupSubscription(getSubscriptionName());
assertNotNull(receiver);
Receiver protonReceiver = receiver.getReceiver();
assertNotNull(protonReceiver.getRemoteSource());
Source remoteSource = (Source) protonReceiver.getRemoteSource();
assertNotNull(remoteSource.getFilter());
assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
assertFalse(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
assertEquals(COPY, remoteSource.getDistributionMode());
receiver.close();
try {
receiver = session.lookupSubscription(getSubscriptionName());
fail("Should not be able to lookup the subscription");
} catch (Exception e) {
}
connection.close();
}
@Test(timeout = 60000)
public void testLookupExistingSubscriptionWithSelectorAndNoLocal() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.createConnection());
connection.setContainerId(getContainerID());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName(), SELECTOR_STRING, true);
receiver.detach();
receiver = session.lookupSubscription(getSubscriptionName());
assertNotNull(receiver);
Receiver protonReceiver = receiver.getReceiver();
assertNotNull(protonReceiver.getRemoteSource());
Source remoteSource = (Source) protonReceiver.getRemoteSource();
assertNotNull(remoteSource.getFilter());
assertTrue(remoteSource.getFilter().containsKey(NO_LOCAL_NAME));
assertTrue(remoteSource.getFilter().containsKey(JMS_SELECTOR_NAME));
String selector = (String) ((DescribedType) remoteSource.getFilter().get(JMS_SELECTOR_NAME)).getDescribed();
assertEquals(SELECTOR_STRING, selector);
assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy());
assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable());
assertEquals(COPY, remoteSource.getDistributionMode());
receiver.close();
try {
receiver = session.lookupSubscription(getSubscriptionName());
fail("Should not be able to lookup the subscription");
} catch (Exception e) {
}
connection.close();
}
@Test(timeout = 60000)
public void testLookupNonExistingSubscription() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.createConnection());
connection.setContainerId(getContainerID());
connection.connect();
AmqpSession session = connection.createSession();
try {
session.lookupSubscription(getSubscriptionName());
fail("Should throw an exception since there is not subscription");
} catch (Exception e) {
LOG.info("Error on lookup: {}", e.getMessage());
}
connection.close();
}
public String lookupSubscription() {
Binding binding = server.getPostOffice().getBinding(new SimpleString(getContainerID() + "." + getSubscriptionName()));
if (binding != null) {
return binding.getAddress().toString();
}
return null;
}
public String getContainerID() {
return "myContainerID";
}
public String getSubscriptionName() {
return "mySubscription";
}
public String getTopicName() {
return "jms.topic.myTopic";
}
}