mirror of https://github.com/apache/activemq.git
Refactor credit handling and drain state tracking to ensure we stay in
sync with the remote state and always answer drain requests. Start
adding some more tests around drain to the interop suite.
(cherry picked from commit 8448cf1cb8
)
This commit is contained in:
parent
508c12d948
commit
8916beea42
|
@ -142,7 +142,7 @@ public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLi
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shorcut method to hand off an ActiveMQ Command to the broker and assign
|
* Shortcut method to hand off an ActiveMQ Command to the broker and assign
|
||||||
* a ResponseHandler to deal with any reply from the broker.
|
* a ResponseHandler to deal with any reply from the broker.
|
||||||
*
|
*
|
||||||
* @param command
|
* @param command
|
||||||
|
@ -153,7 +153,7 @@ public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLi
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Shorcut method to hand off an ActiveMQ Command to the broker and assign
|
* Shortcut method to hand off an ActiveMQ Command to the broker and assign
|
||||||
* a ResponseHandler to deal with any reply from the broker.
|
* a ResponseHandler to deal with any reply from the broker.
|
||||||
*
|
*
|
||||||
* @param command
|
* @param command
|
||||||
|
|
|
@ -173,6 +173,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
|
||||||
|
|
||||||
this.protonTransport.bind(this.protonConnection);
|
this.protonTransport.bind(this.protonConnection);
|
||||||
this.protonTransport.setChannelMax(CHANNEL_MAX);
|
this.protonTransport.setChannelMax(CHANNEL_MAX);
|
||||||
|
this.protonTransport.setEmitFlowEventOnSend(false);
|
||||||
|
|
||||||
this.protonConnection.collect(eventCollector);
|
this.protonConnection.collect(eventCollector);
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
private final ConsumerInfo consumerInfo;
|
private final ConsumerInfo consumerInfo;
|
||||||
private final boolean presettle;
|
private final boolean presettle;
|
||||||
|
|
||||||
private int currentCredit;
|
|
||||||
private boolean draining;
|
private boolean draining;
|
||||||
private long lastDeliveredSequenceId;
|
private long lastDeliveredSequenceId;
|
||||||
|
|
||||||
|
@ -101,7 +100,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
|
public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
|
||||||
super(session, endpoint);
|
super(session, endpoint);
|
||||||
|
|
||||||
this.currentCredit = endpoint.getRemoteCredit();
|
|
||||||
this.consumerInfo = consumerInfo;
|
this.consumerInfo = consumerInfo;
|
||||||
this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
|
this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
|
||||||
}
|
}
|
||||||
|
@ -120,7 +118,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
if (!isClosed() && isOpened()) {
|
if (!isClosed() && isOpened()) {
|
||||||
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
|
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
|
||||||
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
|
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
|
||||||
sendToActiveMQ(removeCommand, null);
|
sendToActiveMQ(removeCommand);
|
||||||
|
|
||||||
session.unregisterSender(getConsumerId());
|
session.unregisterSender(getConsumerId());
|
||||||
}
|
}
|
||||||
|
@ -133,7 +131,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
if (!isClosed() && isOpened()) {
|
if (!isClosed() && isOpened()) {
|
||||||
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
|
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
|
||||||
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
|
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
|
||||||
sendToActiveMQ(removeCommand, null);
|
sendToActiveMQ(removeCommand);
|
||||||
|
|
||||||
if (consumerInfo.isDurable()) {
|
if (consumerInfo.isDurable()) {
|
||||||
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
|
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
|
||||||
|
@ -141,7 +139,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
rsi.setSubscriptionName(getEndpoint().getName());
|
rsi.setSubscriptionName(getEndpoint().getName());
|
||||||
rsi.setClientId(session.getConnection().getClientId());
|
rsi.setClientId(session.getConnection().getClientId());
|
||||||
|
|
||||||
sendToActiveMQ(rsi, null);
|
sendToActiveMQ(rsi);
|
||||||
}
|
}
|
||||||
|
|
||||||
session.unregisterSender(getConsumerId());
|
session.unregisterSender(getConsumerId());
|
||||||
|
@ -152,17 +150,13 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flow() throws Exception {
|
public void flow() throws Exception {
|
||||||
int updatedCredit = getEndpoint().getCredit();
|
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Flow: currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}",
|
LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}",
|
||||||
currentCredit, draining, getEndpoint().getDrain(),
|
draining, getEndpoint().getDrain(),
|
||||||
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
|
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) {
|
if (getEndpoint().getDrain() && !draining) {
|
||||||
currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
|
|
||||||
draining = true;
|
|
||||||
|
|
||||||
// Revert to a pull consumer.
|
// Revert to a pull consumer.
|
||||||
ConsumerControl control = new ConsumerControl();
|
ConsumerControl control = new ConsumerControl();
|
||||||
|
@ -170,9 +164,12 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
control.setDestination(getDestination());
|
control.setDestination(getDestination());
|
||||||
control.setPrefetch(0);
|
control.setPrefetch(0);
|
||||||
|
|
||||||
LOG.trace("Flow: Pull case -> consumer control with prefetch (0)");
|
LOG.trace("Flow: Pull case -> consumer control with prefetch (0) to control output");
|
||||||
|
|
||||||
sendToActiveMQ(control, null);
|
sendToActiveMQ(control);
|
||||||
|
|
||||||
|
if (endpoint.getCredit() > 0) {
|
||||||
|
draining = true;
|
||||||
|
|
||||||
// Now request dispatch of the drain amount, we request immediate
|
// Now request dispatch of the drain amount, we request immediate
|
||||||
// timeout and an completion message regardless so that we can know
|
// timeout and an completion message regardless so that we can know
|
||||||
|
@ -182,23 +179,27 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
pullRequest.setDestination(getDestination());
|
pullRequest.setDestination(getDestination());
|
||||||
pullRequest.setTimeout(-1);
|
pullRequest.setTimeout(-1);
|
||||||
pullRequest.setAlwaysSignalDone(true);
|
pullRequest.setAlwaysSignalDone(true);
|
||||||
pullRequest.setQuantity(currentCredit);
|
pullRequest.setQuantity(endpoint.getCredit());
|
||||||
|
|
||||||
LOG.trace("Pull case -> consumer pull request quantity = {}", currentCredit);
|
LOG.trace("Pull case -> consumer pull request quantity = {}", endpoint.getCredit());
|
||||||
|
|
||||||
sendToActiveMQ(pullRequest, null);
|
sendToActiveMQ(pullRequest);
|
||||||
} else if (updatedCredit != currentCredit) {
|
} else {
|
||||||
currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
|
LOG.trace("Pull case -> sending any Queued messages and marking drained");
|
||||||
|
|
||||||
|
pumpOutbound();
|
||||||
|
getEndpoint().drained();
|
||||||
|
session.pumpProtonToSocket();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
ConsumerControl control = new ConsumerControl();
|
ConsumerControl control = new ConsumerControl();
|
||||||
control.setConsumerId(getConsumerId());
|
control.setConsumerId(getConsumerId());
|
||||||
control.setDestination(getDestination());
|
control.setDestination(getDestination());
|
||||||
control.setPrefetch(currentCredit);
|
control.setPrefetch(getEndpoint().getCredit());
|
||||||
|
|
||||||
LOG.trace("Flow: update -> consumer control with prefetch (0)");
|
LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch());
|
||||||
|
|
||||||
sendToActiveMQ(control, null);
|
sendToActiveMQ(control);
|
||||||
} else {
|
|
||||||
LOG.trace("Flow: no credit change -> no broker updates needed");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,14 +416,29 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
// It's the end of browse signal in response to a MessagePull
|
// It's the end of browse signal in response to a MessagePull
|
||||||
getEndpoint().drained();
|
getEndpoint().drained();
|
||||||
draining = false;
|
draining = false;
|
||||||
currentCredit = 0;
|
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Sender:[{}] msgId={} currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}",
|
LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
|
||||||
getEndpoint().getName(), jms.getJMSMessageID(), currentCredit, draining, getEndpoint().getDrain(),
|
getEndpoint().getName(), jms.getJMSMessageID(), draining, getEndpoint().getDrain(),
|
||||||
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
|
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (draining && getEndpoint().getCredit() == 0) {
|
||||||
|
LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
|
||||||
|
getEndpoint().drained();
|
||||||
|
draining = false;
|
||||||
|
} else {
|
||||||
|
LOG.trace("Sender:[{}] updating conumser prefetch:{} after dispatch.",
|
||||||
|
getEndpoint().getName(), getEndpoint().getCredit());
|
||||||
|
|
||||||
|
ConsumerControl control = new ConsumerControl();
|
||||||
|
control.setConsumerId(getConsumerId());
|
||||||
|
control.setDestination(getDestination());
|
||||||
|
control.setPrefetch(Math.max(0, getEndpoint().getCredit() - 1));
|
||||||
|
|
||||||
|
sendToActiveMQ(control);
|
||||||
|
}
|
||||||
|
|
||||||
jms.setRedeliveryCounter(md.getRedeliveryCounter());
|
jms.setRedeliveryCounter(md.getRedeliveryCounter());
|
||||||
jms.setReadOnlyBody(true);
|
jms.setReadOnlyBody(true);
|
||||||
final EncodedMessage amqp = outboundTransformer.transform(jms);
|
final EncodedMessage amqp = outboundTransformer.transform(jms);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.transport.amqp;
|
package org.apache.activemq.transport.amqp;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
@ -85,6 +86,45 @@ public class JMSClientTestSupport extends AmqpTestSupport {
|
||||||
return amqpURI;
|
return amqpURI;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected URI getAmqpURI() {
|
||||||
|
return getAmqpURI("");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected URI getAmqpURI(String uriOptions) {
|
||||||
|
|
||||||
|
boolean useSSL = getBrokerURI().getScheme().toLowerCase().contains("ssl");
|
||||||
|
|
||||||
|
String amqpURI = (useSSL ? "amqps://" : "amqp://") + getBrokerURI().getHost() + ":" + getBrokerURI().getPort();
|
||||||
|
|
||||||
|
if (uriOptions != null && !uriOptions.isEmpty()) {
|
||||||
|
if (uriOptions.startsWith("?") || uriOptions.startsWith("&")) {
|
||||||
|
uriOptions = uriOptions.substring(1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
uriOptions = "";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (useSSL) {
|
||||||
|
amqpURI += "?transport.verifyHost=false";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!uriOptions.isEmpty()) {
|
||||||
|
if (useSSL) {
|
||||||
|
amqpURI += "&" + uriOptions;
|
||||||
|
} else {
|
||||||
|
amqpURI += "?" + uriOptions;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
URI result = getBrokerURI();
|
||||||
|
try {
|
||||||
|
result = new URI(amqpURI);
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
protected Connection createConnection() throws JMSException {
|
protected Connection createConnection() throws JMSException {
|
||||||
return createConnection(name.toString(), false);
|
return createConnection(name.toString(), false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,9 +20,14 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
@ -30,6 +35,7 @@ import javax.jms.TextMessage;
|
||||||
|
|
||||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
|
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
|
||||||
|
import org.apache.activemq.util.Wait;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -186,13 +192,107 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
|
||||||
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
|
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
|
||||||
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
|
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
|
||||||
assertNotNull(subscription);
|
assertNotNull(subscription);
|
||||||
|
LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
|
||||||
assertTrue(subscription.getPrefetchSize() > 0);
|
assertTrue(subscription.getPrefetchSize() > 0);
|
||||||
|
|
||||||
for (int i = 1; i <= MSG_COUNT; i++) {
|
for (int i = 1; i <= MSG_COUNT; i++) {
|
||||||
LOG.info("Trying to receive message: {}", i);
|
LOG.info("Trying to receive message: {}", i);
|
||||||
TextMessage message = (TextMessage) consumer.receive(1000);
|
TextMessage message = (TextMessage) consumer.receive(1000);
|
||||||
assertNotNull("Message " + i + "should be available", message);
|
assertNotNull("Message " + i + " should be available", message);
|
||||||
assertEquals("Should get message: " + i, i , message.getIntProperty("MessageSequence"));
|
assertEquals("Should get message: " + i, i , message.getIntProperty("MessageSequence"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testQueueTXRollbackAndCommitAsyncConsumer() throws Exception {
|
||||||
|
final int MSG_COUNT = 3;
|
||||||
|
|
||||||
|
final AtomicInteger counter = new AtomicInteger();
|
||||||
|
|
||||||
|
connection = createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
Queue destination = session.createQueue(getDestinationName());
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
consumer.setMessageListener(new MessageListener() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
try {
|
||||||
|
LOG.info("Received Message {}", message.getJMSMessageID());
|
||||||
|
} catch (JMSException e) {
|
||||||
|
}
|
||||||
|
counter.incrementAndGet();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
int msgIndex = 0;
|
||||||
|
for (int i = 1; i <= MSG_COUNT; i++) {
|
||||||
|
LOG.info("Sending message: {} to rollback", msgIndex++);
|
||||||
|
TextMessage message = session.createTextMessage("Rolled back Message: " + msgIndex);
|
||||||
|
message.setIntProperty("MessageSequence", msgIndex);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("ROLLBACK of sent message here:");
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
assertEquals(0, getProxyToQueue(getDestinationName()).getQueueSize());
|
||||||
|
|
||||||
|
for (int i = 1; i <= MSG_COUNT; i++) {
|
||||||
|
LOG.info("Sending message: {} to commit", msgIndex++);
|
||||||
|
TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
|
||||||
|
message.setIntProperty("MessageSequence", msgIndex);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("COMMIT of sent message here:");
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
|
||||||
|
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
|
||||||
|
assertNotNull(subscription);
|
||||||
|
LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
|
||||||
|
assertTrue(subscription.getPrefetchSize() > 0);
|
||||||
|
|
||||||
|
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return counter.get() == MSG_COUNT;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
LOG.info("COMMIT of first received batch here:");
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
assertTrue(subscription.getPrefetchSize() > 0);
|
||||||
|
for (int i = 1; i <= MSG_COUNT; i++) {
|
||||||
|
LOG.info("Sending message: {} to commit", msgIndex++);
|
||||||
|
TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
|
||||||
|
message.setIntProperty("MessageSequence", msgIndex);
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("COMMIT of next sent message batch here:");
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
LOG.info("WAITING -> for next three messages to arrive:");
|
||||||
|
|
||||||
|
assertTrue(subscription.getPrefetchSize() > 0);
|
||||||
|
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
LOG.info("Read {} messages so far", counter.get());
|
||||||
|
return counter.get() == MSG_COUNT * 2;
|
||||||
|
}
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,13 +19,18 @@ package org.apache.activemq.transport.amqp;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.util.Enumeration;
|
import java.util.Enumeration;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.QueueBrowser;
|
import javax.jms.QueueBrowser;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
import org.apache.activemq.junit.ActiveMQTestRunner;
|
import org.apache.activemq.junit.ActiveMQTestRunner;
|
||||||
|
@ -45,12 +50,12 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
|
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
@Repeat(repetitions = 1)
|
@Repeat(repetitions = 5)
|
||||||
public void testBrowseAllInQueueZeroPrefetch() throws Exception {
|
public void testBrowseAllInQueueZeroPrefetch() throws Exception {
|
||||||
|
|
||||||
final int MSG_COUNT = 5;
|
final int MSG_COUNT = 5;
|
||||||
|
|
||||||
JmsConnectionFactory cf = new JmsConnectionFactory(getBrokerURI() + "?jms.prefetchPolicy.all=0");
|
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0"));
|
||||||
connection = cf.createConnection();
|
connection = cf.createConnection();
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
|
@ -78,6 +83,242 @@ public class JMSQueueBrowserTest extends JMSClientTestSupport {
|
||||||
assertEquals(5, count);
|
assertEquals(5, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 40000)
|
||||||
|
public void testCreateQueueBrowser() throws Exception {
|
||||||
|
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
|
||||||
|
|
||||||
|
connection = cf.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
assertNotNull(session);
|
||||||
|
Queue queue = session.createQueue(getDestinationName());
|
||||||
|
session.createConsumer(queue).close();
|
||||||
|
|
||||||
|
QueueBrowser browser = session.createBrowser(queue);
|
||||||
|
assertNotNull(browser);
|
||||||
|
|
||||||
|
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
|
||||||
|
assertEquals(0, proxy.getQueueSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 40000)
|
||||||
|
public void testNoMessagesBrowserHasNoElements() throws Exception {
|
||||||
|
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
|
||||||
|
|
||||||
|
connection = cf.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
assertNotNull(session);
|
||||||
|
Queue queue = session.createQueue(getDestinationName());
|
||||||
|
session.createConsumer(queue).close();
|
||||||
|
|
||||||
|
QueueBrowser browser = session.createBrowser(queue);
|
||||||
|
assertNotNull(browser);
|
||||||
|
|
||||||
|
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
|
||||||
|
assertEquals(0, proxy.getQueueSize());
|
||||||
|
|
||||||
|
Enumeration<?> enumeration = browser.getEnumeration();
|
||||||
|
assertFalse(enumeration.hasMoreElements());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testBroseOneInQueue() throws Exception {
|
||||||
|
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
|
||||||
|
|
||||||
|
connection = cf.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue = session.createQueue(getDestinationName());
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
producer.send(session.createTextMessage("hello"));
|
||||||
|
producer.close();
|
||||||
|
|
||||||
|
QueueBrowser browser = session.createBrowser(queue);
|
||||||
|
Enumeration<?> enumeration = browser.getEnumeration();
|
||||||
|
while (enumeration.hasMoreElements()) {
|
||||||
|
Message m = (Message) enumeration.nextElement();
|
||||||
|
assertTrue(m instanceof TextMessage);
|
||||||
|
LOG.debug("Browsed message {} from Queue {}", m, queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
browser.close();
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
Message msg = consumer.receive(5000);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertTrue(msg instanceof TextMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
@Repeat(repetitions = 5)
|
||||||
|
public void testBrowseAllInQueue() throws Exception {
|
||||||
|
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
|
||||||
|
|
||||||
|
connection = cf.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
assertNotNull(session);
|
||||||
|
Queue queue = session.createQueue(getDestinationName());
|
||||||
|
sendMessages(name.getMethodName(), 5, false);
|
||||||
|
|
||||||
|
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
|
||||||
|
assertEquals(5, proxy.getQueueSize());
|
||||||
|
|
||||||
|
QueueBrowser browser = session.createBrowser(queue);
|
||||||
|
assertNotNull(browser);
|
||||||
|
Enumeration<?> enumeration = browser.getEnumeration();
|
||||||
|
int count = 0;
|
||||||
|
while (enumeration.hasMoreElements()) {
|
||||||
|
Message msg = (Message) enumeration.nextElement();
|
||||||
|
assertNotNull(msg);
|
||||||
|
LOG.debug("Recv: {}", msg);
|
||||||
|
count++;
|
||||||
|
TimeUnit.MILLISECONDS.sleep(50);
|
||||||
|
}
|
||||||
|
assertFalse(enumeration.hasMoreElements());
|
||||||
|
assertEquals(5, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
@Repeat(repetitions = 5)
|
||||||
|
public void testBrowseAllInQueuePrefetchOne() throws Exception {
|
||||||
|
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=1"));
|
||||||
|
|
||||||
|
connection = cf.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
assertNotNull(session);
|
||||||
|
Queue queue = session.createQueue(getDestinationName());
|
||||||
|
sendMessages(name.getMethodName(), 5, false);
|
||||||
|
|
||||||
|
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
|
||||||
|
assertEquals(5, proxy.getQueueSize());
|
||||||
|
|
||||||
|
QueueBrowser browser = session.createBrowser(queue);
|
||||||
|
assertNotNull(browser);
|
||||||
|
Enumeration<?> enumeration = browser.getEnumeration();
|
||||||
|
int count = 0;
|
||||||
|
while (enumeration.hasMoreElements()) {
|
||||||
|
Message msg = (Message) enumeration.nextElement();
|
||||||
|
assertNotNull(msg);
|
||||||
|
LOG.debug("Recv: {}", msg);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
assertFalse(enumeration.hasMoreElements());
|
||||||
|
assertEquals(5, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 40000)
|
||||||
|
public void testBrowseAllInQueueTxSession() throws Exception {
|
||||||
|
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
|
||||||
|
|
||||||
|
connection = cf.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
assertNotNull(session);
|
||||||
|
Queue queue = session.createQueue(getDestinationName());
|
||||||
|
sendMessages(name.getMethodName(), 5, false);
|
||||||
|
|
||||||
|
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
|
||||||
|
assertEquals(5, proxy.getQueueSize());
|
||||||
|
|
||||||
|
QueueBrowser browser = session.createBrowser(queue);
|
||||||
|
assertNotNull(browser);
|
||||||
|
Enumeration<?> enumeration = browser.getEnumeration();
|
||||||
|
int count = 0;
|
||||||
|
while (enumeration.hasMoreElements()) {
|
||||||
|
Message msg = (Message) enumeration.nextElement();
|
||||||
|
assertNotNull(msg);
|
||||||
|
LOG.debug("Recv: {}", msg);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
assertFalse(enumeration.hasMoreElements());
|
||||||
|
assertEquals(5, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 40000)
|
||||||
|
public void testQueueBrowserInTxSessionLeavesOtherWorkUnaffected() throws Exception {
|
||||||
|
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI());
|
||||||
|
|
||||||
|
connection = cf.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
assertNotNull(session);
|
||||||
|
Queue queue = session.createQueue(getDestinationName());
|
||||||
|
sendMessages(name.getMethodName(), 5, false);
|
||||||
|
|
||||||
|
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
|
||||||
|
assertEquals(5, proxy.getQueueSize());
|
||||||
|
|
||||||
|
// Send some TX work but don't commit.
|
||||||
|
MessageProducer txProducer = session.createProducer(queue);
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
txProducer.send(session.createMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(5, proxy.getQueueSize());
|
||||||
|
|
||||||
|
QueueBrowser browser = session.createBrowser(queue);
|
||||||
|
assertNotNull(browser);
|
||||||
|
Enumeration<?> enumeration = browser.getEnumeration();
|
||||||
|
int count = 0;
|
||||||
|
while (enumeration.hasMoreElements()) {
|
||||||
|
Message msg = (Message) enumeration.nextElement();
|
||||||
|
assertNotNull(msg);
|
||||||
|
LOG.debug("Recv: {}", msg);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
assertFalse(enumeration.hasMoreElements());
|
||||||
|
assertEquals(5, count);
|
||||||
|
|
||||||
|
browser.close();
|
||||||
|
|
||||||
|
// Now check that all browser work did not affect the session transaction.
|
||||||
|
assertEquals(5, proxy.getQueueSize());
|
||||||
|
session.commit();
|
||||||
|
assertEquals(10, proxy.getQueueSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testBrowseAllInQueueSmallPrefetch() throws Exception {
|
||||||
|
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=5"));
|
||||||
|
|
||||||
|
connection = cf.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
final int MSG_COUNT = 30;
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
assertNotNull(session);
|
||||||
|
Queue queue = session.createQueue(getDestinationName());
|
||||||
|
sendMessages(name.getMethodName(), MSG_COUNT, false);
|
||||||
|
|
||||||
|
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
|
||||||
|
assertEquals(MSG_COUNT, proxy.getQueueSize());
|
||||||
|
|
||||||
|
QueueBrowser browser = session.createBrowser(queue);
|
||||||
|
assertNotNull(browser);
|
||||||
|
Enumeration<?> enumeration = browser.getEnumeration();
|
||||||
|
int count = 0;
|
||||||
|
while (enumeration.hasMoreElements()) {
|
||||||
|
Message msg = (Message) enumeration.nextElement();
|
||||||
|
assertNotNull(msg);
|
||||||
|
LOG.debug("Recv: {}", msg);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
assertFalse(enumeration.hasMoreElements());
|
||||||
|
assertEquals(MSG_COUNT, count);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean isUseOpenWireConnector() {
|
protected boolean isUseOpenWireConnector() {
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -152,6 +152,19 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
|
||||||
connection.fireClientException(error);
|
connection.fireClientException(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void locallyClosed(AmqpConnection connection, Exception error) {
|
||||||
|
if (endpoint != null) {
|
||||||
|
// TODO: if this is a producer/consumer link then we may only be detached,
|
||||||
|
// rather than fully closed, and should respond appropriately.
|
||||||
|
endpoint.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Resource {} was locally closed", this);
|
||||||
|
|
||||||
|
connection.fireClientException(error);
|
||||||
|
}
|
||||||
|
|
||||||
public E getEndpoint() {
|
public E getEndpoint() {
|
||||||
return this.endpoint;
|
return this.endpoint;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,6 @@
|
||||||
package org.apache.activemq.transport.amqp.client;
|
package org.apache.activemq.transport.amqp.client;
|
||||||
|
|
||||||
import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
|
import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import io.netty.buffer.Unpooled;
|
|
||||||
import io.netty.util.ReferenceCountUtil;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
@ -39,8 +36,10 @@ import org.apache.activemq.transport.InactivityIOException;
|
||||||
import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
|
import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
|
||||||
import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
|
import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
|
||||||
import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
|
import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
|
||||||
|
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
|
||||||
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
|
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
|
||||||
import org.apache.activemq.transport.amqp.client.util.IdGenerator;
|
import org.apache.activemq.transport.amqp.client.util.IdGenerator;
|
||||||
|
import org.apache.activemq.transport.amqp.client.util.NoOpAsyncResult;
|
||||||
import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
|
import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.engine.Collector;
|
import org.apache.qpid.proton.engine.Collector;
|
||||||
|
@ -54,10 +53,16 @@ import org.apache.qpid.proton.engine.impl.CollectorImpl;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
|
||||||
public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
|
public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
|
||||||
|
|
||||||
|
private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
|
||||||
|
|
||||||
private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1;
|
private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1;
|
||||||
// NOTE: Limit default channel max to signed short range to deal with
|
// NOTE: Limit default channel max to signed short range to deal with
|
||||||
// brokers that don't currently handle the unsigned range well.
|
// brokers that don't currently handle the unsigned range well.
|
||||||
|
@ -66,6 +71,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||||
|
|
||||||
public static final long DEFAULT_CONNECT_TIMEOUT = 515000;
|
public static final long DEFAULT_CONNECT_TIMEOUT = 515000;
|
||||||
public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
|
public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
|
||||||
|
public static final long DEFAULT_DRAIN_TIMEOUT = 60000;
|
||||||
|
|
||||||
private final ScheduledExecutorService serializer;
|
private final ScheduledExecutorService serializer;
|
||||||
private final AtomicBoolean closed = new AtomicBoolean();
|
private final AtomicBoolean closed = new AtomicBoolean();
|
||||||
|
@ -95,6 +101,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||||
private int channelMax = DEFAULT_CHANNEL_MAX;
|
private int channelMax = DEFAULT_CHANNEL_MAX;
|
||||||
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
|
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
|
||||||
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
|
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
|
||||||
|
private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
|
||||||
|
|
||||||
public AmqpConnection(NettyTransport transport, String username, String password) {
|
public AmqpConnection(NettyTransport transport, String username, String password) {
|
||||||
setEndpoint(Connection.Factory.create());
|
setEndpoint(Connection.Factory.create());
|
||||||
|
@ -150,7 +157,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||||
authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
|
authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
|
||||||
open(future);
|
open(future);
|
||||||
|
|
||||||
pumpToProtonTransport();
|
pumpToProtonTransport(future);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -190,7 +197,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||||
request.onSuccess();
|
request.onSuccess();
|
||||||
}
|
}
|
||||||
|
|
||||||
pumpToProtonTransport();
|
pumpToProtonTransport(request);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.debug("Caught exception while closing proton connection");
|
LOG.debug("Caught exception while closing proton connection");
|
||||||
}
|
}
|
||||||
|
@ -241,7 +248,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||||
session.setEndpoint(getEndpoint().session());
|
session.setEndpoint(getEndpoint().session());
|
||||||
session.setStateInspector(getStateInspector());
|
session.setStateInspector(getStateInspector());
|
||||||
session.open(request);
|
session.open(request);
|
||||||
pumpToProtonTransport();
|
pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -355,6 +362,14 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||||
this.closeTimeout = closeTimeout;
|
this.closeTimeout = closeTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getDrainTimeout() {
|
||||||
|
return drainTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDrainTimeout(long drainTimeout) {
|
||||||
|
this.drainTimeout = drainTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
public List<Symbol> getOfferedCapabilities() {
|
public List<Symbol> getOfferedCapabilities() {
|
||||||
return offeredCapabilities;
|
return offeredCapabilities;
|
||||||
}
|
}
|
||||||
|
@ -439,6 +454,10 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||||
}
|
}
|
||||||
|
|
||||||
void pumpToProtonTransport() {
|
void pumpToProtonTransport() {
|
||||||
|
pumpToProtonTransport(NOOP_REQUEST);
|
||||||
|
}
|
||||||
|
|
||||||
|
void pumpToProtonTransport(AsyncResult request) {
|
||||||
try {
|
try {
|
||||||
boolean done = false;
|
boolean done = false;
|
||||||
while (!done) {
|
while (!done) {
|
||||||
|
@ -454,6 +473,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
fireClientException(e);
|
fireClientException(e);
|
||||||
|
request.onFailure(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,14 +26,17 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import javax.jms.InvalidDestinationException;
|
import javax.jms.InvalidDestinationException;
|
||||||
|
|
||||||
|
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
|
||||||
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
|
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
|
||||||
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
|
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
|
||||||
import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
|
import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
|
||||||
|
import org.apache.qpid.jms.JmsOperationTimedOutException;
|
||||||
import org.apache.qpid.proton.amqp.Binary;
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.amqp.DescribedType;
|
import org.apache.qpid.proton.amqp.DescribedType;
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
|
@ -74,6 +77,9 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
private boolean presettle;
|
private boolean presettle;
|
||||||
private boolean noLocal;
|
private boolean noLocal;
|
||||||
|
|
||||||
|
private AsyncResult pullRequest;
|
||||||
|
private AsyncResult stopRequest;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new receiver instance.
|
* Create a new receiver instance.
|
||||||
*
|
*
|
||||||
|
@ -133,7 +139,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
public void run() {
|
public void run() {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
close(request);
|
close(request);
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -156,7 +162,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
public void run() {
|
public void run() {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
detach(request);
|
detach(request);
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -222,6 +228,108 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
return prefetch.poll();
|
return prefetch.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request a remote peer send a Message to this client waiting until one arrives.
|
||||||
|
*
|
||||||
|
* @return the pulled AmqpMessage or null if none was pulled from the remote.
|
||||||
|
*
|
||||||
|
* @throws IOException if an error occurs
|
||||||
|
*/
|
||||||
|
public AmqpMessage pull() throws IOException {
|
||||||
|
return pull(-1, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request a remote peer send a Message to this client using an immediate drain request.
|
||||||
|
*
|
||||||
|
* @return the pulled AmqpMessage or null if none was pulled from the remote.
|
||||||
|
*
|
||||||
|
* @throws IOException if an error occurs
|
||||||
|
*/
|
||||||
|
public AmqpMessage pullImmediate() throws IOException {
|
||||||
|
return pull(0, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request a remote peer send a Message to this client.
|
||||||
|
*
|
||||||
|
* {@literal timeout < 0} then it should remain open until a message is received.
|
||||||
|
* {@literal timeout = 0} then it returns a message or null if none available
|
||||||
|
* {@literal timeout > 0} then it should remain open for timeout amount of time.
|
||||||
|
*
|
||||||
|
* The timeout value when positive is given in milliseconds.
|
||||||
|
*
|
||||||
|
* @param timeout
|
||||||
|
* the amount of time to tell the remote peer to keep this pull request valid.
|
||||||
|
* @param unit
|
||||||
|
* the unit of measure that the timeout represents.
|
||||||
|
*
|
||||||
|
* @return the pulled AmqpMessage or null if none was pulled from the remote.
|
||||||
|
*
|
||||||
|
* @throws IOException if an error occurs
|
||||||
|
*/
|
||||||
|
public AmqpMessage pull(final long timeout, final TimeUnit unit) throws IOException {
|
||||||
|
checkClosed();
|
||||||
|
final ClientFuture request = new ClientFuture();
|
||||||
|
session.getScheduler().execute(new Runnable() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
checkClosed();
|
||||||
|
|
||||||
|
long timeoutMills = unit.toMillis(timeout);
|
||||||
|
|
||||||
|
try {
|
||||||
|
LOG.trace("Pull on Receiver {} with timeout = {}", getSubscriptionName(), timeoutMills);
|
||||||
|
if (timeoutMills < 0) {
|
||||||
|
// Wait until message arrives. Just give credit if needed.
|
||||||
|
if (getEndpoint().getCredit() == 0) {
|
||||||
|
LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
|
||||||
|
getEndpoint().flow(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Await the message arrival
|
||||||
|
pullRequest = request;
|
||||||
|
} else if (timeoutMills == 0) {
|
||||||
|
// If we have no credit then we need to issue some so that we can
|
||||||
|
// try to fulfill the request, then drain down what is there to
|
||||||
|
// ensure we consume what is available and remove all credit.
|
||||||
|
if (getEndpoint().getCredit() == 0){
|
||||||
|
LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
|
||||||
|
getEndpoint().flow(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drain immediately and wait for the message(s) to arrive,
|
||||||
|
// or a flow indicating removal of the remaining credit.
|
||||||
|
stop(request);
|
||||||
|
} else if (timeoutMills > 0) {
|
||||||
|
// If we have no credit then we need to issue some so that we can
|
||||||
|
// try to fulfill the request, then drain down what is there to
|
||||||
|
// ensure we consume what is available and remove all credit.
|
||||||
|
if (getEndpoint().getCredit() == 0) {
|
||||||
|
LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
|
||||||
|
getEndpoint().flow(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the timeout for the message(s) to arrive, then drain if required
|
||||||
|
// and wait for remaining message(s) to arrive or a flow indicating
|
||||||
|
// removal of the remaining credit.
|
||||||
|
stopOnSchedule(timeoutMills, request);
|
||||||
|
}
|
||||||
|
|
||||||
|
session.pumpToProtonTransport(request);
|
||||||
|
} catch (Exception e) {
|
||||||
|
request.onFailure(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
request.sync();
|
||||||
|
|
||||||
|
return prefetch.poll();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Controls the amount of credit given to the receiver link.
|
* Controls the amount of credit given to the receiver link.
|
||||||
*
|
*
|
||||||
|
@ -240,7 +348,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
try {
|
try {
|
||||||
getEndpoint().flow(credit);
|
getEndpoint().flow(credit);
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(request);
|
||||||
request.onSuccess();
|
request.onSuccess();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
request.onFailure(e);
|
request.onFailure(e);
|
||||||
|
@ -269,7 +377,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
try {
|
try {
|
||||||
getEndpoint().drain(credit);
|
getEndpoint().drain(credit);
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(request);
|
||||||
request.onSuccess();
|
request.onSuccess();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
request.onFailure(e);
|
request.onFailure(e);
|
||||||
|
@ -280,6 +388,31 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
request.sync();
|
request.sync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops the receiver, using all link credit and waiting for in-flight messages to arrive.
|
||||||
|
*
|
||||||
|
* @throws IOException if an error occurs while sending the drain.
|
||||||
|
*/
|
||||||
|
public void stop() throws IOException {
|
||||||
|
checkClosed();
|
||||||
|
final ClientFuture request = new ClientFuture();
|
||||||
|
session.getScheduler().execute(new Runnable() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
checkClosed();
|
||||||
|
try {
|
||||||
|
stop(request);
|
||||||
|
session.pumpToProtonTransport(request);
|
||||||
|
} catch (Exception e) {
|
||||||
|
request.onFailure(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
request.sync();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Accepts a message that was dispatched under the given Delivery instance.
|
* Accepts a message that was dispatched under the given Delivery instance.
|
||||||
*
|
*
|
||||||
|
@ -318,7 +451,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
delivery.settle();
|
delivery.settle();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(request);
|
||||||
request.onSuccess();
|
request.onSuccess();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
request.onFailure(e);
|
request.onFailure(e);
|
||||||
|
@ -360,7 +493,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
disposition.setDeliveryFailed(deliveryFailed);
|
disposition.setDeliveryFailed(deliveryFailed);
|
||||||
delivery.disposition(disposition);
|
delivery.disposition(disposition);
|
||||||
delivery.settle();
|
delivery.settle();
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
request.onSuccess();
|
request.onSuccess();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -397,7 +530,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
if (!delivery.isSettled()) {
|
if (!delivery.isSettled()) {
|
||||||
delivery.disposition(Released.getInstance());
|
delivery.disposition(Released.getInstance());
|
||||||
delivery.settle();
|
delivery.settle();
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
request.onSuccess();
|
request.onSuccess();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -454,6 +587,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
this.noLocal = noLocal;
|
this.noLocal = noLocal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getDrainTimeout() {
|
||||||
|
return session.getConnection().getDrainTimeout();
|
||||||
|
}
|
||||||
|
|
||||||
//----- Internal implementation ------------------------------------------//
|
//----- Internal implementation ------------------------------------------//
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -604,6 +741,15 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
LOG.trace("{} has a partial incoming Message(s), deferring.", this);
|
LOG.trace("{} has a partial incoming Message(s), deferring.", this);
|
||||||
incoming = null;
|
incoming = null;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// We have exhausted the locally queued messages on this link.
|
||||||
|
// Check if we tried to stop and have now run out of credit.
|
||||||
|
if (getEndpoint().getRemoteCredit() <= 0) {
|
||||||
|
if (stopRequest != null) {
|
||||||
|
stopRequest.onSuccess();
|
||||||
|
stopRequest = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} while (incoming != null);
|
} while (incoming != null);
|
||||||
|
|
||||||
|
@ -624,6 +770,35 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
// Store reference to envelope in delivery context for recovery
|
// Store reference to envelope in delivery context for recovery
|
||||||
incoming.setContext(amqpMessage);
|
incoming.setContext(amqpMessage);
|
||||||
prefetch.add(amqpMessage);
|
prefetch.add(amqpMessage);
|
||||||
|
|
||||||
|
// We processed a message, signal completion
|
||||||
|
// of a message pull request if there is one.
|
||||||
|
if (pullRequest != null) {
|
||||||
|
pullRequest.onSuccess();
|
||||||
|
pullRequest = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void processFlowUpdates(AmqpConnection connection) throws IOException {
|
||||||
|
if (pullRequest != null || stopRequest != null) {
|
||||||
|
Receiver receiver = getEndpoint();
|
||||||
|
if (receiver.getRemoteCredit() <= 0 && receiver.getQueued() == 0) {
|
||||||
|
if (pullRequest != null) {
|
||||||
|
pullRequest.onSuccess();
|
||||||
|
pullRequest = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stopRequest != null) {
|
||||||
|
stopRequest.onSuccess();
|
||||||
|
stopRequest = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.trace("Consumer {} flow updated, remote credit = {}", getSubscriptionName(), getEndpoint().getRemoteCredit());
|
||||||
|
|
||||||
|
super.processFlowUpdates(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Message decodeIncomingMessage(Delivery incoming) {
|
protected Message decodeIncomingMessage(Delivery incoming) {
|
||||||
|
@ -661,6 +836,61 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void stop(final AsyncResult request) {
|
||||||
|
Receiver receiver = getEndpoint();
|
||||||
|
if (receiver.getRemoteCredit() <= 0) {
|
||||||
|
if (receiver.getQueued() == 0) {
|
||||||
|
// We have no remote credit and all the deliveries have been processed.
|
||||||
|
request.onSuccess();
|
||||||
|
} else {
|
||||||
|
// There are still deliveries to process, wait for them to be.
|
||||||
|
stopRequest = request;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// TODO: We don't actually want the additional messages that could be sent while
|
||||||
|
// draining. We could explicitly reduce credit first, or possibly use 'echo' instead
|
||||||
|
// of drain if it was supported. We would first need to understand what happens
|
||||||
|
// if we reduce credit below the number of messages already in-flight before
|
||||||
|
// the peer sees the update.
|
||||||
|
stopRequest = request;
|
||||||
|
receiver.drain(0);
|
||||||
|
|
||||||
|
if (getDrainTimeout() > 0) {
|
||||||
|
// If the remote doesn't respond we will close the consumer and break any
|
||||||
|
// blocked receive or stop calls that are waiting.
|
||||||
|
final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
LOG.trace("Consumer {} drain request timed out", this);
|
||||||
|
Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
|
||||||
|
locallyClosed(session.getConnection(), cause);
|
||||||
|
stopRequest.onFailure(cause);
|
||||||
|
session.pumpToProtonTransport(stopRequest);
|
||||||
|
}
|
||||||
|
}, getDrainTimeout(), TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
stopRequest = new ScheduledRequest(future, stopRequest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopOnSchedule(long timeout, final AsyncResult request) {
|
||||||
|
LOG.trace("Receiver {} scheduling stop", this);
|
||||||
|
// We need to drain the credit if no message(s) arrive to use it.
|
||||||
|
final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
LOG.trace("Receiver {} running scheduled stop", this);
|
||||||
|
if (getEndpoint().getRemoteCredit() != 0) {
|
||||||
|
stop(request);
|
||||||
|
session.pumpToProtonTransport(request);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, timeout, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
stopRequest = new ScheduledRequest(future, request);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getClass().getSimpleName() + "{ address = " + address + "}";
|
return getClass().getSimpleName() + "{ address = " + address + "}";
|
||||||
|
@ -685,4 +915,37 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
|
|
||||||
void postRollback() {
|
void postRollback() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//----- Inner classes used in message pull operations --------------------//
|
||||||
|
|
||||||
|
protected static final class ScheduledRequest implements AsyncResult {
|
||||||
|
|
||||||
|
private final ScheduledFuture<?> sheduledTask;
|
||||||
|
private final AsyncResult origRequest;
|
||||||
|
|
||||||
|
public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
|
||||||
|
this.sheduledTask = completionTask;
|
||||||
|
this.origRequest = origRequest;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable cause) {
|
||||||
|
sheduledTask.cancel(false);
|
||||||
|
origRequest.onFailure(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess() {
|
||||||
|
boolean cancelled = sheduledTask.cancel(false);
|
||||||
|
if (cancelled) {
|
||||||
|
// Signal completion. Otherwise wait for the scheduled task to do it.
|
||||||
|
origRequest.onSuccess();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isComplete() {
|
||||||
|
return origRequest.isComplete();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,6 +91,18 @@ public interface AmqpResource extends AmqpEventSink {
|
||||||
*/
|
*/
|
||||||
void remotelyClosed(AmqpConnection connection);
|
void remotelyClosed(AmqpConnection connection);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called to indicate that the local end has become closed but the resource
|
||||||
|
* was not awaiting a close. This could happen during an open request where
|
||||||
|
* the remote does not set an error condition or during normal operation.
|
||||||
|
*
|
||||||
|
* @param connection
|
||||||
|
* The connection that owns this resource.
|
||||||
|
* @param error
|
||||||
|
* The error that triggered the local close of this resource.
|
||||||
|
*/
|
||||||
|
void locallyClosed(AmqpConnection connection, Exception error);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the failed state for this Resource and triggers a failure signal for
|
* Sets the failed state for this Resource and triggers a failure signal for
|
||||||
* any pending ProduverRequest.
|
* any pending ProduverRequest.
|
||||||
|
|
|
@ -135,7 +135,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
doSend(message, sendRequest);
|
doSend(message, sendRequest);
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(sendRequest);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
sendRequest.onFailure(e);
|
sendRequest.onFailure(e);
|
||||||
session.getConnection().fireClientException(e);
|
session.getConnection().fireClientException(e);
|
||||||
|
@ -165,7 +165,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
||||||
public void run() {
|
public void run() {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
close(request);
|
close(request);
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.client;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
|
||||||
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
|
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
|
||||||
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
|
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Source;
|
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||||
|
@ -92,7 +93,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
sender.setStateInspector(getStateInspector());
|
sender.setStateInspector(getStateInspector());
|
||||||
sender.open(request);
|
sender.open(request);
|
||||||
pumpToProtonTransport();
|
pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -124,7 +125,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
sender.setStateInspector(getStateInspector());
|
sender.setStateInspector(getStateInspector());
|
||||||
sender.open(request);
|
sender.open(request);
|
||||||
pumpToProtonTransport();
|
pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -195,7 +196,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
receiver.setStateInspector(getStateInspector());
|
receiver.setStateInspector(getStateInspector());
|
||||||
receiver.open(request);
|
receiver.open(request);
|
||||||
pumpToProtonTransport();
|
pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -227,7 +228,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
receiver.setStateInspector(getStateInspector());
|
receiver.setStateInspector(getStateInspector());
|
||||||
receiver.open(request);
|
receiver.open(request);
|
||||||
pumpToProtonTransport();
|
pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -308,7 +309,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
receiver.setStateInspector(getStateInspector());
|
receiver.setStateInspector(getStateInspector());
|
||||||
receiver.open(request);
|
receiver.open(request);
|
||||||
pumpToProtonTransport();
|
pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -345,7 +346,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
receiver.setStateInspector(getStateInspector());
|
receiver.setStateInspector(getStateInspector());
|
||||||
receiver.open(request);
|
receiver.open(request);
|
||||||
pumpToProtonTransport();
|
pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -427,8 +428,8 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
||||||
return connection.getProtonConnection();
|
return connection.getProtonConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
void pumpToProtonTransport() {
|
void pumpToProtonTransport(AsyncResult request) {
|
||||||
connection.pumpToProtonTransport();
|
connection.pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
AmqpTransactionId getTransactionId() {
|
AmqpTransactionId getTransactionId() {
|
||||||
|
|
|
@ -111,7 +111,7 @@ public class AmqpTransactionContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -153,7 +153,7 @@ public class AmqpTransactionContext {
|
||||||
try {
|
try {
|
||||||
LOG.info("Attempting to commit TX:[{}]", transactionId);
|
LOG.info("Attempting to commit TX:[{}]", transactionId);
|
||||||
coordinator.discharge(transactionId, request, true);
|
coordinator.discharge(transactionId, request, true);
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(request);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
request.onFailure(e);
|
request.onFailure(e);
|
||||||
}
|
}
|
||||||
|
@ -198,7 +198,7 @@ public class AmqpTransactionContext {
|
||||||
try {
|
try {
|
||||||
LOG.info("Attempting to roll back TX:[{}]", transactionId);
|
LOG.info("Attempting to roll back TX:[{}]", transactionId);
|
||||||
coordinator.discharge(transactionId, request, false);
|
coordinator.discharge(transactionId, request, false);
|
||||||
session.pumpToProtonTransport();
|
session.pumpToProtonTransport(request);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
request.onFailure(e);
|
request.onFailure(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.amqp.client.util;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple NoOp implementation used when the result of the operation does not matter.
|
||||||
|
*/
|
||||||
|
public class NoOpAsyncResult implements AsyncResult {
|
||||||
|
|
||||||
|
public final static NoOpAsyncResult INSTANCE = new NoOpAsyncResult();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable result) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isComplete() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,154 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.amqp.interop;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests various behaviors of broker side drain support.
|
||||||
|
*/
|
||||||
|
public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testReceiverCanDrainMessages() throws Exception {
|
||||||
|
int MSG_COUNT = 20;
|
||||||
|
sendMessages(getTestName(), MSG_COUNT, false);
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = client.connect();
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||||
|
|
||||||
|
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(MSG_COUNT, queueView.getQueueSize());
|
||||||
|
assertEquals(0, queueView.getDispatchCount());
|
||||||
|
|
||||||
|
receiver.drain(MSG_COUNT);
|
||||||
|
for (int i = 0; i < MSG_COUNT; ++i) {
|
||||||
|
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(message);
|
||||||
|
message.accept();
|
||||||
|
}
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(0, queueView.getQueueSize());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testPullWithNoMessageGetDrained() throws Exception {
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = client.connect();
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||||
|
|
||||||
|
receiver.flow(10);
|
||||||
|
|
||||||
|
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(0, queueView.getQueueSize());
|
||||||
|
assertEquals(0, queueView.getDispatchCount());
|
||||||
|
|
||||||
|
assertEquals(10, receiver.getReceiver().getRemoteCredit());
|
||||||
|
|
||||||
|
assertNull(receiver.pull(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
assertEquals(0, receiver.getReceiver().getRemoteCredit());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testPullOneFromRemote() throws Exception {
|
||||||
|
int MSG_COUNT = 20;
|
||||||
|
sendMessages(getTestName(), MSG_COUNT, false);
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = client.connect();
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||||
|
|
||||||
|
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(MSG_COUNT, queueView.getQueueSize());
|
||||||
|
assertEquals(0, queueView.getDispatchCount());
|
||||||
|
|
||||||
|
assertEquals(0, receiver.getReceiver().getRemoteCredit());
|
||||||
|
|
||||||
|
AmqpMessage message = receiver.pull(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(message);
|
||||||
|
message.accept();
|
||||||
|
|
||||||
|
assertEquals(0, receiver.getReceiver().getRemoteCredit());
|
||||||
|
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(MSG_COUNT - 1, queueView.getQueueSize());
|
||||||
|
assertEquals(1, queueView.getDispatchCount());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMultipleZeroResultPulls() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = client.connect();
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||||
|
|
||||||
|
receiver.flow(10);
|
||||||
|
|
||||||
|
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(0, queueView.getQueueSize());
|
||||||
|
assertEquals(0, queueView.getDispatchCount());
|
||||||
|
|
||||||
|
assertEquals(10, receiver.getReceiver().getRemoteCredit());
|
||||||
|
|
||||||
|
assertNull(receiver.pull(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
assertEquals(0, receiver.getReceiver().getRemoteCredit());
|
||||||
|
|
||||||
|
assertNull(receiver.pull(1, TimeUnit.SECONDS));
|
||||||
|
assertNull(receiver.pull(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
assertEquals(0, receiver.getReceiver().getRemoteCredit());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean isUseOpenWireConnector() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -341,34 +341,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
|
||||||
public void testReceiverCanDrainMessages() throws Exception {
|
|
||||||
int MSG_COUNT = 20;
|
|
||||||
sendMessages(getTestName(), MSG_COUNT, false);
|
|
||||||
|
|
||||||
AmqpClient client = createAmqpClient();
|
|
||||||
AmqpConnection connection = client.connect();
|
|
||||||
AmqpSession session = connection.createSession();
|
|
||||||
|
|
||||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
|
||||||
|
|
||||||
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
|
||||||
assertEquals(MSG_COUNT, queueView.getQueueSize());
|
|
||||||
assertEquals(0, queueView.getDispatchCount());
|
|
||||||
|
|
||||||
receiver.drain(MSG_COUNT);
|
|
||||||
for (int i = 0; i < MSG_COUNT; ++i) {
|
|
||||||
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
|
||||||
assertNotNull(message);
|
|
||||||
message.accept();
|
|
||||||
}
|
|
||||||
receiver.close();
|
|
||||||
|
|
||||||
assertEquals(0, queueView.getQueueSize());
|
|
||||||
|
|
||||||
connection.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
|
public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
|
||||||
AmqpClient client = createAmqpClient();
|
AmqpClient client = createAmqpClient();
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
#
|
#
|
||||||
log4j.rootLogger=WARN, console, file
|
log4j.rootLogger=WARN, console, file
|
||||||
log4j.logger.org.apache.activemq=INFO
|
log4j.logger.org.apache.activemq=INFO
|
||||||
log4j.logger.org.apache.activemq.transport.amqp=DEBUG
|
log4j.logger.org.apache.activemq.transport.amqp=INFO
|
||||||
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
|
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
|
||||||
log4j.logger.org.fusesource=INFO
|
log4j.logger.org.fusesource=INFO
|
||||||
|
|
||||||
|
@ -30,7 +30,7 @@ log4j.logger.org.apache.qpid.jms.provider=INFO
|
||||||
log4j.logger.org.apache.qpid.jms.provider.amqp=INFO
|
log4j.logger.org.apache.qpid.jms.provider.amqp=INFO
|
||||||
log4j.logger.org.apache.qpid.jms.provider.amqp.FRAMES=INFO
|
log4j.logger.org.apache.qpid.jms.provider.amqp.FRAMES=INFO
|
||||||
|
|
||||||
# Console will only display warnnings
|
# Console will only display warnings
|
||||||
log4j.appender.console=org.apache.log4j.ConsoleAppender
|
log4j.appender.console=org.apache.log4j.ConsoleAppender
|
||||||
log4j.appender.console.layout=org.apache.log4j.PatternLayout
|
log4j.appender.console.layout=org.apache.log4j.PatternLayout
|
||||||
log4j.appender.console.layout.ConversionPattern=%d [%-15.15t] - %-5p %-25.30c{1} - %m%n
|
log4j.appender.console.layout.ConversionPattern=%d [%-15.15t] - %-5p %-25.30c{1} - %m%n
|
||||||
|
|
Loading…
Reference in New Issue