mirror of https://github.com/apache/activemq.git
AMQ-5890: prevent NPE if Modified disposition is applied without the delivery-failed flag set, add some general tests of Modified handling
https://issues.apache.org/jira/browse/AMQ-5890
This commit is contained in:
parent
c85c7c1472
commit
0cfd225912
|
@ -227,7 +227,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
|
||||||
settle(delivery, -1);
|
settle(delivery, -1);
|
||||||
} else if (state instanceof Modified) {
|
} else if (state instanceof Modified) {
|
||||||
Modified modified = (Modified) state;
|
Modified modified = (Modified) state;
|
||||||
if (modified.getDeliveryFailed()) {
|
if (Boolean.TRUE.equals(modified.getDeliveryFailed())) {
|
||||||
// increment delivery counter..
|
// increment delivery counter..
|
||||||
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
|
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,43 +140,21 @@ public class AmqpMessage {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Rejects the message, marking it as not deliverable here and failed to deliver.
|
* Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here.
|
||||||
*
|
*
|
||||||
* @throws Exception if an error occurs during the reject.
|
|
||||||
*/
|
|
||||||
public void reject() throws Exception {
|
|
||||||
reject(true, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Rejects the message, marking it as failed to deliver and applying the given value
|
|
||||||
* to the undeliverable here tag.
|
|
||||||
*
|
|
||||||
* @param undeliverableHere
|
|
||||||
* marks the delivery as not being able to be process by link it was sent to.
|
|
||||||
*
|
|
||||||
* @throws Exception if an error occurs during the reject.
|
|
||||||
*/
|
|
||||||
public void reject(boolean undeliverableHere) throws Exception {
|
|
||||||
reject(undeliverableHere, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Rejects the message, marking it as not deliverable here and failed to deliver.
|
|
||||||
*
|
|
||||||
* @param undeliverableHere
|
|
||||||
* marks the delivery as not being able to be process by link it was sent to.
|
|
||||||
* @param deliveryFailed
|
* @param deliveryFailed
|
||||||
* indicates that the delivery failed for some reason.
|
* indicates that the delivery failed for some reason.
|
||||||
|
* @param undeliverableHere
|
||||||
|
* marks the delivery as not being able to be process by link it was sent to.
|
||||||
*
|
*
|
||||||
* @throws Exception if an error occurs during the reject.
|
* @throws Exception if an error occurs during the process.
|
||||||
*/
|
*/
|
||||||
public void reject(boolean undeliverableHere, boolean deliveryFailed) throws Exception {
|
public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
|
||||||
if (receiver == null) {
|
if (receiver == null) {
|
||||||
throw new IllegalStateException("Can't reject non-received message.");
|
throw new IllegalStateException("Can't modify non-received message.");
|
||||||
}
|
}
|
||||||
|
|
||||||
receiver.reject(delivery, undeliverableHere, deliveryFailed);
|
receiver.modified(delivery, deliveryFailed, undeliverableHere);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -316,18 +316,17 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reject a message that was dispatched under the given Delivery instance.
|
* Mark a message that was dispatched under the given Delivery instance as Modified.
|
||||||
*
|
*
|
||||||
* @param delivery
|
* @param delivery
|
||||||
* the Delivery instance to reject.
|
* the Delivery instance to mark modified.
|
||||||
* @param undeliverableHere
|
|
||||||
* marks the delivery as not being able to be process by link it was sent to.
|
|
||||||
* @param deliveryFailed
|
* @param deliveryFailed
|
||||||
* indicates that the delivery failed for some reason.
|
* indicates that the delivery failed for some reason.
|
||||||
*
|
* @param undeliverableHere
|
||||||
|
* marks the delivery as not being able to be process by link it was sent to.
|
||||||
* @throws IOException if an error occurs while sending the reject.
|
* @throws IOException if an error occurs while sending the reject.
|
||||||
*/
|
*/
|
||||||
public void reject(final Delivery delivery, final boolean undeliverableHere, final boolean deliveryFailed) throws IOException {
|
public void modified(final Delivery delivery, final Boolean deliveryFailed, final Boolean undeliverableHere) throws IOException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
|
|
||||||
if (delivery == null) {
|
if (delivery == null) {
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS
|
||||||
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
|
import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -43,6 +44,7 @@ import org.apache.qpid.proton.amqp.messaging.Source;
|
||||||
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
|
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
|
||||||
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
||||||
import org.apache.qpid.proton.engine.Receiver;
|
import org.apache.qpid.proton.engine.Receiver;
|
||||||
|
import org.apache.qpid.proton.message.Message;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -412,4 +414,67 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
||||||
connection.getStateInspector().assertValid();
|
connection.getStateInspector().assertValid();
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
|
||||||
|
doModifiedDispositionTestImpl(Boolean.TRUE, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testModifiedDispositionWithoutDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
|
||||||
|
doModifiedDispositionTestImpl(null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testModifiedDispositionWithoutDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception {
|
||||||
|
doModifiedDispositionTestImpl(null, Boolean.TRUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testModifiedDispositionWithDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception {
|
||||||
|
doModifiedDispositionTestImpl(Boolean.TRUE, Boolean.TRUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
|
||||||
|
int msgCount = 1;
|
||||||
|
sendMessages(getTestName(), msgCount, false);
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = client.connect();
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||||
|
receiver.flow(2 * msgCount);
|
||||||
|
|
||||||
|
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull("did not receive message first time", message);
|
||||||
|
|
||||||
|
Message protonMessage = message.getWrappedMessage();
|
||||||
|
assertNotNull(protonMessage);
|
||||||
|
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
|
||||||
|
|
||||||
|
message.modified(deliveryFailed, undeliverableHere);
|
||||||
|
|
||||||
|
if(Boolean.TRUE.equals(undeliverableHere)) {
|
||||||
|
message = receiver.receive(250, TimeUnit.MILLISECONDS);
|
||||||
|
assertNull("Should not receive message again", message);
|
||||||
|
} else {
|
||||||
|
message = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull("did not receive message again", message);
|
||||||
|
|
||||||
|
int expectedDeliveryCount = 0;
|
||||||
|
if(Boolean.TRUE.equals(deliveryFailed)) {
|
||||||
|
expectedDeliveryCount = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message.accept();
|
||||||
|
|
||||||
|
Message protonMessage2 = message.getWrappedMessage();
|
||||||
|
assertNotNull(protonMessage2);
|
||||||
|
assertEquals("Unexpected updated value for AMQP delivery-count", expectedDeliveryCount, protonMessage2.getDeliveryCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
receiver.close();
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue