https://issues.apache.org/jira/browse/AMQ-5433
https://issues.apache.org/jira/browse/AMQ-5647
https://issues.apache.org/jira/browse/AMQ-5684

Adds support for AMQP drain and fixes some issues around incorrect
dispatching and credit handling.  Should resolve several issues that
have been seen using test suites from AmqpNetLite and other AMQP
clients.
This commit is contained in:
Timothy Bish 2015-03-27 15:11:38 -04:00
parent b585650197
commit 05ff52dc15
7 changed files with 95 additions and 51 deletions

View File

@ -24,6 +24,7 @@ import javax.jms.Message;
import javax.jms.MessageFormatException;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
import org.apache.qpid.proton.codec.DroppingWritableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
@ -85,6 +86,10 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
// Update the DeliveryCount header...
// The AMQP delivery-count field only includes prior failed delivery attempts,
// whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
if (amqp.getHeader() == null) {
amqp.setHeader(new Header());
}
amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
// Re-encode...

View File

@ -30,6 +30,7 @@ import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
@ -78,8 +79,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
private final boolean presettle;
private boolean closed;
private boolean endOfBrowse;
private int currentCredit;
private boolean draining;
private long lastDeliveredSequenceId;
private Buffer currentBuffer;
@ -151,7 +152,31 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
public void flow() throws Exception {
int updatedCredit = getEndpoint().getCredit();
if (updatedCredit != currentCredit) {
LOG.trace("Flow: drain={} credit={}, remoteCredit={}",
getEndpoint().getDrain(), getEndpoint().getCredit(), getEndpoint().getRemoteCredit());
if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) {
currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
draining = true;
// Revert to a pull consumer.
ConsumerControl control = new ConsumerControl();
control.setConsumerId(getConsumerId());
control.setDestination(getDestination());
control.setPrefetch(0);
sendToActiveMQ(control, null);
// Now request dispatch of the drain amount, we request immediate
// timeout and an completion message regardless so that we can know
// when we should marked the link as drained.
MessagePull pullRequest = new MessagePull();
pullRequest.setConsumerId(getConsumerId());
pullRequest.setDestination(getDestination());
pullRequest.setTimeout(-1);
pullRequest.setAlwaysSignalDone(true);
pullRequest.setQuantity(currentCredit);
sendToActiveMQ(pullRequest, null);
} else if (updatedCredit != currentCredit) {
currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
ConsumerControl control = new ConsumerControl();
control.setConsumerId(getConsumerId());
@ -159,8 +184,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
control.setPrefetch(currentCredit);
sendToActiveMQ(control, null);
}
drainCheck();
}
@Override
@ -357,9 +380,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
final ActiveMQMessage jms = temp;
if (jms == null) {
// It's the end of browse signal.
endOfBrowse = true;
drainCheck();
LOG.info("End of browse signals endpoint drained.");
// It's the end of browse signal in response to a MessagePull
getEndpoint().drained();
draining = false;
} else {
jms.setRedeliveryCounter(md.getRedeliveryCounter());
jms.setReadOnlyBody(true);
@ -436,16 +460,4 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
});
}
}
private void drainCheck() {
// If we are a browser.. lets not say we are drained until
// we hit the end of browse message.
if (consumerInfo.isBrowser() && !endOfBrowse) {
return;
}
if (outbound.isEmpty()) {
getEndpoint().drained();
}
}
}

View File

@ -16,22 +16,31 @@
*/
package org.apache.activemq.transport.amqp;
import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;
// import org.apache.activemq.leveldb.LevelDBStore;
import java.io.File;
public class IDERunner {
private static final String AMQP_TRANSFORMER = "jms";
private static final boolean TRANSPORT_TRACE = true;
public static void main(String[]args) throws Exception {
BrokerService bs = new BrokerService();
bs.addConnector("amqp://0.0.0.0:5672?trace=true");
BrokerService brokerService = new BrokerService();
brokerService.addConnector(
"amqp://0.0.0.0:5672?trace=" + TRANSPORT_TRACE + "&transport.transformer=" + AMQP_TRANSFORMER);
KahaDBStore store = new KahaDBStore();
store.setDirectory(new File("target/activemq-data/kahadb"));
bs.setPersistenceAdapter(store);
bs.deleteAllMessages();
bs.start();
bs.waitUntilStopped();
brokerService.setStoreOpenWireVersion(10);
brokerService.setPersistenceAdapter(store);
brokerService.setUseJmx(false);
brokerService.deleteAllMessages();
brokerService.start();
brokerService.waitUntilStopped();
}
}

View File

