diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java index e17a0c9f9e..d62178a9f6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java @@ -297,7 +297,6 @@ public abstract class AmqpAbstractResource implements AmqpRe return new IOException("Open failed unexpectedly."); } - // TODO - Fina a more generic way to do this. protected abstract void doOpenInspection(); protected abstract void doClosedInspection(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index d2b859a4d5..fbe36ff3f3 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -856,6 +856,8 @@ public class AmqpReceiver extends AmqpAbstractResource { } private void processDelivery(Delivery incoming) throws Exception { + doDeliveryInspection(incoming); + Message message = null; try { message = decodeIncomingMessage(incoming); @@ -878,6 +880,14 @@ public class AmqpReceiver extends AmqpAbstractResource { } } + private void doDeliveryInspection(Delivery delivery) { + try { + getStateInspector().inspectDelivery(getReceiver(), delivery); + } catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + @Override public void processFlowUpdates(AmqpConnection connection) throws IOException { if (pullRequest != null || stopRequest != null) { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index dd3a3719a4..3850ec7082 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -322,6 +322,14 @@ public class AmqpSender extends AmqpAbstractResource { } } + protected void doDeliveryUpdateInspection(Delivery delivery) { + try { + getStateInspector().inspectDeliveryUpdate(getSender(), delivery); + } catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + @Override protected Exception getOpenAbortException() { // Verify the attach response contained a non-null target @@ -408,6 +416,8 @@ public class AmqpSender extends AmqpAbstractResource { continue; } + doDeliveryUpdateInspection(delivery); + Outcome outcome = null; if (state instanceof TransactionalState) { LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java index c8a20ac892..a8fd04abd1 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.client; import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; @@ -71,6 +72,14 @@ public class AmqpValidator { } + public void inspectDelivery(Receiver receiver, Delivery delivery) { + + } + + public void inspectDeliveryUpdate(Sender sender, Delivery delivery) { + + } + public boolean isValid() { return errorMessage.get() != null; } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java index 66e1efc051..3c3b75d499 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java @@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -28,7 +31,10 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.activemq.util.Wait; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; import org.junit.Test; /** @@ -123,10 +129,23 @@ public class AmqpSenderTest extends AmqpClientTestSupport { public void testUnsettledSender() throws Exception { final int MSG_COUNT = 1000; + final CountDownLatch settled = new CountDownLatch(MSG_COUNT); + AmqpClient client = createAmqpClient(); AmqpConnection connection = trackConnection(client.connect()); - AmqpSession session = connection.createSession(); + connection.setStateInspector(new AmqpValidator() { + + @Override + public void inspectDeliveryUpdate(Sender sender, Delivery delivery) { + if (delivery.remotelySettled()) { + LOG.trace("Remote settled message for sender: {}", sender.getName()); + settled.countDown(); + } + } + }); + + AmqpSession session = connection.createSession(); AmqpSender sender = session.createSender("topic://" + getTestName(), false); for (int i = 1; i <= MSG_COUNT; ++i) { @@ -149,6 +168,9 @@ public class AmqpSenderTest extends AmqpClientTestSupport { })); sender.close(); + + assertTrue("Remote should have settled all deliveries", settled.await(5, TimeUnit.MINUTES)); + connection.close(); }