ARTEMIS-2494: [AMQP] Allow Modified disposition to be used signal address full to a sending peer

This commit is contained in:
Keith Wall 2019-09-20 20:39:10 +01:00 committed by Clebert Suconic
parent 92f87feab9
commit 7bd710520d
8 changed files with 380 additions and 80 deletions

View File

@ -440,6 +440,7 @@ public class AMQPSessionCallback implements SessionCallback {
// Anonymous relay must set a To value // Anonymous relay must set a To value
address = message.getAddressSimpleString(); address = message.getAddressSimpleString();
if (address == null) { if (address == null) {
// Errors are not currently handled as required by AMQP 1.0 anonterm-v1.0
rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer"); rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
return; return;
} }
@ -457,14 +458,14 @@ public class AMQPSessionCallback implements SessionCallback {
PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString()); PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
if (store != null && store.isRejectingMessages()) { if (store != null && store.isRejectingMessages()) {
// We drop pre-settled messages (and abort any associated Tx) // We drop pre-settled messages (and abort any associated Tx)
String amqpAddress = delivery.getLink().getTarget().getAddress();
ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
if (delivery.remotelySettled()) { if (delivery.remotelySettled()) {
if (transaction != null) { if (transaction != null) {
String amqpAddress = delivery.getLink().getTarget().getAddress();
ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
transaction.markAsRollbackOnly(e); transaction.markAsRollbackOnly(e);
} }
} else { } else {
rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address); throw e;
} }
} else { } else {
serverSend(context, transaction, message, delivery, receiver, routingContext); serverSend(context, transaction, message, delivery, receiver, routingContext);

View File

@ -69,6 +69,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
private int amqpLowCredits = AmqpSupport.AMQP_LOW_CREDITS_DEFAULT; private int amqpLowCredits = AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
private boolean amqpUseModifiedForTransientDeliveryErrors = AmqpSupport.AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
private int initialRemoteMaxFrameSize = 4 * 1024; private int initialRemoteMaxFrameSize = 4 * 1024;
private String[] saslMechanisms = MechanismFinder.getKnownMechanisms(); private String[] saslMechanisms = MechanismFinder.getKnownMechanisms();
@ -293,4 +295,22 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
this.initialRemoteMaxFrameSize = initialRemoteMaxFrameSize; this.initialRemoteMaxFrameSize = initialRemoteMaxFrameSize;
} }
/**
* Returns true if transient delivery errors should be handled with a Modified disposition
* (if permitted by link)
*/
public boolean isUseModifiedForTransientDeliveryErrors() {
return this.amqpUseModifiedForTransientDeliveryErrors;
}
/**
* Sets if transient delivery errors should be handled with a Modified disposition
* (if permitted by link)
*/
public ProtonProtocolManager setAmqpUseModifiedForTransientDeliveryErrors(boolean amqpUseModifiedForTransientDeliveryErrors) {
this.amqpUseModifiedForTransientDeliveryErrors = amqpUseModifiedForTransientDeliveryErrors;
return this;
}
} }

View File

@ -32,6 +32,9 @@ public class AmqpSupport {
public static final int AMQP_CREDITS_DEFAULT = 1000; public static final int AMQP_CREDITS_DEFAULT = 1000;
public static final int AMQP_LOW_CREDITS_DEFAULT = 300; public static final int AMQP_LOW_CREDITS_DEFAULT = 300;
// Defaults for controlling the behaviour of AMQP dispositions
public static final boolean AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS = false;
// Identification values used to locating JMS selector types. // Identification values used to locating JMS selector types.
public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L); public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string"); public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");

View File

@ -20,6 +20,8 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -40,10 +42,14 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.ReadableBuffer;
@ -77,6 +83,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
* We set it as ran as the first one should always go through * We set it as ran as the first one should always go through
*/ */
protected final AtomicRunnable creditRunnable; protected final AtomicRunnable creditRunnable;
private final boolean useModified;
/** /**
* This Credit Runnable may be used in Mock tests to simulate the credit semantic here * This Credit Runnable may be used in Mock tests to simulate the credit semantic here
@ -125,6 +132,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
this.amqpCredits = connection.getAmqpCredits(); this.amqpCredits = connection.getAmqpCredits();
this.minCreditRefresh = connection.getAmqpLowCredits(); this.minCreditRefresh = connection.getAmqpLowCredits();
this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan(); this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan();
useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
} }
@Override @Override
@ -304,20 +312,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data, routingContext); sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data, routingContext);
} catch (Exception e) { } catch (Exception e) {
log.warn(e.getMessage(), e); log.warn(e.getMessage(), e);
Rejected rejected = new Rejected(); DeliveryState deliveryState = determineDeliveryState(((Source) receiver.getSource()),
ErrorCondition condition = new ErrorCondition(); useModified,
e);
if (e instanceof ActiveMQSecurityException) {
condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
} else {
condition.setCondition(Symbol.valueOf("failed"));
}
connection.runLater(() -> { connection.runLater(() -> {
delivery.disposition(deliveryState);
condition.setDescription(e.getMessage());
rejected.setError(condition);
delivery.disposition(rejected);
delivery.settle(); delivery.settle();
flow(); flow();
connection.flush(); connection.flush();
@ -326,6 +325,50 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
} }
} }
private DeliveryState determineDeliveryState(final Source source, final boolean useModified, final Exception e) {
Outcome defaultOutcome = getEffectiveDefaultOutcome(source);
if (isAddressFull(e) && useModified &&
(outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Modified)) {
Modified modified = new Modified();
modified.setDeliveryFailed(true);
return modified;
} else {
if (outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Rejected) {
return createRejected(e);
} else if (source.getDefaultOutcome() instanceof DeliveryState) {
return ((DeliveryState) source.getDefaultOutcome());
} else {
// The AMQP specification requires that Accepted is returned for this case. However there exist
// implementations that set neither outcomes/default-outcome but use/expect for full range of outcomes.
// To maintain compatibility with these implementations, we maintain previous behaviour.
return createRejected(e);
}
}
}
private boolean isAddressFull(final Exception e) {
return e instanceof ActiveMQException && ActiveMQExceptionType.ADDRESS_FULL.equals(((ActiveMQException) e).getType());
}
private Rejected createRejected(final Exception e) {
ErrorCondition condition = new ErrorCondition();
// Set condition
if (e instanceof ActiveMQSecurityException) {
condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
} else if (isAddressFull(e)) {
condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
} else {
condition.setCondition(Symbol.valueOf("failed"));
}
condition.setDescription(e.getMessage());
Rejected rejected = new Rejected();
rejected.setError(condition);
return rejected;
}
@Override @Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
protonSession.removeReceiver(receiver); protonSession.removeReceiver(receiver);
@ -375,4 +418,15 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
public boolean isDraining() { public boolean isDraining() {
return receiver.draining(); return receiver.draining();
} }
private boolean outcomeSupported(final Source source, final Symbol outcome) {
if (source != null && source.getOutcomes() != null) {
return Arrays.asList(( source).getOutcomes()).contains(outcome);
}
return false;
}
private Outcome getEffectiveDefaultOutcome(final Source source) {
return (source.getOutcomes() == null || source.getOutcomes().length == 0) ? source.getDefaultOutcome() : null;
}
} }

View File

@ -16,16 +16,43 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.proton; package org.apache.activemq.artemis.protocol.amqp.proton;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
import org.junit.Test; import org.junit.Test;
import org.mockito.stubbing.Answer;
public class ProtonServerReceiverContextTest { public class ProtonServerReceiverContextTest {
@ -39,6 +66,44 @@ public class ProtonServerReceiverContextTest {
doOnMessageWithAbortedDeliveryTestImpl(true); doOnMessageWithAbortedDeliveryTestImpl(true);
} }
@Test
public void addressFull_SourceSupportsModified() throws Exception {
doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL,
Accepted.DESCRIPTOR_SYMBOL,
Modified.DESCRIPTOR_SYMBOL),
null, new ActiveMQAddressFullException(),
Modified.class);
}
@Test
public void addressFull_SourceDoesNotSupportModified() throws Exception {
doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL,
Accepted.DESCRIPTOR_SYMBOL),
null, new ActiveMQAddressFullException(),
Rejected.class);
}
@Test
public void otherFailure_SourceSupportsRejects() throws Exception {
doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL,
Accepted.DESCRIPTOR_SYMBOL,
Modified.DESCRIPTOR_SYMBOL),
null, new ActiveMQException(),
Rejected.class);
}
@Test
public void otherFailure_SourceDoesNotSupportReject() throws Exception {
doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL),
Accepted.getInstance(), new ActiveMQException(),
Accepted.class);
// violates AMQP specification - see explanation ProtonServerReceiverContext.determineDeliveryState
doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL),
null,
new ActiveMQException(),
Rejected.class);
}
private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException { private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException {
Receiver mockReceiver = mock(Receiver.class); Receiver mockReceiver = mock(Receiver.class);
AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class); AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
@ -46,6 +111,8 @@ public class ProtonServerReceiverContextTest {
when(mockConnContext.getAmqpCredits()).thenReturn(100); when(mockConnContext.getAmqpCredits()).thenReturn(100);
when(mockConnContext.getAmqpLowCredits()).thenReturn(30); when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class));
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver); ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver);
Delivery mockDelivery = mock(Delivery.class); Delivery mockDelivery = mock(Delivery.class);
@ -72,4 +139,48 @@ public class ProtonServerReceiverContextTest {
verifyNoMoreInteractions(mockReceiver); verifyNoMoreInteractions(mockReceiver);
} }
private void doOnMessageWithDeliveryException(List<Symbol> sourceSymbols,
Outcome defaultOutcome, Exception deliveryException,
Class<? extends DeliveryState> expectedDeliveryState) throws Exception {
AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
doAnswer((Answer<Void>) invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
}).when(mockConnContext).runLater(any(Runnable.class));
ProtonProtocolManager mockProtocolManager = mock(ProtonProtocolManager.class);
when(mockProtocolManager.isUseModifiedForTransientDeliveryErrors()).thenReturn(true);
when(mockConnContext.getProtocolManager()).thenReturn(mockProtocolManager);
AMQPSessionCallback mockSession = mock(AMQPSessionCallback.class);
Receiver mockReceiver = mock(Receiver.class);
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(mockSession, mockConnContext, null, mockReceiver);
Delivery mockDelivery = mock(Delivery.class);
when(mockDelivery.getLink()).thenReturn(mockReceiver);
when(mockReceiver.current()).thenReturn(mockDelivery);
Source source = new Source();
source.setOutcomes(sourceSymbols.toArray(new Symbol[]{}));
source.setDefaultOutcome(defaultOutcome);
when(mockReceiver.getSource()).thenReturn(source);
doThrow(deliveryException).when(mockSession)
.serverSend(eq(rc),
nullable(Transaction.class),
eq(mockReceiver),
eq(mockDelivery),
nullable(SimpleString.class),
anyInt(),
nullable(ReadableBuffer.class),
any(RoutingContext.class));
rc.onMessage(mockDelivery);
verify(mockDelivery, times(1)).settle();
verify(mockDelivery, times(1)).disposition(any(expectedDeliveryState));
}
} }

View File

@ -59,6 +59,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
public static final long DEFAULT_SEND_TIMEOUT = 15000; public static final long DEFAULT_SEND_TIMEOUT = 15000;
public static final Symbol[] DEFAULT_OUTCOMES = {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true); private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
@ -70,6 +71,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
private final Target userSpecifiedTarget; private final Target userSpecifiedTarget;
private final SenderSettleMode userSpecifiedSenderSettlementMode; private final SenderSettleMode userSpecifiedSenderSettlementMode;
private final ReceiverSettleMode userSpecifiedReceiverSettlementMode; private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
private final Symbol[] outcomes;
private boolean presettle; private boolean presettle;
private long sendTimeout = DEFAULT_SEND_TIMEOUT; private long sendTimeout = DEFAULT_SEND_TIMEOUT;
@ -92,7 +94,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
* The unique ID assigned to this sender. * The unique ID assigned to this sender.
*/ */
public AmqpSender(AmqpSession session, String address, String senderId) { public AmqpSender(AmqpSession session, String address, String senderId) {
this(session, address, senderId, null, null); this(session, address, senderId, null, null, DEFAULT_OUTCOMES);
} }
/** /**
@ -108,8 +110,15 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
* The {@link SenderSettleMode} to use on open. * The {@link SenderSettleMode} to use on open.
* @param receiverMode * @param receiverMode
* The {@link ReceiverSettleMode} to use on open. * The {@link ReceiverSettleMode} to use on open.
* @param outcomes
* The outcomes to use on open
*/ */
public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) { public AmqpSender(AmqpSession session,
String address,
String senderId,
SenderSettleMode senderMode,
ReceiverSettleMode receiverMode,
Symbol[] outcomes) {
if (address != null && address.isEmpty()) { if (address != null && address.isEmpty()) {
throw new IllegalArgumentException("Address cannot be empty."); throw new IllegalArgumentException("Address cannot be empty.");
@ -121,6 +130,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
this.userSpecifiedTarget = null; this.userSpecifiedTarget = null;
this.userSpecifiedSenderSettlementMode = senderMode; this.userSpecifiedSenderSettlementMode = senderMode;
this.userSpecifiedReceiverSettlementMode = receiverMode; this.userSpecifiedReceiverSettlementMode = receiverMode;
this.outcomes = outcomes;
} }
/** /**
@ -145,6 +155,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
this.userSpecifiedTarget = target; this.userSpecifiedTarget = target;
this.userSpecifiedSenderSettlementMode = null; this.userSpecifiedSenderSettlementMode = null;
this.userSpecifiedReceiverSettlementMode = null; this.userSpecifiedReceiverSettlementMode = null;
outcomes = DEFAULT_OUTCOMES;
} }
/** /**
@ -311,11 +322,9 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
@Override @Override
protected void doOpen() { protected void doOpen() {
Symbol[] outcomes = new Symbol[] {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
Source source = new Source(); Source source = new Source();
source.setAddress(senderId); source.setAddress(senderId);
source.setOutcomes(outcomes); source.setOutcomes(this.outcomes);
Target target = userSpecifiedTarget; Target target = userSpecifiedTarget;
if (target == null) { if (target == null) {

View File

@ -189,10 +189,34 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* *
* @throws Exception if an error occurs while creating the sender. * @throws Exception if an error occurs while creating the sender.
*/ */
public AmqpSender createSender(final String address, final SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception { public AmqpSender createSender(final String address,
final SenderSettleMode senderMode,
ReceiverSettleMode receiverMode) throws Exception {
return createSender(address, senderMode, receiverMode, AmqpSender.DEFAULT_OUTCOMES);
}
/**
* Create a sender instance using the given address
*
* @param address
* the address to which the sender will produce its messages.
* @param senderSettlementMode
* controls the settlement mode used by the created Sender
* @param receiverSettlementMode
* controls the desired settlement mode used by the remote Receiver
* @param outcomes
* specifies the outcomes supported by the sender
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender(final String address,
final SenderSettleMode senderMode,
ReceiverSettleMode receiverMode, final Symbol[] outcomes) throws Exception {
checkClosed(); checkClosed();
final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode); final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode, outcomes);
final ClientFuture request = new ClientFuture(); final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() { connection.getScheduler().execute(new Runnable() {

View File

@ -27,70 +27,148 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
public class AmqpFlowControlFailTest extends JMSClientTestSupport { import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
@Override @RunWith(Enclosed.class)
protected void configureAddressPolicy(ActiveMQServer server) { public class AmqpFlowControlFailTest {
// For BLOCK tests
@RunWith(Parameterized.class)
public static class AmqpFlowControlFailDispositionTests extends JMSClientTestSupport {
@Parameterized.Parameter()
public boolean useModified;
@Parameterized.Parameter(1)
public Symbol[] outcomes;
@Parameterized.Parameter(2)
public String expectedMessage;
@Parameterized.Parameters(name = "useModified={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "failure at remote"},
{true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"},
{false, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"},
{false, new Symbol[]{}, "[condition = amqp:resource-limit-exceeded]"}
});
}
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
AmqpFlowControlFailTest.configureAddressPolicy(server);
}
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
params.put("amqpUseModifiedForTransientDeliveryErrors", useModified);
}
@Test(timeout = 60000)
public void testAddressFullDisposition() throws Exception {
AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName(), null, null, outcomes);
boolean rejected = false;
for (int i = 0; i < 1000; i++) {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[10];
message.setBytes(payload);
try {
sender.send(message);
} catch (IOException e) {
rejected = true;
assertTrue(String.format("Unexpected message expected %s to contain %s", e.getMessage(), expectedMessage),
e.getMessage().contains(expectedMessage));
}
}
assertTrue("Expected messages to be refused by broker", rejected);
} finally {
connection.close();
}
}
}
public static class AmqpFlowControlFailOrdinaryTests extends JMSClientTestSupport {
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
AmqpFlowControlFailTest.configureAddressPolicy(server);
}
@Test(timeout = 60000)
public void testMesagesNotSent() throws Exception {
AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
AmqpConnection connection = addConnection(client.connect());
int messagesSent = 0;
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
boolean rejected = false;
for (int i = 0; i < 1000; i++) {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[10];
message.setBytes(payload);
try {
sender.send(message);
messagesSent++;
} catch (IOException e) {
rejected = true;
}
}
assertTrue(rejected);
rejected = false;
assertEquals(0, sender.getSender().getCredit());
AmqpSession session2 = connection.createSession();
AmqpReceiver receiver = session2.createReceiver(getQueueName());
receiver.flow(messagesSent);
for (int i = 0; i < messagesSent; i++) {
AmqpMessage receive = receiver.receive();
receive.accept();
}
receiver.close();
session2.close();
Wait.assertEquals(1000, sender.getSender()::getCredit);
for (int i = 0; i < 1000; i++) {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[100];
message.setBytes(payload);
try {
sender.send(message);
} catch (IOException e) {
rejected = true;
}
}
assertTrue(rejected);
assertEquals(0, sender.getSender().getCredit());
} finally {
connection.close();
}
}
}
private static void configureAddressPolicy(final ActiveMQServer server) {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#"); AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
addressSettings.setMaxSizeBytes(1000); addressSettings.setMaxSizeBytes(1000);
// addressSettings.setMaxSizeBytesRejectThreshold(MAX_SIZE_BYTES_REJECT_THRESHOLD);
server.getAddressSettingsRepository().addMatch("#", addressSettings); server.getAddressSettingsRepository().addMatch("#", addressSettings);
} }
@Test(timeout = 60000)
public void testMesagesNotSent() throws Exception {
AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
AmqpConnection connection = addConnection(client.connect());
int messagesSent = 0;
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
boolean rejected = false;
for (int i = 0; i < 1000; i++) {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[10];
message.setBytes(payload);
try {
sender.send(message);
messagesSent++;
System.out.println("message = " + message);
} catch (IOException e) {
rejected = true;
}
}
assertTrue(rejected);
rejected = false;
assertEquals(0, sender.getSender().getCredit());
AmqpSession session2 = connection.createSession();
AmqpReceiver receiver = session2.createReceiver(getQueueName());
receiver.flow(messagesSent);
for (int i = 0; i < messagesSent; i++) {
AmqpMessage receive = receiver.receive();
receive.accept();
}
receiver.close();
session2.close();
Wait.assertEquals(1000, sender.getSender()::getCredit);
for (int i = 0; i < 1000; i++) {
final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[100];
message.setBytes(payload);
try {
sender.send(message);
} catch (IOException e) {
rejected = true;
}
}
assertTrue(rejected);
assertEquals(0, sender.getSender().getCredit());
} finally {
connection.close();
}
}
} }