This commit is contained in:
Dejan Bosanac 2014-10-02 11:32:31 +02:00
parent fc3d90e8b7
commit 838bbebeeb
4 changed files with 33 additions and 4 deletions

View File

@ -27,7 +27,10 @@ import org.apache.activemq.command.Command;
*/ */
public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
private static final int DEFAULT_PREFETCH = 100;
final private AmqpTransport transport; final private AmqpTransport transport;
private int prefetch = DEFAULT_PREFETCH;
interface Discriminator { interface Discriminator {
boolean matches(AmqpHeader header); boolean matches(AmqpHeader header);
@ -81,6 +84,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
match = DISCRIMINATORS.get(0); match = DISCRIMINATORS.get(0);
} }
IAmqpProtocolConverter next = match.create(transport); IAmqpProtocolConverter next = match.create(transport);
next.setPrefetch(prefetch);
transport.setProtocolConverter(next); transport.setProtocolConverter(next);
for (Command send : pendingCommands) { for (Command send : pendingCommands) {
next.onActiveMQCommand(send); next.onActiveMQCommand(send);
@ -104,4 +108,9 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
@Override @Override
public void updateTracer() { public void updateTracer() {
} }
@Override
public void setPrefetch(int prefetch) {
this.prefetch = prefetch;
}
} }

View File

@ -117,7 +117,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
protected int prefetch = 100; protected int prefetch;
protected Transport protonTransport = Proton.transport(); protected Transport protonTransport = Proton.transport();
protected Connection protonConnection = Proton.connection(); protected Connection protonConnection = Proton.connection();
protected Collector eventCollector = new CollectorImpl(); protected Collector eventCollector = new CollectorImpl();
@ -780,11 +780,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) { void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
// Client is producing to this receiver object // Client is producing to this receiver object
org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget(); org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
int flow = prefetch;
// use client's preference if set
if (receiver.getRemoteCredit() != 0) {
flow = receiver.getRemoteCredit();
}
try { try {
if (remoteTarget instanceof Coordinator) { if (remoteTarget instanceof Coordinator) {
pumpProtonToSocket(); pumpProtonToSocket();
receiver.setContext(coordinatorContext); receiver.setContext(coordinatorContext);
receiver.flow(prefetch); receiver.flow(flow);
receiver.open(); receiver.open();
pumpProtonToSocket(); pumpProtonToSocket();
} else { } else {
@ -804,7 +809,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
ProducerContext producerContext = new ProducerContext(producerId, dest); ProducerContext producerContext = new ProducerContext(producerId, dest);
receiver.setContext(producerContext); receiver.setContext(producerContext);
receiver.flow(prefetch); receiver.flow(flow);
ProducerInfo producerInfo = new ProducerInfo(producerId); ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(dest); producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() { sendToActiveMQ(producerInfo, new ResponseHandler() {
@ -1258,7 +1263,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
consumerInfo.setSelector(selector); consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true); consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(dest); consumerInfo.setDestination(dest);
consumerInfo.setPrefetchSize(100); // use client's preference if set
if (sender.getRemoteCredit() != 0) {
consumerInfo.setPrefetchSize(sender.getRemoteCredit());
} else {
consumerInfo.setPrefetchSize(prefetch);
}
consumerInfo.setDispatchAsync(true); consumerInfo.setDispatchAsync(true);
if (source.getDistributionMode() == COPY && dest.isQueue()) { if (source.getDistributionMode() == COPY && dest.isQueue()) {
consumerInfo.setBrowser(true); consumerInfo.setBrowser(true);
@ -1372,4 +1382,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
condition.setDescription(description); condition.setDescription(description);
return condition; return condition;
} }
public void setPrefetch(int prefetch) {
this.prefetch = prefetch;
}
} }

View File

@ -176,4 +176,8 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) { public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
this.protocolConverter = protocolConverter; this.protocolConverter = protocolConverter;
} }
public void setPrefetch(int prefetch) {
protocolConverter.setPrefetch(prefetch);
}
} }

View File

@ -31,4 +31,6 @@ public interface IAmqpProtocolConverter {
void onActiveMQCommand(Command command) throws Exception; void onActiveMQCommand(Command command) throws Exception;
void updateTracer(); void updateTracer();
void setPrefetch(int prefetch);
} }