https://issues.apache.org/jira/browse/AMQ-5379 - amqp prefetch size and redelivery header problem

This commit is contained in:
Dejan Bosanac 2014-11-27 14:40:56 +01:00
parent 4e3499e41b
commit 0ca376d540
3 changed files with 74 additions and 36 deletions

View File

@ -28,12 +28,12 @@ import org.apache.activemq.command.Command;
*/
public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
private static final int DEFAULT_PREFETCH = 100;
public static final int DEFAULT_PREFETCH = 1000;
private final AmqpTransport transport;
private final BrokerService brokerService;
private int prefetch = DEFAULT_PREFETCH;
private int prefetch = 0;
private int producerCredit = DEFAULT_PREFETCH;
interface Discriminator {

View File

@ -69,14 +69,7 @@ import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.*;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
@ -322,10 +315,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
protected void processLinkFlow(Link link) throws Exception {
Object context = link.getContext();
int credit = link.getRemoteCredit();
if (context != null && context instanceof ConsumerContext) {
if (context instanceof ConsumerContext) {
ConsumerContext consumerContext = (ConsumerContext)context;
// change ActiveMQ consumer prefetch if needed
if (consumerContext.credit == 0 && consumerContext.consumerPrefetch != credit && credit > 0) {
// change consumer prefetch if it's not been already set using
// transport connector property or consumer preference
if (consumerContext.consumerPrefetch == 0 && credit > 0) {
ConsumerControl control = new ConsumerControl();
control.setConsumerId(consumerContext.consumerId);
control.setDestination(consumerContext.destination);
@ -612,6 +606,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private final ActiveMQDestination destination;
private boolean closed;
private final boolean anonymous;
private MessageId lastDispatched;
public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean anonymous) {
this.producerId = producerId;
@ -688,9 +683,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
rejected.setError(condition);
delivery.disposition(rejected);
} else {
if (receiver.getCredit() <= (prefetch * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}", prefetch - receiver.getCredit(), producerId);
receiver.flow(prefetch - receiver.getCredit());
if (receiver.getCredit() <= (producerCredit * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}", producerCredit - receiver.getCredit(), producerId);
receiver.flow(producerCredit - receiver.getCredit());
}
if (remoteState != null && remoteState instanceof TransactionalState) {
@ -710,9 +705,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
});
} else {
if (receiver.getCredit() <= (prefetch * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}", prefetch - receiver.getCredit(), producerId);
receiver.flow(prefetch - receiver.getCredit());
if (receiver.getCredit() <= (producerCredit * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}", producerCredit - receiver.getCredit(), producerId);
receiver.flow(producerCredit - receiver.getCredit());
pumpProtonToSocket();
}
sendToActiveMQ(message, null);
@ -838,10 +833,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
// Client is producing to this receiver object
org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
int flow = producerCredit;
// use client's preference if set
if (receiver.getRemoteCredit() != 0) {
flow = receiver.getRemoteCredit();
}
try {
if (remoteTarget instanceof Coordinator) {
pumpProtonToSocket();
@ -934,7 +925,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private boolean endOfBrowse = false;
public ActiveMQDestination destination;
public int credit;
public int consumerPrefetch;
public int consumerPrefetch = 0;
private long lastDeliveredSequenceId;
protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
@ -978,8 +970,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
if (session != null) {
session.consumers.remove(info.getConsumerId());
}
sendToActiveMQ(new RemoveInfo(consumerId), null);
RemoveInfo removeCommand = new RemoveInfo(consumerId);
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
}
}
@ -1003,7 +996,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
public void pumpOutbound() throws Exception {
while (!closed) {
while (currentBuffer != null) {
int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
if (sent > 0) {
@ -1089,6 +1081,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
onMessageDispatch((MessageDispatch) delivery.getContext());
} else {
MessageDispatch md = (MessageDispatch) delivery.getContext();
lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
MessageAck ack = new MessageAck();
ack.setConsumerId(consumerId);
ack.setFirstMessageId(md.getMessage().getMessageId());
@ -1110,6 +1103,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
dispatchedInTx.addFirst(md);
}
LOG.trace("Sending Ack to ActiveMQ: {}", ack);
sendToActiveMQ(ack, new ResponseHandler() {
@ -1335,9 +1329,25 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(dest);
consumerContext.destination = dest;
consumerInfo.setPrefetchSize(sender.getRemoteCredit());
consumerContext.credit = sender.getRemoteCredit();
consumerContext.consumerPrefetch = consumerInfo.getPrefetchSize();
int senderCredit = sender.getRemoteCredit();
if (prefetch != 0) {
// use the value configured on the transport connector
// this value will not be changed to the consumer's preference
consumerInfo.setPrefetchSize(prefetch);
consumerContext.consumerPrefetch = prefetch;
} else {
if (senderCredit != 0) {
// set the prefetch to the value of the remote credit
// and ignore the later changes
consumerInfo.setPrefetchSize(senderCredit);
consumerContext.consumerPrefetch = senderCredit;
} else {
// set default value for now and change to the consumer's preference
// on the first flow packet
consumerInfo.setPrefetchSize(AMQPProtocolDiscriminator.DEFAULT_PREFETCH);
}
}
consumerContext.credit = senderCredit;
consumerInfo.setDispatchAsync(true);
if (source.getDistributionMode() == COPY && dest.isQueue()) {
consumerInfo.setBrowser(true);

View File

@ -16,12 +16,6 @@
*/
package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
@ -55,6 +49,8 @@ import org.objectweb.jtests.jms.framework.TestConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
public class JMSClientTest extends JMSClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
@ -952,4 +948,36 @@ public class JMSClientTest extends JMSClientTestSupport {
} catch (JMSException ex) {
}
}
@Test(timeout=30000)
public void testRedeliveredHeader() throws Exception {
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
connection.start();
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 1; i < 100; i++) {
Message m = session.createTextMessage(i + ". Sample text");
producer.send(m);
}
MessageConsumer consumer = session.createConsumer(queue);
receiveMessages(consumer);
consumer.close();
consumer = session.createConsumer(queue);
receiveMessages(consumer);
consumer.close();
}
protected void receiveMessages(MessageConsumer consumer) throws Exception {
for (int i = 0; i < 10; i++) {
Message message = consumer.receive(1000);
assertNotNull(message);
assertFalse(message.getJMSRedelivered());
}
}
}