This closes #2847
This commit is contained in:
commit
e86df5a350
|
@ -440,6 +440,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
// Anonymous relay must set a To value
|
||||
address = message.getAddressSimpleString();
|
||||
if (address == null) {
|
||||
// Errors are not currently handled as required by AMQP 1.0 anonterm-v1.0
|
||||
rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
|
||||
return;
|
||||
}
|
||||
|
@ -457,14 +458,14 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
|
||||
if (store != null && store.isRejectingMessages()) {
|
||||
// We drop pre-settled messages (and abort any associated Tx)
|
||||
String amqpAddress = delivery.getLink().getTarget().getAddress();
|
||||
ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
|
||||
if (delivery.remotelySettled()) {
|
||||
if (transaction != null) {
|
||||
String amqpAddress = delivery.getLink().getTarget().getAddress();
|
||||
ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
|
||||
transaction.markAsRollbackOnly(e);
|
||||
}
|
||||
} else {
|
||||
rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
serverSend(context, transaction, message, delivery, receiver, routingContext);
|
||||
|
|
|
@ -69,6 +69,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
|||
|
||||
private int amqpLowCredits = AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
|
||||
|
||||
private boolean amqpUseModifiedForTransientDeliveryErrors = AmqpSupport.AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
|
||||
|
||||
private int initialRemoteMaxFrameSize = 4 * 1024;
|
||||
|
||||
private String[] saslMechanisms = MechanismFinder.getKnownMechanisms();
|
||||
|
@ -293,4 +295,22 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
|
|||
this.initialRemoteMaxFrameSize = initialRemoteMaxFrameSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if transient delivery errors should be handled with a Modified disposition
|
||||
* (if permitted by link)
|
||||
*/
|
||||
public boolean isUseModifiedForTransientDeliveryErrors() {
|
||||
return this.amqpUseModifiedForTransientDeliveryErrors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets if transient delivery errors should be handled with a Modified disposition
|
||||
* (if permitted by link)
|
||||
*/
|
||||
public ProtonProtocolManager setAmqpUseModifiedForTransientDeliveryErrors(boolean amqpUseModifiedForTransientDeliveryErrors) {
|
||||
this.amqpUseModifiedForTransientDeliveryErrors = amqpUseModifiedForTransientDeliveryErrors;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -32,6 +32,9 @@ public class AmqpSupport {
|
|||
public static final int AMQP_CREDITS_DEFAULT = 1000;
|
||||
public static final int AMQP_LOW_CREDITS_DEFAULT = 300;
|
||||
|
||||
// Defaults for controlling the behaviour of AMQP dispositions
|
||||
public static final boolean AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS = false;
|
||||
|
||||
// Identification values used to locating JMS selector types.
|
||||
public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
|
||||
public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
|
||||
|
|
|
@ -20,6 +20,8 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -40,10 +42,14 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
|
|||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.Modified;
|
||||
import org.apache.qpid.proton.amqp.messaging.Outcome;
|
||||
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
||||
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
|
||||
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||
import org.apache.qpid.proton.amqp.transport.DeliveryState;
|
||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
|
||||
import org.apache.qpid.proton.codec.ReadableBuffer;
|
||||
|
@ -77,6 +83,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
* We set it as ran as the first one should always go through
|
||||
*/
|
||||
protected final AtomicRunnable creditRunnable;
|
||||
private final boolean useModified;
|
||||
|
||||
/**
|
||||
* This Credit Runnable may be used in Mock tests to simulate the credit semantic here
|
||||
|
@ -125,6 +132,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
this.amqpCredits = connection.getAmqpCredits();
|
||||
this.minCreditRefresh = connection.getAmqpLowCredits();
|
||||
this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan();
|
||||
useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -304,20 +312,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data, routingContext);
|
||||
} catch (Exception e) {
|
||||
log.warn(e.getMessage(), e);
|
||||
Rejected rejected = new Rejected();
|
||||
ErrorCondition condition = new ErrorCondition();
|
||||
|
||||
if (e instanceof ActiveMQSecurityException) {
|
||||
condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
|
||||
} else {
|
||||
condition.setCondition(Symbol.valueOf("failed"));
|
||||
}
|
||||
DeliveryState deliveryState = determineDeliveryState(((Source) receiver.getSource()),
|
||||
useModified,
|
||||
e);
|
||||
connection.runLater(() -> {
|
||||
|
||||
condition.setDescription(e.getMessage());
|
||||
rejected.setError(condition);
|
||||
|
||||
delivery.disposition(rejected);
|
||||
delivery.disposition(deliveryState);
|
||||
delivery.settle();
|
||||
flow();
|
||||
connection.flush();
|
||||
|
@ -326,6 +325,50 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
}
|
||||
}
|
||||
|
||||
private DeliveryState determineDeliveryState(final Source source, final boolean useModified, final Exception e) {
|
||||
Outcome defaultOutcome = getEffectiveDefaultOutcome(source);
|
||||
|
||||
if (isAddressFull(e) && useModified &&
|
||||
(outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Modified)) {
|
||||
Modified modified = new Modified();
|
||||
modified.setDeliveryFailed(true);
|
||||
return modified;
|
||||
} else {
|
||||
if (outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Rejected) {
|
||||
return createRejected(e);
|
||||
} else if (source.getDefaultOutcome() instanceof DeliveryState) {
|
||||
return ((DeliveryState) source.getDefaultOutcome());
|
||||
} else {
|
||||
// The AMQP specification requires that Accepted is returned for this case. However there exist
|
||||
// implementations that set neither outcomes/default-outcome but use/expect for full range of outcomes.
|
||||
// To maintain compatibility with these implementations, we maintain previous behaviour.
|
||||
return createRejected(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isAddressFull(final Exception e) {
|
||||
return e instanceof ActiveMQException && ActiveMQExceptionType.ADDRESS_FULL.equals(((ActiveMQException) e).getType());
|
||||
}
|
||||
|
||||
private Rejected createRejected(final Exception e) {
|
||||
ErrorCondition condition = new ErrorCondition();
|
||||
|
||||
// Set condition
|
||||
if (e instanceof ActiveMQSecurityException) {
|
||||
condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
|
||||
} else if (isAddressFull(e)) {
|
||||
condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
|
||||
} else {
|
||||
condition.setCondition(Symbol.valueOf("failed"));
|
||||
}
|
||||
condition.setDescription(e.getMessage());
|
||||
|
||||
Rejected rejected = new Rejected();
|
||||
rejected.setError(condition);
|
||||
return rejected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
|
||||
protonSession.removeReceiver(receiver);
|
||||
|
@ -375,4 +418,15 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
public boolean isDraining() {
|
||||
return receiver.draining();
|
||||
}
|
||||
|
||||
private boolean outcomeSupported(final Source source, final Symbol outcome) {
|
||||
if (source != null && source.getOutcomes() != null) {
|
||||
return Arrays.asList(( source).getOutcomes()).contains(outcome);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private Outcome getEffectiveDefaultOutcome(final Source source) {
|
||||
return (source.getOutcomes() == null || source.getOutcomes().length == 0) ? source.getDefaultOutcome() : null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,16 +16,43 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.proton;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.nullable;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
|
||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
||||
import org.apache.qpid.proton.amqp.messaging.Modified;
|
||||
import org.apache.qpid.proton.amqp.messaging.Outcome;
|
||||
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||
import org.apache.qpid.proton.amqp.transport.DeliveryState;
|
||||
import org.apache.qpid.proton.codec.ReadableBuffer;
|
||||
import org.apache.qpid.proton.engine.Delivery;
|
||||
import org.apache.qpid.proton.engine.Receiver;
|
||||
import org.junit.Test;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class ProtonServerReceiverContextTest {
|
||||
|
||||
|
@ -39,6 +66,44 @@ public class ProtonServerReceiverContextTest {
|
|||
doOnMessageWithAbortedDeliveryTestImpl(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void addressFull_SourceSupportsModified() throws Exception {
|
||||
doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL,
|
||||
Accepted.DESCRIPTOR_SYMBOL,
|
||||
Modified.DESCRIPTOR_SYMBOL),
|
||||
null, new ActiveMQAddressFullException(),
|
||||
Modified.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void addressFull_SourceDoesNotSupportModified() throws Exception {
|
||||
doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL,
|
||||
Accepted.DESCRIPTOR_SYMBOL),
|
||||
null, new ActiveMQAddressFullException(),
|
||||
Rejected.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void otherFailure_SourceSupportsRejects() throws Exception {
|
||||
doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL,
|
||||
Accepted.DESCRIPTOR_SYMBOL,
|
||||
Modified.DESCRIPTOR_SYMBOL),
|
||||
null, new ActiveMQException(),
|
||||
Rejected.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void otherFailure_SourceDoesNotSupportReject() throws Exception {
|
||||
doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL),
|
||||
Accepted.getInstance(), new ActiveMQException(),
|
||||
Accepted.class);
|
||||
// violates AMQP specification - see explanation ProtonServerReceiverContext.determineDeliveryState
|
||||
doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL),
|
||||
null,
|
||||
new ActiveMQException(),
|
||||
Rejected.class);
|
||||
}
|
||||
|
||||
private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException {
|
||||
Receiver mockReceiver = mock(Receiver.class);
|
||||
AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
|
||||
|
@ -46,6 +111,8 @@ public class ProtonServerReceiverContextTest {
|
|||
when(mockConnContext.getAmqpCredits()).thenReturn(100);
|
||||
when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
|
||||
|
||||
when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class));
|
||||
|
||||
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver);
|
||||
|
||||
Delivery mockDelivery = mock(Delivery.class);
|
||||
|
@ -72,4 +139,48 @@ public class ProtonServerReceiverContextTest {
|
|||
verifyNoMoreInteractions(mockReceiver);
|
||||
}
|
||||
|
||||
private void doOnMessageWithDeliveryException(List<Symbol> sourceSymbols,
|
||||
Outcome defaultOutcome, Exception deliveryException,
|
||||
Class<? extends DeliveryState> expectedDeliveryState) throws Exception {
|
||||
AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
|
||||
doAnswer((Answer<Void>) invocation -> {
|
||||
Runnable runnable = invocation.getArgument(0);
|
||||
runnable.run();
|
||||
return null;
|
||||
}).when(mockConnContext).runLater(any(Runnable.class));
|
||||
ProtonProtocolManager mockProtocolManager = mock(ProtonProtocolManager.class);
|
||||
when(mockProtocolManager.isUseModifiedForTransientDeliveryErrors()).thenReturn(true);
|
||||
when(mockConnContext.getProtocolManager()).thenReturn(mockProtocolManager);
|
||||
|
||||
|
||||
AMQPSessionCallback mockSession = mock(AMQPSessionCallback.class);
|
||||
|
||||
Receiver mockReceiver = mock(Receiver.class);
|
||||
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(mockSession, mockConnContext, null, mockReceiver);
|
||||
|
||||
Delivery mockDelivery = mock(Delivery.class);
|
||||
when(mockDelivery.getLink()).thenReturn(mockReceiver);
|
||||
|
||||
when(mockReceiver.current()).thenReturn(mockDelivery);
|
||||
Source source = new Source();
|
||||
source.setOutcomes(sourceSymbols.toArray(new Symbol[]{}));
|
||||
source.setDefaultOutcome(defaultOutcome);
|
||||
when(mockReceiver.getSource()).thenReturn(source);
|
||||
|
||||
doThrow(deliveryException).when(mockSession)
|
||||
.serverSend(eq(rc),
|
||||
nullable(Transaction.class),
|
||||
eq(mockReceiver),
|
||||
eq(mockDelivery),
|
||||
nullable(SimpleString.class),
|
||||
anyInt(),
|
||||
nullable(ReadableBuffer.class),
|
||||
any(RoutingContext.class));
|
||||
|
||||
rc.onMessage(mockDelivery);
|
||||
|
||||
verify(mockDelivery, times(1)).settle();
|
||||
verify(mockDelivery, times(1)).disposition(any(expectedDeliveryState));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -59,6 +59,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
|||
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
|
||||
|
||||
public static final long DEFAULT_SEND_TIMEOUT = 15000;
|
||||
public static final Symbol[] DEFAULT_OUTCOMES = {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
|
||||
|
||||
private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
|
@ -70,6 +71,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
|||
private final Target userSpecifiedTarget;
|
||||
private final SenderSettleMode userSpecifiedSenderSettlementMode;
|
||||
private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
|
||||
private final Symbol[] outcomes;
|
||||
|
||||
private boolean presettle;
|
||||
private long sendTimeout = DEFAULT_SEND_TIMEOUT;
|
||||
|
@ -92,7 +94,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
|||
* The unique ID assigned to this sender.
|
||||
*/
|
||||
public AmqpSender(AmqpSession session, String address, String senderId) {
|
||||
this(session, address, senderId, null, null);
|
||||
this(session, address, senderId, null, null, DEFAULT_OUTCOMES);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -108,8 +110,15 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
|||
* The {@link SenderSettleMode} to use on open.
|
||||
* @param receiverMode
|
||||
* The {@link ReceiverSettleMode} to use on open.
|
||||
* @param outcomes
|
||||
* The outcomes to use on open
|
||||
*/
|
||||
public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) {
|
||||
public AmqpSender(AmqpSession session,
|
||||
String address,
|
||||
String senderId,
|
||||
SenderSettleMode senderMode,
|
||||
ReceiverSettleMode receiverMode,
|
||||
Symbol[] outcomes) {
|
||||
|
||||
if (address != null && address.isEmpty()) {
|
||||
throw new IllegalArgumentException("Address cannot be empty.");
|
||||
|
@ -121,6 +130,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
|||
this.userSpecifiedTarget = null;
|
||||
this.userSpecifiedSenderSettlementMode = senderMode;
|
||||
this.userSpecifiedReceiverSettlementMode = receiverMode;
|
||||
this.outcomes = outcomes;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -145,6 +155,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
|||
this.userSpecifiedTarget = target;
|
||||
this.userSpecifiedSenderSettlementMode = null;
|
||||
this.userSpecifiedReceiverSettlementMode = null;
|
||||
outcomes = DEFAULT_OUTCOMES;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -311,11 +322,9 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
|||
|
||||
@Override
|
||||
protected void doOpen() {
|
||||
|
||||
Symbol[] outcomes = new Symbol[] {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
|
||||
Source source = new Source();
|
||||
source.setAddress(senderId);
|
||||
source.setOutcomes(outcomes);
|
||||
source.setOutcomes(this.outcomes);
|
||||
|
||||
Target target = userSpecifiedTarget;
|
||||
if (target == null) {
|
||||
|
|
|
@ -189,10 +189,34 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
|||
*
|
||||
* @throws Exception if an error occurs while creating the sender.
|
||||
*/
|
||||
public AmqpSender createSender(final String address, final SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception {
|
||||
public AmqpSender createSender(final String address,
|
||||
final SenderSettleMode senderMode,
|
||||
ReceiverSettleMode receiverMode) throws Exception {
|
||||
return createSender(address, senderMode, receiverMode, AmqpSender.DEFAULT_OUTCOMES);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a sender instance using the given address
|
||||
*
|
||||
* @param address
|
||||
* the address to which the sender will produce its messages.
|
||||
* @param senderSettlementMode
|
||||
* controls the settlement mode used by the created Sender
|
||||
* @param receiverSettlementMode
|
||||
* controls the desired settlement mode used by the remote Receiver
|
||||
* @param outcomes
|
||||
* specifies the outcomes supported by the sender
|
||||
*
|
||||
* @return a newly created sender that is ready for use.
|
||||
*
|
||||
* @throws Exception if an error occurs while creating the sender.
|
||||
*/
|
||||
public AmqpSender createSender(final String address,
|
||||
final SenderSettleMode senderMode,
|
||||
ReceiverSettleMode receiverMode, final Symbol[] outcomes) throws Exception {
|
||||
checkClosed();
|
||||
|
||||
final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode);
|
||||
final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode, outcomes);
|
||||
final ClientFuture request = new ClientFuture();
|
||||
|
||||
connection.getScheduler().execute(new Runnable() {
|
||||
|
|
|
@ -27,70 +27,148 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
|||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.runners.Enclosed;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
public class AmqpFlowControlFailTest extends JMSClientTestSupport {
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
||||
import org.apache.qpid.proton.amqp.messaging.Modified;
|
||||
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||
|
||||
@Override
|
||||
protected void configureAddressPolicy(ActiveMQServer server) {
|
||||
// For BLOCK tests
|
||||
@RunWith(Enclosed.class)
|
||||
public class AmqpFlowControlFailTest {
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public static class AmqpFlowControlFailDispositionTests extends JMSClientTestSupport {
|
||||
|
||||
@Parameterized.Parameter()
|
||||
public boolean useModified;
|
||||
|
||||
@Parameterized.Parameter(1)
|
||||
public Symbol[] outcomes;
|
||||
|
||||
@Parameterized.Parameter(2)
|
||||
public String expectedMessage;
|
||||
|
||||
|
||||
@Parameterized.Parameters(name = "useModified={0}")
|
||||
public static Collection<Object[]> parameters() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "failure at remote"},
|
||||
{true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"},
|
||||
{false, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"},
|
||||
{false, new Symbol[]{}, "[condition = amqp:resource-limit-exceeded]"}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureAddressPolicy(ActiveMQServer server) {
|
||||
AmqpFlowControlFailTest.configureAddressPolicy(server);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
|
||||
params.put("amqpUseModifiedForTransientDeliveryErrors", useModified);
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testAddressFullDisposition() throws Exception {
|
||||
AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
try {
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpSender sender = session.createSender(getQueueName(), null, null, outcomes);
|
||||
boolean rejected = false;
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
final AmqpMessage message = new AmqpMessage();
|
||||
byte[] payload = new byte[10];
|
||||
message.setBytes(payload);
|
||||
try {
|
||||
sender.send(message);
|
||||
} catch (IOException e) {
|
||||
rejected = true;
|
||||
assertTrue(String.format("Unexpected message expected %s to contain %s", e.getMessage(), expectedMessage),
|
||||
e.getMessage().contains(expectedMessage));
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue("Expected messages to be refused by broker", rejected);
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class AmqpFlowControlFailOrdinaryTests extends JMSClientTestSupport {
|
||||
|
||||
@Override
|
||||
protected void configureAddressPolicy(ActiveMQServer server) {
|
||||
AmqpFlowControlFailTest.configureAddressPolicy(server);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testMesagesNotSent() throws Exception {
|
||||
AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
int messagesSent = 0;
|
||||
try {
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpSender sender = session.createSender(getQueueName());
|
||||
boolean rejected = false;
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
final AmqpMessage message = new AmqpMessage();
|
||||
byte[] payload = new byte[10];
|
||||
message.setBytes(payload);
|
||||
try {
|
||||
sender.send(message);
|
||||
messagesSent++;
|
||||
} catch (IOException e) {
|
||||
rejected = true;
|
||||
}
|
||||
}
|
||||
assertTrue(rejected);
|
||||
rejected = false;
|
||||
assertEquals(0, sender.getSender().getCredit());
|
||||
AmqpSession session2 = connection.createSession();
|
||||
AmqpReceiver receiver = session2.createReceiver(getQueueName());
|
||||
receiver.flow(messagesSent);
|
||||
for (int i = 0; i < messagesSent; i++) {
|
||||
AmqpMessage receive = receiver.receive();
|
||||
receive.accept();
|
||||
}
|
||||
receiver.close();
|
||||
session2.close();
|
||||
|
||||
Wait.assertEquals(1000, sender.getSender()::getCredit);
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
final AmqpMessage message = new AmqpMessage();
|
||||
byte[] payload = new byte[100];
|
||||
message.setBytes(payload);
|
||||
try {
|
||||
sender.send(message);
|
||||
} catch (IOException e) {
|
||||
rejected = true;
|
||||
}
|
||||
}
|
||||
assertTrue(rejected);
|
||||
assertEquals(0, sender.getSender().getCredit());
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void configureAddressPolicy(final ActiveMQServer server) {
|
||||
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
|
||||
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
|
||||
addressSettings.setMaxSizeBytes(1000);
|
||||
// addressSettings.setMaxSizeBytesRejectThreshold(MAX_SIZE_BYTES_REJECT_THRESHOLD);
|
||||
server.getAddressSettingsRepository().addMatch("#", addressSettings);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testMesagesNotSent() throws Exception {
|
||||
AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
int messagesSent = 0;
|
||||
try {
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpSender sender = session.createSender(getQueueName());
|
||||
boolean rejected = false;
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
final AmqpMessage message = new AmqpMessage();
|
||||
byte[] payload = new byte[10];
|
||||
message.setBytes(payload);
|
||||
try {
|
||||
sender.send(message);
|
||||
messagesSent++;
|
||||
System.out.println("message = " + message);
|
||||
} catch (IOException e) {
|
||||
rejected = true;
|
||||
}
|
||||
}
|
||||
assertTrue(rejected);
|
||||
rejected = false;
|
||||
assertEquals(0, sender.getSender().getCredit());
|
||||
AmqpSession session2 = connection.createSession();
|
||||
AmqpReceiver receiver = session2.createReceiver(getQueueName());
|
||||
receiver.flow(messagesSent);
|
||||
for (int i = 0; i < messagesSent; i++) {
|
||||
AmqpMessage receive = receiver.receive();
|
||||
receive.accept();
|
||||
}
|
||||
receiver.close();
|
||||
session2.close();
|
||||
|
||||
Wait.assertEquals(1000, sender.getSender()::getCredit);
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
final AmqpMessage message = new AmqpMessage();
|
||||
byte[] payload = new byte[100];
|
||||
message.setBytes(payload);
|
||||
try {
|
||||
sender.send(message);
|
||||
} catch (IOException e) {
|
||||
rejected = true;
|
||||
}
|
||||
}
|
||||
assertTrue(rejected);
|
||||
assertEquals(0, sender.getSender().getCredit());
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue