ARTEMIS-2497: [AMQP] Allow handling of the reject disposition to be configured.
This commit is contained in:
parent
f2268657ce
commit
39cd9d56f6
|
@ -73,6 +73,10 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
||||||
|
|
||||||
private boolean amqpUseModifiedForTransientDeliveryErrors = AmqpSupport.AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
|
private boolean amqpUseModifiedForTransientDeliveryErrors = AmqpSupport.AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
|
||||||
|
|
||||||
|
// If set true, a reject disposition will be treated as if it were an unmodified disposition with the
|
||||||
|
// delivery-failed flag set true.
|
||||||
|
private boolean amqpTreatRejectAsUnmodifiedDeliveryFailed = AmqpSupport.AMQP_TREAT_REJECT_AS_UNMODIFIED_DELIVERY_FAILURE;
|
||||||
|
|
||||||
private int initialRemoteMaxFrameSize = 4 * 1024;
|
private int initialRemoteMaxFrameSize = 4 * 1024;
|
||||||
|
|
||||||
private String[] saslMechanisms = MechanismFinder.getKnownMechanisms();
|
private String[] saslMechanisms = MechanismFinder.getKnownMechanisms();
|
||||||
|
@ -324,4 +328,11 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void setAmqpTreatRejectAsUnmodifiedDeliveryFailed(final boolean amqpTreatRejectAsUnmodifiedDeliveryFailed) {
|
||||||
|
this.amqpTreatRejectAsUnmodifiedDeliveryFailed = amqpTreatRejectAsUnmodifiedDeliveryFailed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAmqpTreatRejectAsUnmodifiedDeliveryFailed() {
|
||||||
|
return this.amqpTreatRejectAsUnmodifiedDeliveryFailed;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,9 @@ public class AmqpSupport {
|
||||||
public static final int AMQP_CREDITS_DEFAULT = 1000;
|
public static final int AMQP_CREDITS_DEFAULT = 1000;
|
||||||
public static final int AMQP_LOW_CREDITS_DEFAULT = 300;
|
public static final int AMQP_LOW_CREDITS_DEFAULT = 300;
|
||||||
|
|
||||||
|
// Defaults for controlling the interpretation of AMQP dispositions
|
||||||
|
public static final boolean AMQP_TREAT_REJECT_AS_UNMODIFIED_DELIVERY_FAILURE = false;
|
||||||
|
|
||||||
// Defaults for controlling the behaviour of AMQP dispositions
|
// Defaults for controlling the behaviour of AMQP dispositions
|
||||||
public static final boolean AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS = false;
|
public static final boolean AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS = false;
|
||||||
|
|
||||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Modified;
|
import org.apache.qpid.proton.amqp.messaging.Modified;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Outcome;
|
import org.apache.qpid.proton.amqp.messaging.Outcome;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Source;
|
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||||
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
|
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
|
||||||
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
||||||
|
@ -118,6 +119,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
* */
|
* */
|
||||||
private final Object creditsLock = new Object();
|
private final Object creditsLock = new Object();
|
||||||
private final java.util.function.Consumer<? super MessageReference> executeDelivery;
|
private final java.util.function.Consumer<? super MessageReference> executeDelivery;
|
||||||
|
private final boolean amqpTreatRejectAsUnmodifiedDeliveryFailed;
|
||||||
|
|
||||||
public ProtonServerSenderContext(AMQPConnectionContext connection,
|
public ProtonServerSenderContext(AMQPConnectionContext connection,
|
||||||
Sender sender,
|
Sender sender,
|
||||||
|
@ -129,6 +131,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
this.protonSession = protonSession;
|
this.protonSession = protonSession;
|
||||||
this.sessionSPI = server;
|
this.sessionSPI = server;
|
||||||
this.executeDelivery = this::executeDelivery;
|
this.executeDelivery = this::executeDelivery;
|
||||||
|
amqpTreatRejectAsUnmodifiedDeliveryFailed = this.connection.getProtocolManager()
|
||||||
|
.isAmqpTreatRejectAsUnmodifiedDeliveryFailed();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object getBrokerConsumer() {
|
public Object getBrokerConsumer() {
|
||||||
|
@ -681,7 +685,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
break;
|
break;
|
||||||
case Rejected:
|
case Rejected:
|
||||||
try {
|
try {
|
||||||
|
if (amqpTreatRejectAsUnmodifiedDeliveryFailed) {
|
||||||
|
// We could be more discriminating - for instance check for AmqpError#RESOURCE_LIMIT_EXCEEDED
|
||||||
|
sessionSPI.cancel(brokerConsumer, message, true);
|
||||||
|
} else {
|
||||||
sessionSPI.reject(brokerConsumer, message);
|
sessionSPI.reject(brokerConsumer, message);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,11 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.protocol.amqp.proton;
|
package org.apache.activemq.artemis.protocol.amqp.proton;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.AddressQueryResult;
|
import org.apache.activemq.artemis.core.server.AddressQueryResult;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
|
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Source;
|
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||||
import org.apache.qpid.proton.engine.Sender;
|
import org.apache.qpid.proton.engine.Sender;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -34,8 +37,11 @@ public class ProtonServerSenderContextTest {
|
||||||
|
|
||||||
@Test(expected = ActiveMQAMQPNotFoundException.class)
|
@Test(expected = ActiveMQAMQPNotFoundException.class)
|
||||||
public void testAcceptsNullSourceAddressWhenInitialising() throws Exception {
|
public void testAcceptsNullSourceAddressWhenInitialising() throws Exception {
|
||||||
|
ProtonProtocolManager mock = mock(ProtonProtocolManager.class);
|
||||||
|
when(mock.getServer()).thenReturn(mock(ActiveMQServer.class));
|
||||||
Sender mockSender = mock(Sender.class);
|
Sender mockSender = mock(Sender.class);
|
||||||
AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
|
AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
|
||||||
|
when(mockConnContext.getProtocolManager()).thenReturn(mock);
|
||||||
|
|
||||||
AMQPSessionCallback mockSessionCallback = mock(AMQPSessionCallback.class);
|
AMQPSessionCallback mockSessionCallback = mock(AMQPSessionCallback.class);
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
|
@ -25,12 +26,16 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
import org.apache.qpid.proton.message.Message;
|
import org.apache.qpid.proton.message.Message;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.runners.Enclosed;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test various behaviors of AMQP receivers with the broker.
|
* Test various behaviors of AMQP receivers with the broker.
|
||||||
*/
|
*/
|
||||||
public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
|
@RunWith(Enclosed.class)
|
||||||
|
public class AmqpReceiverDispositionTest {
|
||||||
|
|
||||||
|
public static class AmqpReceiverDispositionOrdinaryTests extends AmqpClientTestSupport {
|
||||||
@Test(timeout = 30000)
|
@Test(timeout = 30000)
|
||||||
public void testReleasedDisposition() throws Exception {
|
public void testReleasedDisposition() throws Exception {
|
||||||
sendMessages(getQueueName(), 1);
|
sendMessages(getQueueName(), 1);
|
||||||
|
@ -166,8 +171,54 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
Message protonMessage2 = message.getWrappedMessage();
|
Message protonMessage2 = message.getWrappedMessage();
|
||||||
assertNotNull(protonMessage2);
|
assertNotNull(protonMessage2);
|
||||||
assertEquals("Unexpected updated value for AMQP delivery-count", expectedDeliveryCount, protonMessage2.getDeliveryCount());
|
assertEquals("Unexpected updated value for AMQP delivery-count",
|
||||||
|
expectedDeliveryCount,
|
||||||
|
protonMessage2.getDeliveryCount());
|
||||||
|
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class AmqpReceiverDispositionRejectAsUnmodifiedModeTests extends AmqpClientTestSupport {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
|
||||||
|
params.put("amqpTreatRejectAsUnmodifiedDeliveryFailed", true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testRejectedDisposition() throws Exception {
|
||||||
|
sendMessages(getQueueName(), 1);
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
|
||||||
|
receiver1.flow(1);
|
||||||
|
|
||||||
|
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull("did not receive message first time", message);
|
||||||
|
assertEquals("MessageID:0", message.getMessageId());
|
||||||
|
|
||||||
|
Message protonMessage = message.getWrappedMessage();
|
||||||
|
assertNotNull(protonMessage);
|
||||||
|
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
|
||||||
|
|
||||||
|
// Owing to the config, the reject should be treat as if it were a
|
||||||
|
// Unmodified delivery-failed=true
|
||||||
|
message.reject();
|
||||||
|
receiver1.flow(1);
|
||||||
|
|
||||||
|
message = receiver1.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull("did not receive message after reject", message);
|
||||||
|
assertEquals("MessageID:0", message.getMessageId());
|
||||||
|
|
||||||
|
protonMessage = message.getWrappedMessage();
|
||||||
|
assertNotNull(protonMessage);
|
||||||
|
assertEquals("Unexpected value for AMQP delivery-count after redelivery", 1, protonMessage.getDeliveryCount());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue