Refactor credit handling and drain state tracking to ensure we stay in
sync with the remote state and always answer drain requests.  Start
adding some more tests around drain to the interop suite.
This commit is contained in:
Timothy Bish 2016-06-01 18:30:31 -04:00
parent 6ae169e275
commit 8448cf1cb8
17 changed files with 969 additions and 96 deletions

View File

@ -142,7 +142,7 @@ public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLi
}
/**
* Shorcut method to hand off an ActiveMQ Command to the broker and assign
* Shortcut method to hand off an ActiveMQ Command to the broker and assign
* a ResponseHandler to deal with any reply from the broker.
*
* @param command
@ -153,7 +153,7 @@ public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLi
}
/**
* Shorcut method to hand off an ActiveMQ Command to the broker and assign
* Shortcut method to hand off an ActiveMQ Command to the broker and assign
* a ResponseHandler to deal with any reply from the broker.
*
* @param command

View File

@ -172,6 +172,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
this.protonTransport.bind(this.protonConnection);
this.protonTransport.setChannelMax(CHANNEL_MAX);
this.protonTransport.setEmitFlowEventOnSend(false);
this.protonConnection.collect(eventCollector);

View File

@ -81,7 +81,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
private final ConsumerInfo consumerInfo;
private final boolean presettle;
private int currentCredit;
private boolean draining;
private long lastDeliveredSequenceId;
@ -101,7 +100,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
super(session, endpoint);
this.currentCredit = endpoint.getRemoteCredit();
this.consumerInfo = consumerInfo;
this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
}
@ -120,7 +118,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
if (!isClosed() && isOpened()) {
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
sendToActiveMQ(removeCommand);
session.unregisterSender(getConsumerId());
}
@ -133,7 +131,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
if (!isClosed() && isOpened()) {
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
sendToActiveMQ(removeCommand);
if (consumerInfo.isDurable()) {
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
@ -141,7 +139,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
rsi.setSubscriptionName(getEndpoint().getName());
rsi.setClientId(session.getConnection().getClientId());
sendToActiveMQ(rsi, null);
sendToActiveMQ(rsi);
}
session.unregisterSender(getConsumerId());
@ -152,17 +150,13 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
@Override
public void flow() throws Exception {
int updatedCredit = getEndpoint().getCredit();
if (LOG.isTraceEnabled()) {
LOG.trace("Flow: currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}",
currentCredit, draining, getEndpoint().getDrain(),
LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}",
draining, getEndpoint().getDrain(),
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
}
if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) {
currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
draining = true;
if (getEndpoint().getDrain() && !draining) {
// Revert to a pull consumer.
ConsumerControl control = new ConsumerControl();
@ -170,35 +164,42 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
control.setDestination(getDestination());
control.setPrefetch(0);
LOG.trace("Flow: Pull case -> consumer control with prefetch (0)");
LOG.trace("Flow: Pull case -> consumer control with prefetch (0) to control output");
sendToActiveMQ(control, null);
sendToActiveMQ(control);
// Now request dispatch of the drain amount, we request immediate
// timeout and an completion message regardless so that we can know
// when we should marked the link as drained.
MessagePull pullRequest = new MessagePull();
pullRequest.setConsumerId(getConsumerId());
pullRequest.setDestination(getDestination());
pullRequest.setTimeout(-1);
pullRequest.setAlwaysSignalDone(true);
pullRequest.setQuantity(currentCredit);
if (endpoint.getCredit() > 0) {
draining = true;
LOG.trace("Pull case -> consumer pull request quantity = {}", currentCredit);
// Now request dispatch of the drain amount, we request immediate
// timeout and an completion message regardless so that we can know
// when we should marked the link as drained.
MessagePull pullRequest = new MessagePull();
pullRequest.setConsumerId(getConsumerId());
pullRequest.setDestination(getDestination());
pullRequest.setTimeout(-1);
pullRequest.setAlwaysSignalDone(true);
pullRequest.setQuantity(endpoint.getCredit());
sendToActiveMQ(pullRequest, null);
} else if (updatedCredit != currentCredit) {
currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
LOG.trace("Pull case -> consumer pull request quantity = {}", endpoint.getCredit());
sendToActiveMQ(pullRequest);
} else {
LOG.trace("Pull case -> sending any Queued messages and marking drained");
pumpOutbound();
getEndpoint().drained();
session.pumpProtonToSocket();
}
} else {
ConsumerControl control = new ConsumerControl();
control.setConsumerId(getConsumerId());
control.setDestination(getDestination());
control.setPrefetch(currentCredit);
control.setPrefetch(getEndpoint().getCredit());
LOG.trace("Flow: update -> consumer control with prefetch (0)");
LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch());
sendToActiveMQ(control, null);
} else {
LOG.trace("Flow: no credit change -> no broker updates needed");
sendToActiveMQ(control);
}
}
@ -415,14 +416,29 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
// It's the end of browse signal in response to a MessagePull
getEndpoint().drained();
draining = false;
currentCredit = 0;
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Sender:[{}] msgId={} currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}",
getEndpoint().getName(), jms.getJMSMessageID(), currentCredit, draining, getEndpoint().getDrain(),
LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
getEndpoint().getName(), jms.getJMSMessageID(), draining, getEndpoint().getDrain(),
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
}
if (draining && getEndpoint().getCredit() == 0) {
LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
getEndpoint().drained();
draining = false;
} else {
LOG.trace("Sender:[{}] updating conumser prefetch:{} after dispatch.",
getEndpoint().getName(), getEndpoint().getCredit());
ConsumerControl control = new ConsumerControl();
control.setConsumerId(getConsumerId());
control.setDestination(getDestination());
control.setPrefetch(Math.max(0, getEndpoint().getCredit() - 1));
sendToActiveMQ(control);
}
jms.setRedeliveryCounter(md.getRedeliveryCounter());
jms.setReadOnlyBody(true);
final EncodedMessage amqp = outboundTransformer.transform(jms);

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -85,6 +86,45 @@ public class JMSClientTestSupport extends AmqpTestSupport {
return amqpURI;
}
protected URI getAmqpURI() {
return getAmqpURI("");
}
protected URI getAmqpURI(String uriOptions) {
boolean useSSL = getBrokerURI().getScheme().toLowerCase().contains("ssl");
String amqpURI = (useSSL ? "amqps://" : "amqp://") + getBrokerURI().getHost() + ":" + getBrokerURI().getPort();
if (uriOptions != null && !uriOptions.isEmpty()) {
if (uriOptions.startsWith("?") || uriOptions.startsWith("&")) {
uriOptions = uriOptions.substring(1);
}
} else {
uriOptions = "";
}
if (useSSL) {
amqpURI += "?transport.verifyHost=false";
}
if (!uriOptions.isEmpty()) {
if (useSSL) {
amqpURI += "&" + uriOptions;
} else {
amqpURI += "?" + uriOptions;
}
}
URI result = getBrokerURI();
try {
result = new URI(amqpURI);
} catch (URISyntaxException e) {
}
return result;
}
protected Connection createConnection() throws JMSException {
return createConnection(name.toString(), false);
}

View File

@ -20,9 +20,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@ -30,6 +35,7 @@ import javax.jms.TextMessage;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -186,13 +192,107 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
assertNotNull(subscription);
LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
assertTrue(subscription.getPrefetchSize() > 0);
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Trying to receive message: {}", i);
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull("Message " + i + "should be available", message);
assertNotNull("Message " + i + " should be available", message);
assertEquals("Should get message: " + i, i , message.getIntProperty("MessageSequence"));
}
session.commit();
}
@Test(timeout = 60000)
public void testQueueTXRollbackAndCommitAsyncConsumer() throws Exception {
final int MSG_COUNT = 3;
final AtomicInteger counter = new AtomicInteger();
connection = createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue destination = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
LOG.info("Received Message {}", message.getJMSMessageID());
} catch (JMSException e) {
}
counter.incrementAndGet();
}
});
int msgIndex = 0;
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Sending message: {} to rollback", msgIndex++);
TextMessage message = session.createTextMessage("Rolled back Message: " + msgIndex);
message.setIntProperty("MessageSequence", msgIndex);
producer.send(message);
}
LOG.info("ROLLBACK of sent message here:");
session.rollback();
assertEquals(0, getProxyToQueue(getDestinationName()).getQueueSize());
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Sending message: {} to commit", msgIndex++);
TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
message.setIntProperty("MessageSequence", msgIndex);
producer.send(message);
}
LOG.info("COMMIT of sent message here:");
session.commit();
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
assertNotNull(subscription);
LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
assertTrue(subscription.getPrefetchSize() > 0);
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return counter.get() == MSG_COUNT;
}
}));
LOG.info("COMMIT of first received batch here:");
session.commit();
assertTrue(subscription.getPrefetchSize() > 0);
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Sending message: {} to commit", msgIndex++);
TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
message.setIntProperty("MessageSequence", msgIndex);
producer.send(message);
}
LOG.info("COMMIT of next sent message batch here:");
session.commit();
LOG.info("WAITING -> for next three messages to arrive:");
assertTrue(subscription.getPrefetchSize() > 0);
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("Read {} messages so far", counter.get());
return counter.get() == MSG_COUNT * 2;
}
}));
}
}

View File

@ -19,13 +19,18 @@ package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Enumeration;
import java.util.concurrent.TimeUnit;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.junit.ActiveMQTestRunner;
@ -45,12 +50,12 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
@Test(timeout = 60000)
@Repeat(repetitions = 1)
@Repeat(repetitions = 5)
public void testBrowseAllInQueueZeroPrefetch() throws Exception {
final int MSG_COUNT = 5;
JmsConnectionFactory cf = new JmsConnectionFactory(getBrokerURI() + "?jms.prefetchPolicy.all=0");
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0"));
connection = cf.createConnection();
connection.start();
@ -78,6 +83,242 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
assertEquals(5, count);
}
@Test(timeout = 40000)
public void testCreateQueueBrowser() throws Exception {
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
session.createConsumer(queue).close();
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(0, proxy.getQueueSize());
}
@Test(timeout = 40000)
public void testNoMessagesBrowserHasNoElements() throws Exception {
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
session.createConsumer(queue).close();
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(0, proxy.getQueueSize());
Enumeration<?> enumeration = browser.getEnumeration();
assertFalse(enumeration.hasMoreElements());
}
@Test(timeout=30000)
public void testBroseOneInQueue() throws Exception {
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello"));
producer.close();
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
assertTrue(m instanceof TextMessage);
LOG.debug("Browsed message {} from Queue {}", m, queue);
}
browser.close();
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(5000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
}
@Test(timeout = 60000)
@Repeat(repetitions = 5)
public void testBrowseAllInQueue() throws Exception {
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
sendMessages(name.getMethodName(), 5, false);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(5, proxy.getQueueSize());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
TimeUnit.MILLISECONDS.sleep(50);
}
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
}
@Test(timeout = 60000)
@Repeat(repetitions = 5)
public void testBrowseAllInQueuePrefetchOne() throws Exception {
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=1"));
connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
sendMessages(name.getMethodName(), 5, false);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(5, proxy.getQueueSize());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
}
@Test(timeout = 40000)
public void testBrowseAllInQueueTxSession() throws Exception {
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
sendMessages(name.getMethodName(), 5, false);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(5, proxy.getQueueSize());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
}
@Test(timeout = 40000)
public void testQueueBrowserInTxSessionLeavesOtherWorkUnaffected() throws Exception {
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
sendMessages(name.getMethodName(), 5, false);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(5, proxy.getQueueSize());
// Send some TX work but don't commit.
MessageProducer txProducer = session.createProducer(queue);
for (int i = 0; i < 5; ++i) {
txProducer.send(session.createMessage());
}
assertEquals(5, proxy.getQueueSize());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
browser.close();
// Now check that all browser work did not affect the session transaction.
assertEquals(5, proxy.getQueueSize());
session.commit();
assertEquals(10, proxy.getQueueSize());
}
@Test(timeout = 60000)
public void testBrowseAllInQueueSmallPrefetch() throws Exception {
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=5"));
connection = cf.createConnection();
connection.start();
final int MSG_COUNT = 30;
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
sendMessages(name.getMethodName(), MSG_COUNT, false);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(MSG_COUNT, proxy.getQueueSize());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
assertFalse(enumeration.hasMoreElements());
assertEquals(MSG_COUNT, count);
}
@Override
protected boolean isUseOpenWireConnector() {
return true;

View File

@ -152,6 +152,19 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
connection.fireClientException(error);
}
@Override
public void locallyClosed(AmqpConnection connection, Exception error) {
if (endpoint != null) {
// TODO: if this is a producer/consumer link then we may only be detached,
// rather than fully closed, and should respond appropriately.
endpoint.close();
}
LOG.info("Resource {} was locally closed", this);
connection.fireClientException(error);
}
public E getEndpoint() {
return this.endpoint;
}

View File

@ -17,9 +17,6 @@
package org.apache.activemq.transport.amqp.client;
import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.net.URI;
@ -39,8 +36,10 @@ import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IdGenerator;
import org.apache.activemq.transport.amqp.client.util.NoOpAsyncResult;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Collector;
@ -54,10 +53,16 @@ import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1;
// NOTE: Limit default channel max to signed short range to deal with
// brokers that don't currently handle the unsigned range well.
@ -66,6 +71,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
public static final long DEFAULT_CONNECT_TIMEOUT = 515000;
public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
public static final long DEFAULT_DRAIN_TIMEOUT = 60000;
private final ScheduledExecutorService serializer;
private final AtomicBoolean closed = new AtomicBoolean();
@ -95,6 +101,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private int channelMax = DEFAULT_CHANNEL_MAX;
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
public AmqpConnection(NettyTransport transport, String username, String password) {
setEndpoint(Connection.Factory.create());
@ -150,7 +157,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
open(future);
pumpToProtonTransport();
pumpToProtonTransport(future);
}
});
@ -190,7 +197,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
request.onSuccess();
}
pumpToProtonTransport();
pumpToProtonTransport(request);
} catch (Exception e) {
LOG.debug("Caught exception while closing proton connection");
}
@ -241,7 +248,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
session.setEndpoint(getEndpoint().session());
session.setStateInspector(getStateInspector());
session.open(request);
pumpToProtonTransport();
pumpToProtonTransport(request);
}
});
@ -355,6 +362,14 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
this.closeTimeout = closeTimeout;
}
public long getDrainTimeout() {
return drainTimeout;
}
public void setDrainTimeout(long drainTimeout) {
this.drainTimeout = drainTimeout;
}
public List<Symbol> getOfferedCapabilities() {
return offeredCapabilities;
}
@ -439,6 +454,10 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
}
void pumpToProtonTransport() {
pumpToProtonTransport(NOOP_REQUEST);
}
void pumpToProtonTransport(AsyncResult request) {
try {
boolean done = false;
while (!done) {
@ -454,6 +473,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
}
} catch (IOException e) {
fireClientException(e);
request.onFailure(e);
}
}

View File

@ -26,14 +26,17 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidDestinationException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
@ -74,6 +77,9 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
private boolean presettle;
private boolean noLocal;
private AsyncResult pullRequest;
private AsyncResult stopRequest;
/**
* Create a new receiver instance.
*
@ -133,7 +139,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
public void run() {
checkClosed();
close(request);
session.pumpToProtonTransport();
session.pumpToProtonTransport(request);
}
});
@ -156,7 +162,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
public void run() {
checkClosed();
detach(request);
session.pumpToProtonTransport();
session.pumpToProtonTransport(request);
}
});
@ -222,6 +228,108 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
return prefetch.poll();
}
/**
* Request a remote peer send a Message to this client waiting until one arrives.
*
* @return the pulled AmqpMessage or null if none was pulled from the remote.
*
* @throws IOException if an error occurs
*/
public AmqpMessage pull() throws IOException {
return pull(-1, TimeUnit.MILLISECONDS);
}
/**
* Request a remote peer send a Message to this client using an immediate drain request.
*
* @return the pulled AmqpMessage or null if none was pulled from the remote.
*
* @throws IOException if an error occurs
*/
public AmqpMessage pullImmediate() throws IOException {
return pull(0, TimeUnit.MILLISECONDS);
}
/**
* Request a remote peer send a Message to this client.
*
* {@literal timeout < 0} then it should remain open until a message is received.
* {@literal timeout = 0} then it returns a message or null if none available
* {@literal timeout > 0} then it should remain open for timeout amount of time.
*
* The timeout value when positive is given in milliseconds.
*
* @param timeout
* the amount of time to tell the remote peer to keep this pull request valid.
* @param unit
* the unit of measure that the timeout represents.
*
* @return the pulled AmqpMessage or null if none was pulled from the remote.
*
* @throws IOException if an error occurs
*/
public AmqpMessage pull(final long timeout, final TimeUnit unit) throws IOException {
checkClosed();
final ClientFuture request = new ClientFuture();
session.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
long timeoutMills = unit.toMillis(timeout);
try {
LOG.trace("Pull on Receiver {} with timeout = {}", getSubscriptionName(), timeoutMills);
if (timeoutMills < 0) {
// Wait until message arrives. Just give credit if needed.
if (getEndpoint().getCredit() == 0) {
LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
getEndpoint().flow(1);
}
// Await the message arrival
pullRequest = request;
} else if (timeoutMills == 0) {
// If we have no credit then we need to issue some so that we can
// try to fulfill the request, then drain down what is there to
// ensure we consume what is available and remove all credit.
if (getEndpoint().getCredit() == 0){
LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
getEndpoint().flow(1);
}
// Drain immediately and wait for the message(s) to arrive,
// or a flow indicating removal of the remaining credit.
stop(request);
} else if (timeoutMills > 0) {
// If we have no credit then we need to issue some so that we can
// try to fulfill the request, then drain down what is there to
// ensure we consume what is available and remove all credit.
if (getEndpoint().getCredit() == 0) {
LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
getEndpoint().flow(1);
}
// Wait for the timeout for the message(s) to arrive, then drain if required
// and wait for remaining message(s) to arrive or a flow indicating
// removal of the remaining credit.
stopOnSchedule(timeoutMills, request);
}
session.pumpToProtonTransport(request);
} catch (Exception e) {
request.onFailure(e);
}
}
});
request.sync();
return prefetch.poll();
}
/**
* Controls the amount of credit given to the receiver link.
*
@ -240,7 +348,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
checkClosed();
try {
getEndpoint().flow(credit);
session.pumpToProtonTransport();
session.pumpToProtonTransport(request);
request.onSuccess();
} catch (Exception e) {
request.onFailure(e);
@ -269,7 +377,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
checkClosed();
try {
getEndpoint().drain(credit);
session.pumpToProtonTransport();
session.pumpToProtonTransport(request);
request.onSuccess();
} catch (Exception e) {
request.onFailure(e);
@ -280,6 +388,31 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
request.sync();
}
/**
* Stops the receiver, using all link credit and waiting for in-flight messages to arrive.
*
* @throws IOException if an error occurs while sending the drain.
*/
public void stop() throws IOException {
checkClosed();
final ClientFuture request = new ClientFuture();
session.getScheduler().execute(new Runnable() {
@Override
public void run() {
checkClosed();
try {
stop(request);
session.pumpToProtonTransport(request);
} catch (Exception e) {
request.onFailure(e);
}
}
});
request.sync();
}
/**
* Accepts a message that was dispatched under the given Delivery instance.
*
@ -318,7 +451,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
delivery.settle();
}
}
session.pumpToProtonTransport();
session.pumpToProtonTransport(request);
request.onSuccess();
} catch (Exception e) {
request.onFailure(e);
@ -360,7 +493,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
disposition.setDeliveryFailed(deliveryFailed);
delivery.disposition(disposition);
delivery.settle();
session.pumpToProtonTransport();
session.pumpToProtonTransport(request);
}
request.onSuccess();
} catch (Exception e) {
@ -397,7 +530,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
if (!delivery.isSettled()) {
delivery.disposition(Released.getInstance());
delivery.settle();
session.pumpToProtonTransport();
session.pumpToProtonTransport(request);
}
request.onSuccess();
} catch (Exception e) {
@ -454,6 +587,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
this.noLocal = noLocal;
}
public long getDrainTimeout() {
return session.getConnection().getDrainTimeout();
}
//----- Internal implementation ------------------------------------------//
@Override
@ -604,6 +741,15 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
LOG.trace("{} has a partial incoming Message(s), deferring.", this);
incoming = null;
}
} else {
// We have exhausted the locally queued messages on this link.
// Check if we tried to stop and have now run out of credit.
if (getEndpoint().getRemoteCredit() <= 0) {
if (stopRequest != null) {
stopRequest.onSuccess();
stopRequest = null;
}
}
}
} while (incoming != null);
@ -624,6 +770,35 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
// Store reference to envelope in delivery context for recovery
incoming.setContext(amqpMessage);
prefetch.add(amqpMessage);
// We processed a message, signal completion
// of a message pull request if there is one.
if (pullRequest != null) {
pullRequest.onSuccess();
pullRequest = null;
}
}
@Override
public void processFlowUpdates(AmqpConnection connection) throws IOException {
if (pullRequest != null || stopRequest != null) {
Receiver receiver = getEndpoint();
if (receiver.getRemoteCredit() <= 0 && receiver.getQueued() == 0) {
if (pullRequest != null) {
pullRequest.onSuccess();
pullRequest = null;
}
if (stopRequest != null) {
stopRequest.onSuccess();
stopRequest = null;
}
}
}
LOG.trace("Consumer {} flow updated, remote credit = {}", getSubscriptionName(), getEndpoint().getRemoteCredit());
super.processFlowUpdates(connection);
}
protected Message decodeIncomingMessage(Delivery incoming) {
@ -661,6 +836,61 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
}
}
private void stop(final AsyncResult request) {
Receiver receiver = getEndpoint();
if (receiver.getRemoteCredit() <= 0) {
if (receiver.getQueued() == 0) {
// We have no remote credit and all the deliveries have been processed.
request.onSuccess();
} else {
// There are still deliveries to process, wait for them to be.
stopRequest = request;
}
} else {
// TODO: We don't actually want the additional messages that could be sent while
// draining. We could explicitly reduce credit first, or possibly use 'echo' instead
// of drain if it was supported. We would first need to understand what happens
// if we reduce credit below the number of messages already in-flight before
// the peer sees the update.
stopRequest = request;
receiver.drain(0);
if (getDrainTimeout() > 0) {
// If the remote doesn't respond we will close the consumer and break any
// blocked receive or stop calls that are waiting.
final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() {
@Override
public void run() {
LOG.trace("Consumer {} drain request timed out", this);
Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
locallyClosed(session.getConnection(), cause);
stopRequest.onFailure(cause);
session.pumpToProtonTransport(stopRequest);
}
}, getDrainTimeout(), TimeUnit.MILLISECONDS);
stopRequest = new ScheduledRequest(future, stopRequest);
}
}
}
private void stopOnSchedule(long timeout, final AsyncResult request) {
LOG.trace("Receiver {} scheduling stop", this);
// We need to drain the credit if no message(s) arrive to use it.
final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() {
@Override
public void run() {
LOG.trace("Receiver {} running scheduled stop", this);
if (getEndpoint().getRemoteCredit() != 0) {
stop(request);
session.pumpToProtonTransport(request);
}
}
}, timeout, TimeUnit.MILLISECONDS);
stopRequest = new ScheduledRequest(future, request);
}
@Override
public String toString() {
return getClass().getSimpleName() + "{ address = " + address + "}";
@ -685,4 +915,37 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
void postRollback() {
}
//----- Inner classes used in message pull operations --------------------//
protected static final class ScheduledRequest implements AsyncResult {
private final ScheduledFuture<?> sheduledTask;
private final AsyncResult origRequest;
public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
this.sheduledTask = completionTask;
this.origRequest = origRequest;
}
@Override
public void onFailure(Throwable cause) {
sheduledTask.cancel(false);
origRequest.onFailure(cause);
}
@Override
public void onSuccess() {
boolean cancelled = sheduledTask.cancel(false);
if (cancelled) {
// Signal completion. Otherwise wait for the scheduled task to do it.
origRequest.onSuccess();
}
}
@Override
public boolean isComplete() {
return origRequest.isComplete();
}
}
}

View File

@ -91,6 +91,18 @@ public interface AmqpResource extends AmqpEventSink {
*/
void remotelyClosed(AmqpConnection connection);
/**
* Called to indicate that the local end has become closed but the resource
* was not awaiting a close. This could happen during an open request where
* the remote does not set an error condition or during normal operation.
*
* @param connection
* The connection that owns this resource.
* @param error
* The error that triggered the local close of this resource.
*/
void locallyClosed(AmqpConnection connection, Exception error);
/**
* Sets the failed state for this Resource and triggers a failure signal for
* any pending ProduverRequest.

View File

@ -135,7 +135,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
public void run() {
try {
doSend(message, sendRequest);
session.pumpToProtonTransport();
session.pumpToProtonTransport(sendRequest);
} catch (Exception e) {
sendRequest.onFailure(e);
session.getConnection().fireClientException(e);
@ -165,7 +165,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
public void run() {
checkClosed();
close(request);
session.pumpToProtonTransport();
session.pumpToProtonTransport(request);
}
});

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.client;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
import org.apache.qpid.proton.amqp.messaging.Source;
@ -92,7 +93,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
checkClosed();
sender.setStateInspector(getStateInspector());
sender.open(request);
pumpToProtonTransport();
pumpToProtonTransport(request);
}
});
@ -124,7 +125,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
checkClosed();
sender.setStateInspector(getStateInspector());
sender.open(request);
pumpToProtonTransport();
pumpToProtonTransport(request);
}
});
@ -195,7 +196,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
checkClosed();
receiver.setStateInspector(getStateInspector());
receiver.open(request);
pumpToProtonTransport();
pumpToProtonTransport(request);
}
});
@ -227,7 +228,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
checkClosed();
receiver.setStateInspector(getStateInspector());
receiver.open(request);
pumpToProtonTransport();
pumpToProtonTransport(request);
}
});
@ -308,7 +309,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
checkClosed();
receiver.setStateInspector(getStateInspector());
receiver.open(request);
pumpToProtonTransport();
pumpToProtonTransport(request);
}
});
@ -345,7 +346,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
checkClosed();
receiver.setStateInspector(getStateInspector());
receiver.open(request);
pumpToProtonTransport();
pumpToProtonTransport(request);
}
});
@ -427,8 +428,8 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return connection.getProtonConnection();
}
void pumpToProtonTransport() {
connection.pumpToProtonTransport();
void pumpToProtonTransport(AsyncResult request) {
connection.pumpToProtonTransport(request);
}
AmqpTransactionId getTransactionId() {

View File

@ -111,7 +111,7 @@ public class AmqpTransactionContext {
}
}
session.pumpToProtonTransport();
session.pumpToProtonTransport(request);
}
});
@ -153,7 +153,7 @@ public class AmqpTransactionContext {
try {
LOG.info("Attempting to commit TX:[{}]", transactionId);
coordinator.discharge(transactionId, request, true);
session.pumpToProtonTransport();
session.pumpToProtonTransport(request);
} catch (Exception e) {
request.onFailure(e);
}
@ -198,7 +198,7 @@ public class AmqpTransactionContext {
try {
LOG.info("Attempting to roll back TX:[{}]", transactionId);
coordinator.discharge(transactionId, request, false);
session.pumpToProtonTransport();
session.pumpToProtonTransport(request);
} catch (Exception e) {
request.onFailure(e);
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
/**
* Simple NoOp implementation used when the result of the operation does not matter.
*/
public class NoOpAsyncResult implements AsyncResult {
public final static NoOpAsyncResult INSTANCE = new NoOpAsyncResult();
@Override
public void onFailure(Throwable result) {
}
@Override
public void onSuccess() {
}
@Override
public boolean isComplete() {
return true;
}
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
/**
* Tests various behaviors of broker side drain support.
*/
public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testReceiverCanDrainMessages() throws Exception {
int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
QueueViewMBean queueView = getProxyToQueue(getTestName());
assertEquals(MSG_COUNT, queueView.getQueueSize());
assertEquals(0, queueView.getDispatchCount());
receiver.drain(MSG_COUNT);
for (int i = 0; i < MSG_COUNT; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
}
receiver.close();
assertEquals(0, queueView.getQueueSize());
connection.close();
}
@Test(timeout = 60000)
public void testPullWithNoMessageGetDrained() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(10);
QueueViewMBean queueView = getProxyToQueue(getTestName());
assertEquals(0, queueView.getQueueSize());
assertEquals(0, queueView.getDispatchCount());
assertEquals(10, receiver.getReceiver().getRemoteCredit());
assertNull(receiver.pull(1, TimeUnit.SECONDS));
assertEquals(0, receiver.getReceiver().getRemoteCredit());
connection.close();
}
@Test(timeout = 60000)
public void testPullOneFromRemote() throws Exception {
int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
QueueViewMBean queueView = getProxyToQueue(getTestName());
assertEquals(MSG_COUNT, queueView.getQueueSize());
assertEquals(0, queueView.getDispatchCount());
assertEquals(0, receiver.getReceiver().getRemoteCredit());
AmqpMessage message = receiver.pull(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
assertEquals(0, receiver.getReceiver().getRemoteCredit());
receiver.close();
assertEquals(MSG_COUNT - 1, queueView.getQueueSize());
assertEquals(1, queueView.getDispatchCount());
connection.close();
}
@Test(timeout = 60000)
public void testMultipleZeroResultPulls() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(10);
QueueViewMBean queueView = getProxyToQueue(getTestName());
assertEquals(0, queueView.getQueueSize());
assertEquals(0, queueView.getDispatchCount());
assertEquals(10, receiver.getReceiver().getRemoteCredit());
assertNull(receiver.pull(1, TimeUnit.SECONDS));
assertEquals(0, receiver.getReceiver().getRemoteCredit());
assertNull(receiver.pull(1, TimeUnit.SECONDS));
assertNull(receiver.pull(1, TimeUnit.SECONDS));
assertEquals(0, receiver.getReceiver().getRemoteCredit());
connection.close();
}
@Override
protected boolean isUseOpenWireConnector() {
return true;
}
}

View File

@ -341,34 +341,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testReceiverCanDrainMessages() throws Exception {
int MSG_COUNT = 20;
sendMessages(getTestName(), MSG_COUNT, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
QueueViewMBean queueView = getProxyToQueue(getTestName());
assertEquals(MSG_COUNT, queueView.getQueueSize());
assertEquals(0, queueView.getDispatchCount());
receiver.drain(MSG_COUNT);
for (int i = 0; i < MSG_COUNT; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
message.accept();
}
receiver.close();
assertEquals(0, queueView.getQueueSize());
connection.close();
}
@Test(timeout = 60000)
public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
AmqpClient client = createAmqpClient();

View File

@ -20,7 +20,7 @@
#
log4j.rootLogger=WARN, console, file
log4j.logger.org.apache.activemq=INFO
log4j.logger.org.apache.activemq.transport.amqp=DEBUG
log4j.logger.org.apache.activemq.transport.amqp=INFO
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
log4j.logger.org.fusesource=INFO
@ -30,7 +30,7 @@ log4j.logger.org.apache.qpid.jms.provider=INFO
log4j.logger.org.apache.qpid.jms.provider.amqp=INFO
log4j.logger.org.apache.qpid.jms.provider.amqp.FRAMES=INFO
# Console will only display warnnings
# Console will only display warnings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%-15.15t] - %-5p %-25.30c{1} - %m%n