@ -43,7 +43,6 @@ import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Ignore;
import org.junit.Test;
/**
@ -340,7 +339,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
connection.close();
}
@Ignore("Test fails currently due to improper implementation of drain.")
//@Ignore("Test fails currently due to improper implementation of drain.")
@Test(timeout = 60000)
public void testReceiverCanDrainMessages() throws Exception {
int MSG_COUNT = 20;

View File

@ -88,13 +88,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* Allows a message to be pulled on demand by a client
*/
@Override
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
// The slave should not deliver pull messages.
// TODO: when the slave becomes a master, He should send a NULL message to all the
// consumers to 'wake them up' in case they were waiting for a message.
if (getPrefetchSize() == 0) {
prefetchExtension.incrementAndGet();
prefetchExtension.set(pull.getQuantity());
final long dispatchCounterBeforePull = dispatchCounter;
// Have the destination push us some messages.
@ -105,10 +104,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
synchronized(this) {
// If there was nothing dispatched.. we may need to setup a timeout.
if (dispatchCounterBeforePull == dispatchCounter) {
if (dispatchCounterBeforePull == dispatchCounter || pull.isAlwaysSignalDone()) {
// immediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
// Send a NULL message.
// Null message indicates the pull is done or did not have pending.
prefetchExtension.set(1);
add(QueueMessageReference.NULL_MESSAGE);
dispatchPending();
}
@ -116,7 +116,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
scheduler.executeAfterDelay(new Runnable() {
@Override
public void run() {
pullTimeout(dispatchCounterBeforePull);
pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone());
}
}, pull.getTimeout());
}
@ -130,14 +130,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* Occurs when a pull times out. If nothing has been dispatched since the
* timeout was setup, then send the NULL message.
*/
final void pullTimeout(long dispatchCounterBeforePull) {
final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) {
synchronized (pendingLock) {
if (dispatchCounterBeforePull == dispatchCounter) {
if (dispatchCounterBeforePull == dispatchCounter || alwaysSignalDone) {
try {
prefetchExtension.set(1);
add(QueueMessageReference.NULL_MESSAGE);
dispatchPending();
} catch (Exception e) {
context.getConnection().serviceException(e);
} finally {
prefetchExtension.set(0);
}
}
}
@ -147,7 +150,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
public void add(MessageReference node) throws Exception {
synchronized (pendingLock) {
// The destination may have just been removed...
if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) {
// perhaps we should inform the caller that we are no longer valid to dispatch to?
return;
}
@ -213,7 +216,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// Acknowledge all dispatched messages up till the message id of
// the acknowledgment.
int index = 0;
boolean inAckRange = false;
List<MessageReference> removeList = new ArrayList<MessageReference>();
for (final MessageReference node : dispatched) {
@ -231,7 +233,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} else {
registerRemoveSync(context, node);
}
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
destination = (Destination) node.getRegionDestination();

View File

@ -326,23 +326,23 @@ public class TopicSubscription extends AbstractSubscription {
}
@Override
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
// The slave should not deliver pull messages.
if (getPrefetchSize() == 0 ) {
if (getPrefetchSize() == 0) {
final long currentDispatchedCount = dispatchedCounter.get();
prefetchExtension.incrementAndGet();
prefetchExtension.set(pull.getQuantity());
dispatchMatched();
// If there was nothing dispatched.. we may need to setup a timeout.
if (currentDispatchedCount == dispatchedCounter.get()) {
if (currentDispatchedCount == dispatchedCounter.get() || pull.isAlwaysSignalDone()) {
// immediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
prefetchExtension.decrementAndGet();
// Send a NULL message to signal nothing pending.
dispatch(null);
prefetchExtension.set(0);
}
if (pull.getTimeout() > 0) {
@ -350,7 +350,7 @@ public class TopicSubscription extends AbstractSubscription {
@Override
public void run() {
pullTimeout(currentDispatchedCount);
pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone());
}
}, pull.getTimeout());
}
@ -363,15 +363,15 @@ public class TopicSubscription extends AbstractSubscription {
* Occurs when a pull times out. If nothing has been dispatched since the
* timeout was setup, then send the NULL message.
*/
private final void pullTimeout(long currentDispatchedCount) {
private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) {
synchronized (matchedListMutex) {
if (currentDispatchedCount == dispatchedCounter.get()) {
if (currentDispatchedCount == dispatchedCounter.get() || alwaysSendDone) {
try {
dispatch(null);
} catch (Exception e) {
context.getConnection().serviceException(e);
} finally {
prefetchExtension.decrementAndGet();
prefetchExtension.set(0);
}
}
}
@ -583,7 +583,7 @@ public class TopicSubscription extends AbstractSubscription {
}
private void dispatch(final MessageReference node) throws IOException {
Message message = node.getMessage();
Message message = node != null ? node.getMessage() : null;
if (node != null) {
node.incrementReferenceCount();
}

View File

@ -35,6 +35,8 @@ public class MessagePull extends BaseCommand {
private MessageId messageId;
private String correlationId;
private transient int quantity = 1;
private transient boolean alwaysSignalDone;
private transient boolean tracked = false;
@Override
@ -124,4 +126,20 @@ public class MessagePull extends BaseCommand {
public boolean isTracked() {
return this.tracked;
}
public int getQuantity() {
return quantity;
}
public void setQuantity(int quantity) {
this.quantity = quantity;
}
public boolean isAlwaysSignalDone() {
return alwaysSignalDone;
}
public void setAlwaysSignalDone(boolean alwaysSignalDone) {
this.alwaysSignalDone = alwaysSignalDone;
}
}