mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-6422 - match proton sender view credit to prefetchExtension - tracking credit to dispatch delta to track additional flow requests. Proton sender layer is distinct from the transport layer - they mirror each other
This commit is contained in:
parent
0bb76c7fb4
commit
ebbb7ab437
|
@ -20,8 +20,9 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.activemq.broker.region.Subscription;
|
import org.apache.activemq.broker.region.AbstractSubscription;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
import org.apache.activemq.command.ConsumerControl;
|
import org.apache.activemq.command.ConsumerControl;
|
||||||
|
@ -81,7 +82,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
|
private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
|
||||||
|
|
||||||
private final ConsumerInfo consumerInfo;
|
private final ConsumerInfo consumerInfo;
|
||||||
private Subscription subscription;
|
private AbstractSubscription subscription;
|
||||||
|
private AtomicInteger prefetchExtension;
|
||||||
|
private int currentCreditRequest;
|
||||||
|
private int logicalDeliveryCount; // echoes prefetch extension but from protons perspective
|
||||||
private final boolean presettle;
|
private final boolean presettle;
|
||||||
|
|
||||||
private boolean draining;
|
private boolean draining;
|
||||||
|
@ -111,7 +115,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
public void open() {
|
public void open() {
|
||||||
if (!isClosed()) {
|
if (!isClosed()) {
|
||||||
session.registerSender(getConsumerId(), this);
|
session.registerSender(getConsumerId(), this);
|
||||||
subscription = session.getConnection().lookupPrefetchSubscription(consumerInfo);
|
subscription = (AbstractSubscription)session.getConnection().lookupPrefetchSubscription(consumerInfo);
|
||||||
|
prefetchExtension = subscription.getPrefetchExtension();
|
||||||
}
|
}
|
||||||
|
|
||||||
super.open();
|
super.open();
|
||||||
|
@ -168,24 +173,15 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
public void flow() throws Exception {
|
public void flow() throws Exception {
|
||||||
Link endpoint = getEndpoint();
|
Link endpoint = getEndpoint();
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}, unsettled={}",
|
LOG.trace("Flow: draining={}, drain={} credit={}, currentCredit={}, senderDeliveryCount={} - Sub={}",
|
||||||
draining, endpoint.getDrain(),
|
draining, endpoint.getDrain(),
|
||||||
endpoint.getCredit(), endpoint.getRemoteCredit(), endpoint.getQueued(), endpoint.getUnsettled());
|
endpoint.getCredit(), currentCreditRequest, logicalDeliveryCount, subscription);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final int endpointCredit = endpoint.getCredit();
|
||||||
if (endpoint.getDrain() && !draining) {
|
if (endpoint.getDrain() && !draining) {
|
||||||
|
|
||||||
// Revert to a pull consumer.
|
if (endpointCredit > 0) {
|
||||||
ConsumerControl control = new ConsumerControl();
|
|
||||||
control.setConsumerId(getConsumerId());
|
|
||||||
control.setDestination(getDestination());
|
|
||||||
control.setPrefetch(0);
|
|
||||||
|
|
||||||
LOG.trace("Flow: Pull case -> consumer control with prefetch (0) to control output");
|
|
||||||
|
|
||||||
sendToActiveMQ(control);
|
|
||||||
|
|
||||||
if (endpoint.getCredit() > 0) {
|
|
||||||
draining = true;
|
draining = true;
|
||||||
|
|
||||||
// Now request dispatch of the drain amount, we request immediate
|
// Now request dispatch of the drain amount, we request immediate
|
||||||
|
@ -196,9 +192,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
pullRequest.setDestination(getDestination());
|
pullRequest.setDestination(getDestination());
|
||||||
pullRequest.setTimeout(-1);
|
pullRequest.setTimeout(-1);
|
||||||
pullRequest.setAlwaysSignalDone(true);
|
pullRequest.setAlwaysSignalDone(true);
|
||||||
pullRequest.setQuantity(endpoint.getCredit());
|
pullRequest.setQuantity(endpointCredit);
|
||||||
|
|
||||||
LOG.trace("Pull case -> consumer pull request quantity = {}", endpoint.getCredit());
|
LOG.trace("Pull case -> consumer pull request quantity = {}", endpointCredit);
|
||||||
|
|
||||||
sendToActiveMQ(pullRequest);
|
sendToActiveMQ(pullRequest);
|
||||||
} else {
|
} else {
|
||||||
|
@ -207,25 +203,36 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
pumpOutbound();
|
pumpOutbound();
|
||||||
getEndpoint().drained();
|
getEndpoint().drained();
|
||||||
session.pumpProtonToSocket();
|
session.pumpProtonToSocket();
|
||||||
|
currentCreditRequest = 0;
|
||||||
|
logicalDeliveryCount = 0;
|
||||||
}
|
}
|
||||||
|
} else if (endpointCredit >= 0) {
|
||||||
|
|
||||||
|
if (endpointCredit == 0 && currentCreditRequest != 0) {
|
||||||
|
|
||||||
|
prefetchExtension.set(0);
|
||||||
|
currentCreditRequest = 0;
|
||||||
|
logicalDeliveryCount = 0;
|
||||||
|
LOG.trace("Flow: credit 0 for sub:" + subscription);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
ConsumerControl control = new ConsumerControl();
|
|
||||||
control.setConsumerId(getConsumerId());
|
|
||||||
control.setDestination(getDestination());
|
|
||||||
|
|
||||||
int remoteCredit = endpoint.getRemoteCredit();
|
int deltaToAdd = endpointCredit;
|
||||||
if (remoteCredit > 0 && subscription != null) {
|
int logicalCredit = currentCreditRequest - logicalDeliveryCount;
|
||||||
// ensure prefetch exceeds credit + inflight
|
if (logicalCredit > 0) {
|
||||||
if (remoteCredit + endpoint.getUnsettled() + endpoint.getQueued() > subscription.getPrefetchSize()) {
|
deltaToAdd -= logicalCredit;
|
||||||
LOG.trace("Adding dispatched size to credit for sub: " + subscription);
|
} else {
|
||||||
remoteCredit += subscription.getDispatchedQueueSize();
|
// reset delivery counter - dispatch from broker concurrent with credit=0 flow can go negative
|
||||||
|
logicalDeliveryCount = 0;
|
||||||
|
}
|
||||||
|
if (deltaToAdd > 0) {
|
||||||
|
currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd);
|
||||||
|
subscription.wakeupDestinationsForDispatch();
|
||||||
|
// force dispatch of matched/pending for topics (pending messages accumulate in the sub and are dispatched on update of prefetch)
|
||||||
|
subscription.setPrefetchSize(0);
|
||||||
|
LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
control.setPrefetch(remoteCredit);
|
|
||||||
|
|
||||||
LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch());
|
|
||||||
|
|
||||||
sendToActiveMQ(control);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,6 +292,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pumpOutbound();
|
pumpOutbound();
|
||||||
|
logicalDeliveryCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -440,6 +448,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
// It's the end of browse signal in response to a MessagePull
|
// It's the end of browse signal in response to a MessagePull
|
||||||
getEndpoint().drained();
|
getEndpoint().drained();
|
||||||
draining = false;
|
draining = false;
|
||||||
|
currentCreditRequest = 0;
|
||||||
|
logicalDeliveryCount = 0;
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
|
LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
|
||||||
|
@ -451,6 +461,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
|
LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
|
||||||
getEndpoint().drained();
|
getEndpoint().drained();
|
||||||
draining = false;
|
draining = false;
|
||||||
|
currentCreditRequest = 0;
|
||||||
|
logicalDeliveryCount = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
jms.setRedeliveryCounter(md.getRedeliveryCounter());
|
jms.setRedeliveryCounter(md.getRedeliveryCounter());
|
||||||
|
@ -481,17 +493,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
tagCache.returnTag(tag);
|
tagCache.returnTag(tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
int newCredit = Math.max(0, getEndpoint().getCredit() - 1);
|
|
||||||
LOG.trace("Sender:[{}] updating conumser prefetch:{} after delivery settled.",
|
|
||||||
getEndpoint().getName(), newCredit);
|
|
||||||
|
|
||||||
ConsumerControl control = new ConsumerControl();
|
|
||||||
control.setConsumerId(getConsumerId());
|
|
||||||
control.setDestination(getDestination());
|
|
||||||
control.setPrefetch(newCredit);
|
|
||||||
|
|
||||||
sendToActiveMQ(control);
|
|
||||||
|
|
||||||
if (ackType == -1) {
|
if (ackType == -1) {
|
||||||
// we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
|
// we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
|
||||||
delivery.settle();
|
delivery.settle();
|
||||||
|
|
|
@ -193,8 +193,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
|
||||||
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
|
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
|
||||||
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
|
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
|
||||||
assertNotNull(subscription);
|
assertNotNull(subscription);
|
||||||
LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
|
|
||||||
assertTrue(subscription.getPrefetchSize() > 0);
|
|
||||||
|
|
||||||
for (int i = 1; i <= MSG_COUNT; i++) {
|
for (int i = 1; i <= MSG_COUNT; i++) {
|
||||||
LOG.info("Trying to receive message: {}", i);
|
LOG.info("Trying to receive message: {}", i);
|
||||||
|
@ -259,8 +257,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
|
||||||
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
|
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
|
||||||
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
|
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
|
||||||
assertNotNull(subscription);
|
assertNotNull(subscription);
|
||||||
LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
|
|
||||||
assertTrue(subscription.getPrefetchSize() > 0);
|
|
||||||
|
|
||||||
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
|
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@ -273,7 +269,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
|
||||||
LOG.info("COMMIT of first received batch here:");
|
LOG.info("COMMIT of first received batch here:");
|
||||||
session.commit();
|
session.commit();
|
||||||
|
|
||||||
assertTrue(subscription.getPrefetchSize() > 0);
|
|
||||||
for (int i = 1; i <= MSG_COUNT; i++) {
|
for (int i = 1; i <= MSG_COUNT; i++) {
|
||||||
LOG.info("Sending message: {} to commit", msgIndex++);
|
LOG.info("Sending message: {} to commit", msgIndex++);
|
||||||
TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
|
TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
|
||||||
|
@ -286,7 +281,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
|
||||||
|
|
||||||
LOG.info("WAITING -> for next three messages to arrive:");
|
LOG.info("WAITING -> for next three messages to arrive:");
|
||||||
|
|
||||||
assertTrue(subscription.getPrefetchSize() > 0);
|
|
||||||
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
|
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -294,7 +294,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
||||||
receiver2.flow(splitCredit);
|
receiver2.flow(splitCredit);
|
||||||
for (int i = 0; i < splitCredit; i++) {
|
for (int i = 0; i < splitCredit; i++) {
|
||||||
AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
|
AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
|
||||||
assertNotNull("Receiver #2 should have read a message", message);
|
assertNotNull("Receiver #2 should have read message[" + i + "]", message);
|
||||||
LOG.info("Receiver #2 read message: {}", message.getMessageId());
|
LOG.info("Receiver #2 read message: {}", message.getMessageId());
|
||||||
message.accept();
|
message.accept();
|
||||||
}
|
}
|
||||||
|
@ -671,7 +671,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
||||||
LOG.info("*** Attempting to read remaining messages with both receivers");
|
LOG.info("*** Attempting to read remaining messages with both receivers");
|
||||||
int splitCredit = (MSG_COUNT - 4) / 2;
|
int splitCredit = (MSG_COUNT - 4) / 2;
|
||||||
|
|
||||||
LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
|
LOG.info("**** Receiver #1 granting credit[{}] for its block of messages", splitCredit);
|
||||||
receiver1.flow(splitCredit);
|
receiver1.flow(splitCredit);
|
||||||
for (int i = 0; i < splitCredit; i++) {
|
for (int i = 0; i < splitCredit; i++) {
|
||||||
AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
|
AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
|
||||||
|
@ -680,11 +680,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
||||||
message.accept();
|
message.accept();
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
|
LOG.info("**** Receiver #2 granting credit[{}] for its block of messages", splitCredit);
|
||||||
receiver2.flow(splitCredit);
|
receiver2.flow(splitCredit);
|
||||||
for (int i = 0; i < splitCredit; i++) {
|
for (int i = 0; i < splitCredit; i++) {
|
||||||
AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
|
AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
|
||||||
assertNotNull("Receiver #2 should have read a message", message);
|
assertNotNull("Receiver #2 should have read a message[" + i + "]", message);
|
||||||
LOG.info("Receiver #2 read message: {}", message.getMessageId());
|
LOG.info("Receiver #2 read message: {}", message.getMessageId());
|
||||||
message.accept();
|
message.accept();
|
||||||
}
|
}
|
||||||
|
|
|
@ -688,7 +688,7 @@ public abstract class AbstractRegion implements Region {
|
||||||
entry.configurePrefetch(sub);
|
entry.configurePrefetch(sub);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getCurrentPrefetchSize()});
|
LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize()});
|
||||||
try {
|
try {
|
||||||
lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
|
lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import javax.jms.InvalidSelectorException;
|
import javax.jms.InvalidSelectorException;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
@ -49,6 +50,7 @@ public abstract class AbstractSubscription implements Subscription {
|
||||||
protected ConsumerInfo info;
|
protected ConsumerInfo info;
|
||||||
protected final DestinationFilter destinationFilter;
|
protected final DestinationFilter destinationFilter;
|
||||||
protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
|
protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
|
||||||
|
protected final AtomicInteger prefetchExtension = new AtomicInteger(0);
|
||||||
|
|
||||||
private BooleanExpression selectorExpression;
|
private BooleanExpression selectorExpression;
|
||||||
private ObjectName objectName;
|
private ObjectName objectName;
|
||||||
|
@ -309,4 +311,14 @@ public abstract class AbstractSubscription implements Subscription {
|
||||||
public SubscriptionStatistics getSubscriptionStatistics() {
|
public SubscriptionStatistics getSubscriptionStatistics() {
|
||||||
return subscriptionStatistics;
|
return subscriptionStatistics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void wakeupDestinationsForDispatch() {
|
||||||
|
for (Destination dest : destinations) {
|
||||||
|
dest.wakeup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public AtomicInteger getPrefetchExtension() {
|
||||||
|
return this.prefetchExtension;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
|
@ -57,7 +56,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
|
|
||||||
protected PendingMessageCursor pending;
|
protected PendingMessageCursor pending;
|
||||||
protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
|
protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
|
||||||
protected final AtomicInteger prefetchExtension = new AtomicInteger();
|
|
||||||
protected boolean usePrefetchExtension = true;
|
protected boolean usePrefetchExtension = true;
|
||||||
private int maxProducersToAudit=32;
|
private int maxProducersToAudit=32;
|
||||||
private int maxAuditDepth=2048;
|
private int maxAuditDepth=2048;
|
||||||
|
@ -431,9 +429,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
dispatchPending();
|
dispatchPending();
|
||||||
|
|
||||||
if (pending.isEmpty()) {
|
if (pending.isEmpty()) {
|
||||||
for (Destination dest : destinations) {
|
wakeupDestinationsForDispatch();
|
||||||
dest.wakeup();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
|
LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
|
||||||
|
@ -904,10 +900,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
this.usePrefetchExtension = usePrefetchExtension;
|
this.usePrefetchExtension = usePrefetchExtension;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int getPrefetchExtension() {
|
|
||||||
return this.prefetchExtension.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setPrefetchSize(int prefetchSize) {
|
public void setPrefetchSize(int prefetchSize) {
|
||||||
this.info.setPrefetchSize(prefetchSize);
|
this.info.setPrefetchSize(prefetchSize);
|
||||||
|
|
|
@ -69,7 +69,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
|
||||||
@Override
|
@Override
|
||||||
public synchronized String toString() {
|
public synchronized String toString() {
|
||||||
return "QueueSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="
|
return "QueueSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="
|
||||||
+ this.prefetchExtension + ", pending=" + getPendingQueueSize();
|
+ this.prefetchExtension + ", pending=" + getPendingQueueSize() + ", prefetch=" + getPrefetchSize() + ", prefetchExtension=" + prefetchExtension.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -64,7 +64,6 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
|
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
|
||||||
private int discarded;
|
private int discarded;
|
||||||
private final Object matchedListMutex = new Object();
|
private final Object matchedListMutex = new Object();
|
||||||
private final AtomicInteger prefetchExtension = new AtomicInteger(0);
|
|
||||||
private int memoryUsageHighWaterMark = 95;
|
private int memoryUsageHighWaterMark = 95;
|
||||||
// allow duplicate suppression in a ring network of brokers
|
// allow duplicate suppression in a ring network of brokers
|
||||||
protected int maxProducersToAudit = 1024;
|
protected int maxProducersToAudit = 1024;
|
||||||
|
@ -410,6 +409,16 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void decrementPrefetchExtension() {
|
||||||
|
while (true) {
|
||||||
|
int currentExtension = prefetchExtension.get();
|
||||||
|
int newExtension = Math.max(0, currentExtension - 1);
|
||||||
|
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int countBeforeFull() {
|
public int countBeforeFull() {
|
||||||
return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
|
return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
|
||||||
|
@ -529,6 +538,9 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
@Override
|
@Override
|
||||||
public boolean isFull() {
|
public boolean isFull() {
|
||||||
|
if (info.getPrefetchSize() == 0) {
|
||||||
|
return prefetchExtension.get() == 0;
|
||||||
|
}
|
||||||
return getDispatchedQueueSize() >= info.getPrefetchSize();
|
return getDispatchedQueueSize() >= info.getPrefetchSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -655,6 +667,11 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (getPrefetchSize() == 0) {
|
||||||
|
decrementPrefetchExtension();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
if (info.isDispatchAsync()) {
|
if (info.isDispatchAsync()) {
|
||||||
if (node != null) {
|
if (node != null) {
|
||||||
|
@ -712,7 +729,7 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
|
return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
|
||||||
+ getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
|
+ getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -22,6 +22,7 @@ import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
@ -75,6 +76,24 @@ public class TopicSubscriptionZeroPrefetchTest {
|
||||||
Assert.assertNotNull("should have received a message the published message", consumedMessage);
|
Assert.assertNotNull("should have received a message the published message", consumedMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testTopicConsumerPrefetchZeroClientAckLoop() throws Exception {
|
||||||
|
ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
|
||||||
|
Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
consumer = consumerClientAckSession.createConsumer(consumerDestination);
|
||||||
|
|
||||||
|
final int count = 10;
|
||||||
|
for (int i=0;i<count;i++) {
|
||||||
|
Message txtMessage = session.createTextMessage("M:"+ i);
|
||||||
|
producer.send(txtMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i=0;i<count;i++) {
|
||||||
|
Message consumedMessage = consumer.receive(2000);
|
||||||
|
Assert.assertNotNull("should have received message[" + i +"]", consumedMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* test durable topic subscription with prefetch zero
|
* test durable topic subscription with prefetch zero
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue