Allow for inspection of Deliveries on Receivers and Delivery updates on
Senders.
This commit is contained in:
Timothy Bish 2016-10-26 15:17:24 -04:00
parent b34336cc0a
commit cec3245a9f
5 changed files with 54 additions and 4 deletions

View File

@ -297,7 +297,6 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
return new IOException("Open failed unexpectedly."); return new IOException("Open failed unexpectedly.");
} }
// TODO - Fina a more generic way to do this.
protected abstract void doOpenInspection(); protected abstract void doOpenInspection();
protected abstract void doClosedInspection(); protected abstract void doClosedInspection();

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
@ -856,6 +856,8 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
} }
private void processDelivery(Delivery incoming) throws Exception { private void processDelivery(Delivery incoming) throws Exception {
doDeliveryInspection(incoming);
Message message = null; Message message = null;
try { try {
message = decodeIncomingMessage(incoming); message = decodeIncomingMessage(incoming);
@ -878,6 +880,14 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
} }
} }
private void doDeliveryInspection(Delivery delivery) {
try {
getStateInspector().inspectDelivery(getReceiver(), delivery);
} catch (Throwable error) {
getStateInspector().markAsInvalid(error.getMessage());
}
}
@Override @Override
public void processFlowUpdates(AmqpConnection connection) throws IOException { public void processFlowUpdates(AmqpConnection connection) throws IOException {
if (pullRequest != null || stopRequest != null) { if (pullRequest != null || stopRequest != null) {

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
@ -322,6 +322,14 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
} }
} }
protected void doDeliveryUpdateInspection(Delivery delivery) {
try {
getStateInspector().inspectDeliveryUpdate(getSender(), delivery);
} catch (Throwable error) {
getStateInspector().markAsInvalid(error.getMessage());
}
}
@Override @Override
protected Exception getOpenAbortException() { protected Exception getOpenAbortException() {
// Verify the attach response contained a non-null target // Verify the attach response contained a non-null target
@ -408,6 +416,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
continue; continue;
} }
doDeliveryUpdateInspection(delivery);
Outcome outcome = null; Outcome outcome = null;
if (state instanceof TransactionalState) { if (state instanceof TransactionalState) {
LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state); LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.client;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Session;
@ -71,6 +72,14 @@ public class AmqpValidator {
} }
public void inspectDelivery(Receiver receiver, Delivery delivery) {
}
public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
}
public boolean isValid() { public boolean isValid() {
return errorMessage.get() != null; return errorMessage.get() != null;
} }

View File

@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -28,7 +31,10 @@ import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.junit.Test; import org.junit.Test;
/** /**
@ -123,10 +129,23 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
public void testUnsettledSender() throws Exception { public void testUnsettledSender() throws Exception {
final int MSG_COUNT = 1000; final int MSG_COUNT = 1000;
final CountDownLatch settled = new CountDownLatch(MSG_COUNT);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect()); AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
connection.setStateInspector(new AmqpValidator() {
@Override
public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
if (delivery.remotelySettled()) {
LOG.trace("Remote settled message for sender: {}", sender.getName());
settled.countDown();
}
}
});
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("topic://" + getTestName(), false); AmqpSender sender = session.createSender("topic://" + getTestName(), false);
for (int i = 1; i <= MSG_COUNT; ++i) { for (int i = 1; i <= MSG_COUNT; ++i) {
@ -149,6 +168,9 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
})); }));
sender.close(); sender.close();
assertTrue("Remote should have settled all deliveries", settled.await(5, TimeUnit.MINUTES));
connection.close(); connection.close();
} }