Add some initial tests for durable subscription handling.
This commit is contained in:
Timothy Bish 2015-03-16 17:51:45 -04:00
parent 559f52a2aa
commit 934ad44add
6 changed files with 268 additions and 19 deletions

View File

@ -42,6 +42,7 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.spring.SpringSslContext; import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.store.kahadb.KahaDBStore;
@ -320,10 +321,10 @@ public class AmqpTestSupport {
return proxy; return proxy;
} }
protected QueueViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException { protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name); ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext() TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); .newProxyInstance(queueViewMBeanName, TopicViewMBean.class, true);
return proxy; return proxy;
} }
} }

View File

@ -65,27 +65,59 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
} }
} }
@Override
public void detach(AsyncResult request) {
// If already closed signal success or else the caller might never get notified.
if (getEndpoint().getLocalState() == EndpointState.CLOSED ||
getEndpoint().getRemoteState() == EndpointState.CLOSED) {
if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
doDetach();
getEndpoint().free();
}
request.onSuccess();
} else {
this.closeRequest = request;
doDetach();
}
}
@Override @Override
public void close(AsyncResult request) { public void close(AsyncResult request) {
// If already closed signal success or else the caller might never get notified. // If already closed signal success or else the caller might never get notified.
if (getEndpoint().getLocalState() == EndpointState.CLOSED || if (getEndpoint().getLocalState() == EndpointState.CLOSED ||
getEndpoint().getRemoteState() == EndpointState.CLOSED) { getEndpoint().getRemoteState() == EndpointState.CLOSED) {
if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
// Remote already closed this resource, close locally and free.
if (getEndpoint().getLocalState() != EndpointState.CLOSED) { if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
doClose(); doClose();
getEndpoint().free(); getEndpoint().free();
} }
}
request.onSuccess(); request.onSuccess();
return; } else {
}
this.closeRequest = request; this.closeRequest = request;
doClose(); doClose();
} }
// // If already closed signal success or else the caller might never get notified.
// if (getEndpoint().getLocalState() == EndpointState.CLOSED ||
// getEndpoint().getRemoteState() == EndpointState.CLOSED) {
//
// if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
// // Remote already closed this resource, close locally and free.
// if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
// doClose();
// getEndpoint().free();
// }
// }
//
// request.onSuccess();
// return;
// }
//
// this.closeRequest = request;
// doClose();
}
@Override @Override
public boolean isClosed() { public boolean isClosed() {
@ -277,6 +309,16 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
getEndpoint().close(); getEndpoint().close();
} }
/**
* Perform the detach operation on the managed endpoint.
*
* By default this method throws an UnsupportedOperationException, a subclass
* must implement this and do a detach if its resource supports that.
*/
protected void doDetach() {
throw new UnsupportedOperationException("Endpoint cannot be detached.");
}
/** /**
* Complete the open operation on the managed endpoint. A subclass may * Complete the open operation on the managed endpoint. A subclass may
* override this method to provide additional verification actions or configuration * override this method to provide additional verification actions or configuration

View File

@ -94,10 +94,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
} }
/** /**
* Close the sender, a closed sender will throw exceptions if any further send * Close the receiver, a closed receiver will throw exceptions if any further send
* calls are made. * calls are made.
* *
* @throws IOException if an error occurs while closing the sender. * @throws IOException if an error occurs while closing the receiver.
*/ */
public void close() throws IOException { public void close() throws IOException {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
@ -116,6 +116,29 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
} }
} }
/**
* Detach the receiver, a closed receiver will throw exceptions if any further send
* calls are made.
*
* @throws IOException if an error occurs while closing the receiver.
*/
public void detach() throws IOException {
if (closed.compareAndSet(false, true)) {
final ClientFuture request = new ClientFuture();
session.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
detach(request);
session.pumpToProtonTransport();
}
});
request.sync();
}
}
/** /**
* @return this session's parent AmqpSession. * @return this session's parent AmqpSession.
*/ */
@ -442,11 +465,12 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
@Override @Override
protected void doClose() { protected void doClose() {
if (isDurable()) {
getEndpoint().detach();
} else {
getEndpoint().close(); getEndpoint().close();
} }
@Override
protected void doDetach() {
getEndpoint().detach();
} }
@Override @Override

View File

@ -57,6 +57,15 @@ public interface AmqpResource {
*/ */
void close(AsyncResult request); void close(AsyncResult request);
/**
* Perform all work needed to detach this resource and store the request
* until such time as the remote peer indicates the resource has been detached.
*
* @param request
* The initiating request that triggered this detach call.
*/
void detach(AsyncResult request);
/** /**
* @return if the resource has moved to the closed state on the remote. * @return if the resource has moved to the closed state on the remote.
*/ */

View File

@ -110,6 +110,40 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return receiver; return receiver;
} }
/**
* Create a receiver instance using the given address that creates a durable subscription.
*
* @param address
* the address to which the receiver will subscribe for its messages.
* @param subscriptionName
* the name of the subscription that is being created.
*
* @return a newly created receiver that is ready for use.
*
* @throws Exception if an error occurs while creating the receiver.
*/
public AmqpReceiver createDurableReceiver(String address, String subscriptionName) throws Exception {
checkClosed();
final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId());
receiver.setSubscriptionName(subscriptionName);
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
receiver.open(request);
pumpToProtonTransport();
}
});
request.sync();
return receiver;
}
/** /**
* @return this session's parent AmqpConnection. * @return this session's parent AmqpConnection.
*/ */

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.util.Wait;
import org.junit.Test;
/**
* Tests for broker side support of the Durable Subscription mapping for JMS.
*/
public class AmqpDurableReceiverTest extends AmqpClientTestSupport {
@Override
protected boolean isUseOpenWireConnector() {
return true;
}
@Test(timeout = 60000)
public void testCreateDurableReceiver() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
connection.setContainerId(getTestName());
connection.connect();
AmqpSession session = connection.createSession();
session.createDurableReceiver("topic://" + getTestName(), getTestName());
final BrokerViewMBean brokerView = getProxyToBroker();
assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerView.getDurableTopicSubscribers().length == 1;
}
}, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10)));
connection.close();
}
@Test(timeout = 60000)
public void testDetachedDurableReceiverRemainsActive() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
connection.setContainerId(getTestName());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
final BrokerViewMBean brokerView = getProxyToBroker();
assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerView.getDurableTopicSubscribers().length == 1;
}
}, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10)));
receiver.detach();
assertTrue("Should be an inactive durable sub", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerView.getInactiveDurableTopicSubscribers().length == 1;
}
}, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10)));
connection.close();
}
@Test(timeout = 60000)
public void testCloseDurableReceiverRemovesSubscription() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.createConnection();
connection.setContainerId(getTestName());
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName());
final BrokerViewMBean brokerView = getProxyToBroker();
assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerView.getDurableTopicSubscribers().length == 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10)));
receiver.close();
assertTrue("Should be an inactive durable sub", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokerView.getDurableTopicSubscribers().length == 0 &&
brokerView.getInactiveDurableTopicSubscribers().length == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10)));
connection.close();
}
}