Ensure that we settle the delivery state of incoming deliveries that are
already remotely settled so that the resources associated are freed.
This commit is contained in:
Timothy Bish 2015-04-14 11:18:32 -04:00
parent 47f5c08573
commit b3bf8e74f2
4 changed files with 58 additions and 8 deletions

View File

@ -219,7 +219,6 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
rejected.setError(condition);
delivery.disposition(rejected);
} else {
if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
@ -234,10 +233,9 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
} else {
delivery.disposition(Accepted.getInstance());
}
delivery.settle();
}
delivery.settle();
session.pumpProtonToSocket();
}
});
@ -247,6 +245,8 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
session.pumpProtonToSocket();
}
delivery.settle();
sendToActiveMQ(message);
}
}

View File

@ -377,13 +377,12 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
outcome = (Outcome) state;
} else {
LOG.warn("Message send updated with unsupported state: {}", state);
continue;
outcome = null;
}
AsyncResult request = (AsyncResult) delivery.getContext();
if (outcome instanceof Accepted) {
toRemove.add(delivery);
LOG.trace("Outcome of delivery was accepted: {}", delivery);
tagGenerator.returnTag(delivery.getTag());
if (request != null && !request.isComplete()) {
@ -391,7 +390,6 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
}
} else if (outcome instanceof Rejected) {
Exception remoteError = getRemoteError();
toRemove.add(delivery);
LOG.trace("Outcome of delivery was rejected: {}", delivery);
tagGenerator.returnTag(delivery.getTag());
if (request != null && !request.isComplete()) {
@ -399,9 +397,12 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
} else {
connection.fireClientException(getRemoteError());
}
} else {
} else if (outcome != null) {
LOG.warn("Message send updated with unsupported outcome: {}", outcome);
}
delivery.settle();
toRemove.add(delivery);
}
pending.removeAll(toRemove);

View File

@ -54,16 +54,33 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* Create a sender instance using the given address
*
* @param address
* the address to which the sender will produce its messages.
* the address to which the sender will produce its messages.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender(final String address) throws Exception {
return createSender(address, false);
}
/**
* Create a sender instance using the given address
*
* @param address
* the address to which the sender will produce its messages.
* @param presettle
* controls if the created sender produces message that have already been marked settled.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender(final String address, boolean presettle) throws Exception {
checkClosed();
final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId());
sender.setPresettle(presettle);
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {

View File

@ -18,14 +18,17 @@ package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.util.Wait;
import org.junit.Test;
/**
@ -91,4 +94,33 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
sender.close();
connection.close();
}
@Test(timeout = 60000)
public void testPresettledSender() throws Exception {
final int MSG_COUNT = 1000;
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("topic://" + getTestName(), true);
for (int i = 0; i < MSG_COUNT; ++i) {
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message: " + i);
sender.send(message);
}
final TopicViewMBean topic = getProxyToTopic(getTestName());
assertTrue("All messages should arrive", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return topic.getEnqueueCount() == MSG_COUNT;
}
}));
sender.close();
connection.close();
}
}