This commit is contained in:
Clebert Suconic 2020-01-08 12:26:41 -05:00
commit c23c2e0e38
5 changed files with 210 additions and 130 deletions

View File

@ -73,6 +73,10 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
private boolean amqpUseModifiedForTransientDeliveryErrors = AmqpSupport.AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
// If set true, a reject disposition will be treated as if it were an unmodified disposition with the
// delivery-failed flag set true.
private boolean amqpTreatRejectAsUnmodifiedDeliveryFailed = AmqpSupport.AMQP_TREAT_REJECT_AS_UNMODIFIED_DELIVERY_FAILURE;
private int initialRemoteMaxFrameSize = 4 * 1024;
private String[] saslMechanisms = MechanismFinder.getKnownMechanisms();
@ -324,4 +328,11 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
}
public void setAmqpTreatRejectAsUnmodifiedDeliveryFailed(final boolean amqpTreatRejectAsUnmodifiedDeliveryFailed) {
this.amqpTreatRejectAsUnmodifiedDeliveryFailed = amqpTreatRejectAsUnmodifiedDeliveryFailed;
}
public boolean isAmqpTreatRejectAsUnmodifiedDeliveryFailed() {
return this.amqpTreatRejectAsUnmodifiedDeliveryFailed;
}
}

View File

@ -32,6 +32,9 @@ public class AmqpSupport {
public static final int AMQP_CREDITS_DEFAULT = 1000;
public static final int AMQP_LOW_CREDITS_DEFAULT = 300;
// Defaults for controlling the interpretation of AMQP dispositions
public static final boolean AMQP_TREAT_REJECT_AS_UNMODIFIED_DELIVERY_FAILURE = false;
// Defaults for controlling the behaviour of AMQP dispositions
public static final boolean AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS = false;

View File

@ -59,6 +59,7 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@ -118,6 +119,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
* */
private final Object creditsLock = new Object();
private final java.util.function.Consumer<? super MessageReference> executeDelivery;
private final boolean amqpTreatRejectAsUnmodifiedDeliveryFailed;
public ProtonServerSenderContext(AMQPConnectionContext connection,
Sender sender,
@ -129,6 +131,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
this.protonSession = protonSession;
this.sessionSPI = server;
this.executeDelivery = this::executeDelivery;
amqpTreatRejectAsUnmodifiedDeliveryFailed = this.connection.getProtocolManager()
.isAmqpTreatRejectAsUnmodifiedDeliveryFailed();
}
public Object getBrokerConsumer() {
@ -681,7 +685,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
break;
case Rejected:
try {
sessionSPI.reject(brokerConsumer, message);
if (amqpTreatRejectAsUnmodifiedDeliveryFailed) {
// We could be more discriminating - for instance check for AmqpError#RESOURCE_LIMIT_EXCEEDED
sessionSPI.cancel(brokerConsumer, message, true);
} else {
sessionSPI.reject(brokerConsumer, message);
}
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
}

View File

@ -16,11 +16,14 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.engine.Sender;
import org.junit.Test;
import java.util.Collections;
@ -34,8 +37,11 @@ public class ProtonServerSenderContextTest {
@Test(expected = ActiveMQAMQPNotFoundException.class)
public void testAcceptsNullSourceAddressWhenInitialising() throws Exception {
ProtonProtocolManager mock = mock(ProtonProtocolManager.class);
when(mock.getServer()).thenReturn(mock(ActiveMQServer.class));
Sender mockSender = mock(Sender.class);
AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
when(mockConnContext.getProtocolManager()).thenReturn(mock);
AMQPSessionCallback mockSessionCallback = mock(AMQPSessionCallback.class);

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -25,149 +26,199 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
/**
* Test various behaviors of AMQP receivers with the broker.
*/
public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
@RunWith(Enclosed.class)
public class AmqpReceiverDispositionTest {
@Test(timeout = 30000)
public void testReleasedDisposition() throws Exception {
sendMessages(getQueueName(), 1);
public static class AmqpReceiverDispositionOrdinaryTests extends AmqpClientTestSupport {
@Test(timeout = 30000)
public void testReleasedDisposition() throws Exception {
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertNotNull("did not receive message first time", message);
assertEquals("MessageID:0", message.getMessageId());
Message protonMessage = message.getWrappedMessage();
assertNotNull(protonMessage);
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
receiver2.flow(1);
message.release();
// Read the message again and validate its state
message = receiver2.receive(10, TimeUnit.SECONDS);
assertNotNull("did not receive message again", message);
assertEquals("MessageID:0", message.getMessageId());
message.accept();
protonMessage = message.getWrappedMessage();
assertNotNull(protonMessage);
assertEquals("Unexpected updated value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
connection.close();
}
@Test(timeout = 30000)
public void testRejectedDisposition() throws Exception {
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull("did not receive message first time", message);
assertEquals("MessageID:0", message.getMessageId());
Message protonMessage = message.getWrappedMessage();
assertNotNull(protonMessage);
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
message.reject();
// Reject is a terminal outcome and should not be redelivered to the rejecting receiver
// or any other as it should move to the archived state.
receiver1.flow(1);
message = receiver1.receiveNoWait();
assertNull("Should not receive message again", message);
// Attempt to Read the message again with another receiver to validate it is archived.
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(1);
assertNull(receiver2.receiveNoWait());
connection.close();
}
@Test(timeout = 30000)
public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
doModifiedDispositionTestImpl(Boolean.TRUE, null);
}
@Test(timeout = 30000)
public void testModifiedDispositionWithoutDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
doModifiedDispositionTestImpl(null, null);
}
@Test(timeout = 30000)
public void testModifiedDispositionWithoutDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception {
doModifiedDispositionTestImpl(null, Boolean.TRUE);
}
@Test(timeout = 30000)
public void testModifiedDispositionWithDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception {
doModifiedDispositionTestImpl(Boolean.TRUE, Boolean.TRUE);
}
private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull("did not receive message first time", message);
Message protonMessage = message.getWrappedMessage();
assertNotNull(protonMessage);
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
message.modified(deliveryFailed, undeliverableHere);
// Remote must not redispatch to the client if undeliverable here is true
if (Boolean.TRUE.equals(undeliverableHere)) {
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
message = receiver1.receive(1, TimeUnit.SECONDS);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertNotNull("did not receive message first time", message);
assertEquals("MessageID:0", message.getMessageId());
Message protonMessage = message.getWrappedMessage();
assertNotNull(protonMessage);
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
receiver2.flow(1);
message.release();
// Read the message again and validate its state
message = receiver2.receive(10, TimeUnit.SECONDS);
assertNotNull("did not receive message again", message);
assertEquals("MessageID:0", message.getMessageId());
message.accept();
protonMessage = message.getWrappedMessage();
assertNotNull(protonMessage);
assertEquals("Unexpected updated value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
connection.close();
}
@Test(timeout = 30000)
public void testRejectedDisposition() throws Exception {
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull("did not receive message first time", message);
assertEquals("MessageID:0", message.getMessageId());
Message protonMessage = message.getWrappedMessage();
assertNotNull(protonMessage);
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
message.reject();
// Reject is a terminal outcome and should not be redelivered to the rejecting receiver
// or any other as it should move to the archived state.
receiver1.flow(1);
message = receiver1.receiveNoWait();
assertNull("Should not receive message again", message);
// Attempt to Read the message again with another receiver to validate it is archived.
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(1);
assertNull(receiver2.receiveNoWait());
connection.close();
}
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(1);
message = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull("did not receive message again", message);
int expectedDeliveryCount = 0;
if (Boolean.TRUE.equals(deliveryFailed)) {
expectedDeliveryCount = 1;
@Test(timeout = 30000)
public void testModifiedDispositionWithDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
doModifiedDispositionTestImpl(Boolean.TRUE, null);
}
message.accept();
@Test(timeout = 30000)
public void testModifiedDispositionWithoutDeliveryFailedWithoutUndeliverableHereFieldsSet() throws Exception {
doModifiedDispositionTestImpl(null, null);
}
Message protonMessage2 = message.getWrappedMessage();
assertNotNull(protonMessage2);
assertEquals("Unexpected updated value for AMQP delivery-count", expectedDeliveryCount, protonMessage2.getDeliveryCount());
@Test(timeout = 30000)
public void testModifiedDispositionWithoutDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception {
doModifiedDispositionTestImpl(null, Boolean.TRUE);
}
connection.close();
@Test(timeout = 30000)
public void testModifiedDispositionWithDeliveryFailedWithUndeliverableHereFieldsSet() throws Exception {
doModifiedDispositionTestImpl(Boolean.TRUE, Boolean.TRUE);
}
private void doModifiedDispositionTestImpl(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull("did not receive message first time", message);
Message protonMessage = message.getWrappedMessage();
assertNotNull(protonMessage);
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
message.modified(deliveryFailed, undeliverableHere);
// Remote must not redispatch to the client if undeliverable here is true
if (Boolean.TRUE.equals(undeliverableHere)) {
receiver1.flow(1);
message = receiver1.receive(1, TimeUnit.SECONDS);
assertNull("Should not receive message again", message);
}
AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(1);
message = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull("did not receive message again", message);
int expectedDeliveryCount = 0;
if (Boolean.TRUE.equals(deliveryFailed)) {
expectedDeliveryCount = 1;
}
message.accept();
Message protonMessage2 = message.getWrappedMessage();
assertNotNull(protonMessage2);
assertEquals("Unexpected updated value for AMQP delivery-count",
expectedDeliveryCount,
protonMessage2.getDeliveryCount());
connection.close();
}
}
public static class AmqpReceiverDispositionRejectAsUnmodifiedModeTests extends AmqpClientTestSupport {
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
params.put("amqpTreatRejectAsUnmodifiedDeliveryFailed", true);
}
@Test(timeout = 30000)
public void testRejectedDisposition() throws Exception {
sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull("did not receive message first time", message);
assertEquals("MessageID:0", message.getMessageId());
Message protonMessage = message.getWrappedMessage();
assertNotNull(protonMessage);
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
// Owing to the config, the reject should be treat as if it were a
// Unmodified delivery-failed=true
message.reject();
receiver1.flow(1);
message = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull("did not receive message after reject", message);
assertEquals("MessageID:0", message.getMessageId());
protonMessage = message.getWrappedMessage();
assertNotNull(protonMessage);
assertEquals("Unexpected value for AMQP delivery-count after redelivery", 1, protonMessage.getDeliveryCount());
connection.close();
}
}
}