diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 4b2b669a8e..a65361daaa 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -440,6 +440,7 @@ public class AMQPSessionCallback implements SessionCallback { // Anonymous relay must set a To value address = message.getAddressSimpleString(); if (address == null) { + // Errors are not currently handled as required by AMQP 1.0 anonterm-v1.0 rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer"); return; } @@ -457,14 +458,14 @@ public class AMQPSessionCallback implements SessionCallback { PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString()); if (store != null && store.isRejectingMessages()) { // We drop pre-settled messages (and abort any associated Tx) + String amqpAddress = delivery.getLink().getTarget().getAddress(); + ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress); if (delivery.remotelySettled()) { if (transaction != null) { - String amqpAddress = delivery.getLink().getTarget().getAddress(); - ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress); transaction.markAsRollbackOnly(e); } } else { - rejectMessage(delivery, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address); + throw e; } } else { serverSend(context, transaction, message, delivery, receiver, routingContext); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 5b9aa38b49..bab27d1964 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -69,6 +69,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager { - - condition.setDescription(e.getMessage()); - rejected.setError(condition); - - delivery.disposition(rejected); + delivery.disposition(deliveryState); delivery.settle(); flow(); connection.flush(); @@ -326,6 +325,50 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } } + private DeliveryState determineDeliveryState(final Source source, final boolean useModified, final Exception e) { + Outcome defaultOutcome = getEffectiveDefaultOutcome(source); + + if (isAddressFull(e) && useModified && + (outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Modified)) { + Modified modified = new Modified(); + modified.setDeliveryFailed(true); + return modified; + } else { + if (outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Rejected) { + return createRejected(e); + } else if (source.getDefaultOutcome() instanceof DeliveryState) { + return ((DeliveryState) source.getDefaultOutcome()); + } else { + // The AMQP specification requires that Accepted is returned for this case. However there exist + // implementations that set neither outcomes/default-outcome but use/expect for full range of outcomes. + // To maintain compatibility with these implementations, we maintain previous behaviour. + return createRejected(e); + } + } + } + + private boolean isAddressFull(final Exception e) { + return e instanceof ActiveMQException && ActiveMQExceptionType.ADDRESS_FULL.equals(((ActiveMQException) e).getType()); + } + + private Rejected createRejected(final Exception e) { + ErrorCondition condition = new ErrorCondition(); + + // Set condition + if (e instanceof ActiveMQSecurityException) { + condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS); + } else if (isAddressFull(e)) { + condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED); + } else { + condition.setCondition(Symbol.valueOf("failed")); + } + condition.setDescription(e.getMessage()); + + Rejected rejected = new Rejected(); + rejected.setError(condition); + return rejected; + } + @Override public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { protonSession.removeReceiver(receiver); @@ -375,4 +418,15 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements public boolean isDraining() { return receiver.draining(); } + + private boolean outcomeSupported(final Source source, final Symbol outcome) { + if (source != null && source.getOutcomes() != null) { + return Arrays.asList(( source).getOutcomes()).contains(outcome); + } + return false; + } + + private Outcome getEffectiveDefaultOutcome(final Source source) { + return (source.getOutcomes() == null || source.getOutcomes().length == 0) ? source.getDefaultOutcome() : null; + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java index a157ef1435..571ca92bc0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java @@ -16,16 +16,43 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.util.List; + +import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Outcome; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.junit.Test; +import org.mockito.stubbing.Answer; public class ProtonServerReceiverContextTest { @@ -39,6 +66,44 @@ public class ProtonServerReceiverContextTest { doOnMessageWithAbortedDeliveryTestImpl(true); } + @Test + public void addressFull_SourceSupportsModified() throws Exception { + doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL, + Accepted.DESCRIPTOR_SYMBOL, + Modified.DESCRIPTOR_SYMBOL), + null, new ActiveMQAddressFullException(), + Modified.class); + } + + @Test + public void addressFull_SourceDoesNotSupportModified() throws Exception { + doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL, + Accepted.DESCRIPTOR_SYMBOL), + null, new ActiveMQAddressFullException(), + Rejected.class); + } + + @Test + public void otherFailure_SourceSupportsRejects() throws Exception { + doOnMessageWithDeliveryException(asList(Rejected.DESCRIPTOR_SYMBOL, + Accepted.DESCRIPTOR_SYMBOL, + Modified.DESCRIPTOR_SYMBOL), + null, new ActiveMQException(), + Rejected.class); + } + + @Test + public void otherFailure_SourceDoesNotSupportReject() throws Exception { + doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL), + Accepted.getInstance(), new ActiveMQException(), + Accepted.class); + // violates AMQP specification - see explanation ProtonServerReceiverContext.determineDeliveryState + doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL), + null, + new ActiveMQException(), + Rejected.class); + } + private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException { Receiver mockReceiver = mock(Receiver.class); AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class); @@ -46,6 +111,8 @@ public class ProtonServerReceiverContextTest { when(mockConnContext.getAmqpCredits()).thenReturn(100); when(mockConnContext.getAmqpLowCredits()).thenReturn(30); + when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class)); + ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver); Delivery mockDelivery = mock(Delivery.class); @@ -72,4 +139,48 @@ public class ProtonServerReceiverContextTest { verifyNoMoreInteractions(mockReceiver); } + private void doOnMessageWithDeliveryException(List sourceSymbols, + Outcome defaultOutcome, Exception deliveryException, + Class expectedDeliveryState) throws Exception { + AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class); + doAnswer((Answer) invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(mockConnContext).runLater(any(Runnable.class)); + ProtonProtocolManager mockProtocolManager = mock(ProtonProtocolManager.class); + when(mockProtocolManager.isUseModifiedForTransientDeliveryErrors()).thenReturn(true); + when(mockConnContext.getProtocolManager()).thenReturn(mockProtocolManager); + + + AMQPSessionCallback mockSession = mock(AMQPSessionCallback.class); + + Receiver mockReceiver = mock(Receiver.class); + ProtonServerReceiverContext rc = new ProtonServerReceiverContext(mockSession, mockConnContext, null, mockReceiver); + + Delivery mockDelivery = mock(Delivery.class); + when(mockDelivery.getLink()).thenReturn(mockReceiver); + + when(mockReceiver.current()).thenReturn(mockDelivery); + Source source = new Source(); + source.setOutcomes(sourceSymbols.toArray(new Symbol[]{})); + source.setDefaultOutcome(defaultOutcome); + when(mockReceiver.getSource()).thenReturn(source); + + doThrow(deliveryException).when(mockSession) + .serverSend(eq(rc), + nullable(Transaction.class), + eq(mockReceiver), + eq(mockDelivery), + nullable(SimpleString.class), + anyInt(), + nullable(ReadableBuffer.class), + any(RoutingContext.class)); + + rc.onMessage(mockDelivery); + + verify(mockDelivery, times(1)).settle(); + verify(mockDelivery, times(1)).disposition(any(expectedDeliveryState)); + } + } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 703d4897c7..eed7cf2766 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -59,6 +59,7 @@ public class AmqpSender extends AmqpAbstractResource { private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; public static final long DEFAULT_SEND_TIMEOUT = 15000; + public static final Symbol[] DEFAULT_OUTCOMES = {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}; private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true); private final AtomicBoolean closed = new AtomicBoolean(); @@ -70,6 +71,7 @@ public class AmqpSender extends AmqpAbstractResource { private final Target userSpecifiedTarget; private final SenderSettleMode userSpecifiedSenderSettlementMode; private final ReceiverSettleMode userSpecifiedReceiverSettlementMode; + private final Symbol[] outcomes; private boolean presettle; private long sendTimeout = DEFAULT_SEND_TIMEOUT; @@ -92,7 +94,7 @@ public class AmqpSender extends AmqpAbstractResource { * The unique ID assigned to this sender. */ public AmqpSender(AmqpSession session, String address, String senderId) { - this(session, address, senderId, null, null); + this(session, address, senderId, null, null, DEFAULT_OUTCOMES); } /** @@ -108,8 +110,15 @@ public class AmqpSender extends AmqpAbstractResource { * The {@link SenderSettleMode} to use on open. * @param receiverMode * The {@link ReceiverSettleMode} to use on open. + * @param outcomes + * The outcomes to use on open */ - public AmqpSender(AmqpSession session, String address, String senderId, SenderSettleMode senderMode, ReceiverSettleMode receiverMode) { + public AmqpSender(AmqpSession session, + String address, + String senderId, + SenderSettleMode senderMode, + ReceiverSettleMode receiverMode, + Symbol[] outcomes) { if (address != null && address.isEmpty()) { throw new IllegalArgumentException("Address cannot be empty."); @@ -121,6 +130,7 @@ public class AmqpSender extends AmqpAbstractResource { this.userSpecifiedTarget = null; this.userSpecifiedSenderSettlementMode = senderMode; this.userSpecifiedReceiverSettlementMode = receiverMode; + this.outcomes = outcomes; } /** @@ -145,6 +155,7 @@ public class AmqpSender extends AmqpAbstractResource { this.userSpecifiedTarget = target; this.userSpecifiedSenderSettlementMode = null; this.userSpecifiedReceiverSettlementMode = null; + outcomes = DEFAULT_OUTCOMES; } /** @@ -311,11 +322,9 @@ public class AmqpSender extends AmqpAbstractResource { @Override protected void doOpen() { - - Symbol[] outcomes = new Symbol[] {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}; Source source = new Source(); source.setAddress(senderId); - source.setOutcomes(outcomes); + source.setOutcomes(this.outcomes); Target target = userSpecifiedTarget; if (target == null) { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 53d45e33b2..ff23e67560 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -189,10 +189,34 @@ public class AmqpSession extends AmqpAbstractResource { * * @throws Exception if an error occurs while creating the sender. */ - public AmqpSender createSender(final String address, final SenderSettleMode senderMode, ReceiverSettleMode receiverMode) throws Exception { + public AmqpSender createSender(final String address, + final SenderSettleMode senderMode, + ReceiverSettleMode receiverMode) throws Exception { + return createSender(address, senderMode, receiverMode, AmqpSender.DEFAULT_OUTCOMES); + } + + /** + * Create a sender instance using the given address + * + * @param address + * the address to which the sender will produce its messages. + * @param senderSettlementMode + * controls the settlement mode used by the created Sender + * @param receiverSettlementMode + * controls the desired settlement mode used by the remote Receiver + * @param outcomes + * specifies the outcomes supported by the sender + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address, + final SenderSettleMode senderMode, + ReceiverSettleMode receiverMode, final Symbol[] outcomes) throws Exception { checkClosed(); - final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode); + final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId(), senderMode, receiverMode, outcomes); final ClientFuture request = new ClientFuture(); connection.getScheduler().execute(new Runnable() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java index c6119a188b..42fb5f3081 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java @@ -27,70 +27,148 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; -public class AmqpFlowControlFailTest extends JMSClientTestSupport { +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Rejected; - @Override - protected void configureAddressPolicy(ActiveMQServer server) { - // For BLOCK tests +@RunWith(Enclosed.class) +public class AmqpFlowControlFailTest { + + @RunWith(Parameterized.class) + public static class AmqpFlowControlFailDispositionTests extends JMSClientTestSupport { + + @Parameterized.Parameter() + public boolean useModified; + + @Parameterized.Parameter(1) + public Symbol[] outcomes; + + @Parameterized.Parameter(2) + public String expectedMessage; + + + @Parameterized.Parameters(name = "useModified={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "failure at remote"}, + {true, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"}, + {false, new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}, "[condition = amqp:resource-limit-exceeded]"}, + {false, new Symbol[]{}, "[condition = amqp:resource-limit-exceeded]"} + }); + } + + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + AmqpFlowControlFailTest.configureAddressPolicy(server); + } + + @Override + protected void configureAMQPAcceptorParameters(Map params) { + params.put("amqpUseModifiedForTransientDeliveryErrors", useModified); + } + + + @Test(timeout = 60000) + public void testAddressFullDisposition() throws Exception { + AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI()); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getQueueName(), null, null, outcomes); + boolean rejected = false; + for (int i = 0; i < 1000; i++) { + final AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[10]; + message.setBytes(payload); + try { + sender.send(message); + } catch (IOException e) { + rejected = true; + assertTrue(String.format("Unexpected message expected %s to contain %s", e.getMessage(), expectedMessage), + e.getMessage().contains(expectedMessage)); + } + } + + assertTrue("Expected messages to be refused by broker", rejected); + } finally { + connection.close(); + } + } + } + + public static class AmqpFlowControlFailOrdinaryTests extends JMSClientTestSupport { + + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + AmqpFlowControlFailTest.configureAddressPolicy(server); + } + + @Test(timeout = 60000) + public void testMesagesNotSent() throws Exception { + AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI()); + AmqpConnection connection = addConnection(client.connect()); + int messagesSent = 0; + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getQueueName()); + boolean rejected = false; + for (int i = 0; i < 1000; i++) { + final AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[10]; + message.setBytes(payload); + try { + sender.send(message); + messagesSent++; + } catch (IOException e) { + rejected = true; + } + } + assertTrue(rejected); + rejected = false; + assertEquals(0, sender.getSender().getCredit()); + AmqpSession session2 = connection.createSession(); + AmqpReceiver receiver = session2.createReceiver(getQueueName()); + receiver.flow(messagesSent); + for (int i = 0; i < messagesSent; i++) { + AmqpMessage receive = receiver.receive(); + receive.accept(); + } + receiver.close(); + session2.close(); + + Wait.assertEquals(1000, sender.getSender()::getCredit); + for (int i = 0; i < 1000; i++) { + final AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[100]; + message.setBytes(payload); + try { + sender.send(message); + } catch (IOException e) { + rejected = true; + } + } + assertTrue(rejected); + assertEquals(0, sender.getSender().getCredit()); + } finally { + connection.close(); + } + } + } + + private static void configureAddressPolicy(final ActiveMQServer server) { AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#"); addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); addressSettings.setMaxSizeBytes(1000); - // addressSettings.setMaxSizeBytesRejectThreshold(MAX_SIZE_BYTES_REJECT_THRESHOLD); server.getAddressSettingsRepository().addMatch("#", addressSettings); } - - @Test(timeout = 60000) - public void testMesagesNotSent() throws Exception { - AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI()); - AmqpConnection connection = addConnection(client.connect()); - int messagesSent = 0; - try { - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getQueueName()); - boolean rejected = false; - for (int i = 0; i < 1000; i++) { - final AmqpMessage message = new AmqpMessage(); - byte[] payload = new byte[10]; - message.setBytes(payload); - try { - sender.send(message); - messagesSent++; - System.out.println("message = " + message); - } catch (IOException e) { - rejected = true; - } - } - assertTrue(rejected); - rejected = false; - assertEquals(0, sender.getSender().getCredit()); - AmqpSession session2 = connection.createSession(); - AmqpReceiver receiver = session2.createReceiver(getQueueName()); - receiver.flow(messagesSent); - for (int i = 0; i < messagesSent; i++) { - AmqpMessage receive = receiver.receive(); - receive.accept(); - } - receiver.close(); - session2.close(); - - Wait.assertEquals(1000, sender.getSender()::getCredit); - for (int i = 0; i < 1000; i++) { - final AmqpMessage message = new AmqpMessage(); - byte[] payload = new byte[100]; - message.setBytes(payload); - try { - sender.send(message); - } catch (IOException e) { - rejected = true; - } - } - assertTrue(rejected); - assertEquals(0, sender.getSender().getCredit()); - } finally { - connection.close(); - } - } }