mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5379 - amqp prefetch; split producer and consumer prefetch settings; allow consumer prefetch to be adjusted using link credit
This commit is contained in:
parent
3873ecfe5d
commit
ab3de0c4c2
|
@ -31,6 +31,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
|
||||||
|
|
||||||
final private AmqpTransport transport;
|
final private AmqpTransport transport;
|
||||||
private int prefetch = DEFAULT_PREFETCH;
|
private int prefetch = DEFAULT_PREFETCH;
|
||||||
|
private int producerCredit = DEFAULT_PREFETCH;
|
||||||
|
|
||||||
interface Discriminator {
|
interface Discriminator {
|
||||||
boolean matches(AmqpHeader header);
|
boolean matches(AmqpHeader header);
|
||||||
|
@ -85,6 +86,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
|
||||||
}
|
}
|
||||||
IAmqpProtocolConverter next = match.create(transport);
|
IAmqpProtocolConverter next = match.create(transport);
|
||||||
next.setPrefetch(prefetch);
|
next.setPrefetch(prefetch);
|
||||||
|
next.setProducerCredit(producerCredit);
|
||||||
transport.setProtocolConverter(next);
|
transport.setProtocolConverter(next);
|
||||||
for (Command send : pendingCommands) {
|
for (Command send : pendingCommands) {
|
||||||
next.onActiveMQCommand(send);
|
next.onActiveMQCommand(send);
|
||||||
|
@ -113,4 +115,9 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
|
||||||
public void setPrefetch(int prefetch) {
|
public void setPrefetch(int prefetch) {
|
||||||
this.prefetch = prefetch;
|
this.prefetch = prefetch;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setProducerCredit(int producerCredit) {
|
||||||
|
this.producerCredit = producerCredit;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,31 +30,7 @@ import javax.jms.Destination;
|
||||||
import javax.jms.InvalidClientIDException;
|
import javax.jms.InvalidClientIDException;
|
||||||
import javax.jms.InvalidSelectorException;
|
import javax.jms.InvalidSelectorException;
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.*;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
|
||||||
import org.apache.activemq.command.ActiveMQTempQueue;
|
|
||||||
import org.apache.activemq.command.ActiveMQTempTopic;
|
|
||||||
import org.apache.activemq.command.Command;
|
|
||||||
import org.apache.activemq.command.ConnectionError;
|
|
||||||
import org.apache.activemq.command.ConnectionId;
|
|
||||||
import org.apache.activemq.command.ConnectionInfo;
|
|
||||||
import org.apache.activemq.command.ConsumerId;
|
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
|
||||||
import org.apache.activemq.command.DestinationInfo;
|
|
||||||
import org.apache.activemq.command.ExceptionResponse;
|
|
||||||
import org.apache.activemq.command.LocalTransactionId;
|
|
||||||
import org.apache.activemq.command.MessageAck;
|
|
||||||
import org.apache.activemq.command.MessageDispatch;
|
|
||||||
import org.apache.activemq.command.MessageId;
|
|
||||||
import org.apache.activemq.command.ProducerId;
|
|
||||||
import org.apache.activemq.command.ProducerInfo;
|
|
||||||
import org.apache.activemq.command.RemoveInfo;
|
|
||||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
|
||||||
import org.apache.activemq.command.Response;
|
|
||||||
import org.apache.activemq.command.SessionId;
|
|
||||||
import org.apache.activemq.command.SessionInfo;
|
|
||||||
import org.apache.activemq.command.ShutdownInfo;
|
|
||||||
import org.apache.activemq.command.TransactionInfo;
|
|
||||||
import org.apache.activemq.selector.SelectorParser;
|
import org.apache.activemq.selector.SelectorParser;
|
||||||
import org.apache.activemq.util.IOExceptionSupport;
|
import org.apache.activemq.util.IOExceptionSupport;
|
||||||
import org.apache.activemq.util.IdGenerator;
|
import org.apache.activemq.util.IdGenerator;
|
||||||
|
@ -122,6 +98,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
||||||
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
|
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
|
||||||
|
|
||||||
protected int prefetch;
|
protected int prefetch;
|
||||||
|
protected int producerCredit;
|
||||||
protected Transport protonTransport = Proton.transport();
|
protected Transport protonTransport = Proton.transport();
|
||||||
protected Connection protonConnection = Proton.connection();
|
protected Connection protonConnection = Proton.connection();
|
||||||
protected Collector eventCollector = new CollectorImpl();
|
protected Collector eventCollector = new CollectorImpl();
|
||||||
|
@ -296,8 +273,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
||||||
processLinkEvent(event.getLink());
|
processLinkEvent(event.getLink());
|
||||||
break;
|
break;
|
||||||
case LINK_FLOW:
|
case LINK_FLOW:
|
||||||
Link link = event.getLink();
|
processLinkFlow(event.getLink());
|
||||||
((AmqpDeliveryListener) link.getContext()).drainCheck();
|
|
||||||
break;
|
break;
|
||||||
case DELIVERY:
|
case DELIVERY:
|
||||||
processDelivery(event.getDelivery());
|
processDelivery(event.getDelivery());
|
||||||
|
@ -317,6 +293,25 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void processLinkFlow(Link link) throws Exception {
|
||||||
|
Object context = link.getContext();
|
||||||
|
int credit = link.getRemoteCredit();
|
||||||
|
if (context != null && context instanceof ConsumerContext) {
|
||||||
|
ConsumerContext consumerContext = (ConsumerContext)context;
|
||||||
|
// change ActiveMQ consumer prefetch if needed
|
||||||
|
if (consumerContext.credit == 0 && consumerContext.consumerPrefetch != credit && credit > 0) {
|
||||||
|
ConsumerControl control = new ConsumerControl();
|
||||||
|
control.setConsumerId(consumerContext.consumerId);
|
||||||
|
control.setDestination(consumerContext.destination);
|
||||||
|
control.setPrefetch(credit);
|
||||||
|
consumerContext.consumerPrefetch = credit;
|
||||||
|
sendToActiveMQ(control, null);
|
||||||
|
}
|
||||||
|
consumerContext.credit = credit;
|
||||||
|
}
|
||||||
|
((AmqpDeliveryListener) link.getContext()).drainCheck();
|
||||||
|
}
|
||||||
|
|
||||||
protected void processConnectionEvent(Connection connection) throws Exception {
|
protected void processConnectionEvent(Connection connection) throws Exception {
|
||||||
EndpointState remoteState = connection.getRemoteState();
|
EndpointState remoteState = connection.getRemoteState();
|
||||||
if (remoteState == EndpointState.ACTIVE) {
|
if (remoteState == EndpointState.ACTIVE) {
|
||||||
|
@ -828,7 +823,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
||||||
void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
|
void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
|
||||||
// Client is producing to this receiver object
|
// Client is producing to this receiver object
|
||||||
org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
|
org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
|
||||||
int flow = prefetch;
|
int flow = producerCredit;
|
||||||
// use client's preference if set
|
// use client's preference if set
|
||||||
if (receiver.getRemoteCredit() != 0) {
|
if (receiver.getRemoteCredit() != 0) {
|
||||||
flow = receiver.getRemoteCredit();
|
flow = receiver.getRemoteCredit();
|
||||||
|
@ -923,6 +918,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
public ConsumerInfo info;
|
public ConsumerInfo info;
|
||||||
private boolean endOfBrowse = false;
|
private boolean endOfBrowse = false;
|
||||||
|
public ActiveMQDestination destination;
|
||||||
|
public int credit;
|
||||||
|
public int consumerPrefetch;
|
||||||
|
|
||||||
protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
|
protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
|
||||||
|
|
||||||
|
@ -1317,12 +1315,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
||||||
consumerInfo.setSelector(selector);
|
consumerInfo.setSelector(selector);
|
||||||
consumerInfo.setNoRangeAcks(true);
|
consumerInfo.setNoRangeAcks(true);
|
||||||
consumerInfo.setDestination(dest);
|
consumerInfo.setDestination(dest);
|
||||||
// use client's preference if set
|
consumerContext.destination = dest;
|
||||||
if (sender.getRemoteCredit() != 0) {
|
consumerInfo.setPrefetchSize(sender.getRemoteCredit());
|
||||||
consumerInfo.setPrefetchSize(sender.getRemoteCredit());
|
consumerContext.credit = sender.getRemoteCredit();
|
||||||
} else {
|
consumerContext.consumerPrefetch = consumerInfo.getPrefetchSize();
|
||||||
consumerInfo.setPrefetchSize(prefetch);
|
|
||||||
}
|
|
||||||
consumerInfo.setDispatchAsync(true);
|
consumerInfo.setDispatchAsync(true);
|
||||||
if (source.getDistributionMode() == COPY && dest.isQueue()) {
|
if (source.getDistributionMode() == COPY && dest.isQueue()) {
|
||||||
consumerInfo.setBrowser(true);
|
consumerInfo.setBrowser(true);
|
||||||
|
@ -1441,4 +1437,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
||||||
public void setPrefetch(int prefetch) {
|
public void setPrefetch(int prefetch) {
|
||||||
this.prefetch = prefetch;
|
this.prefetch = prefetch;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setProducerCredit(int producerCredit) {
|
||||||
|
this.producerCredit = producerCredit;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -180,4 +180,8 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
|
||||||
public void setPrefetch(int prefetch) {
|
public void setPrefetch(int prefetch) {
|
||||||
protocolConverter.setPrefetch(prefetch);
|
protocolConverter.setPrefetch(prefetch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setProducerCredit(int producerCredit) {
|
||||||
|
protocolConverter.setProducerCredit(producerCredit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,4 +33,6 @@ public interface IAmqpProtocolConverter {
|
||||||
void updateTracer();
|
void updateTracer();
|
||||||
|
|
||||||
void setPrefetch(int prefetch);
|
void setPrefetch(int prefetch);
|
||||||
|
|
||||||
|
void setProducerCredit(int producerCredit);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue