ARTEMIS-60 Validate AMQP sender applied TransactionState

Update the AMQP test client to allow for better inspection of the
delivery updates that happen during normal use.  Use those modification
to check that when the broker's sender accepts and settles a non-settled
disposition it adds a proper TransactionState disposition with the
correct outcome and txn-id in that state.
This commit is contained in:
Timothy Bish 2017-03-20 16:46:18 -04:00
parent 8c310a2ce8
commit a0948928c3
7 changed files with 94 additions and 22 deletions

View File

@ -242,7 +242,8 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
} }
@Override @Override
public void processDeliveryUpdates(AmqpConnection connection) throws IOException { public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException {
doDeliveryUpdate(delivery);
} }
@Override @Override
@ -305,7 +306,14 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
} }
protected void doDeliveryUpdate(Delivery delivery) { protected void doDeliveryUpdate(Delivery delivery) {
AmqpValidator validator = getStateInspector();
if (validator != null) {
try {
validator.inspectDeliveryUpdate(delivery);
} catch (Throwable error) {
validator.markAsInvalid(error.getMessage());
}
}
} }
//----- Private implementation utility methods ---------------------------// //----- Private implementation utility methods ---------------------------//

View File

@ -43,6 +43,7 @@ import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Event.Type; import org.apache.qpid.proton.engine.Event.Type;
@ -697,7 +698,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
break; break;
case DELIVERY: case DELIVERY:
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
amqpEventSink.processDeliveryUpdates(this); amqpEventSink.processDeliveryUpdates(this, (Delivery) protonEvent.getContext());
break; break;
default: default:
break; break;

View File

@ -18,6 +18,8 @@ package org.apache.activemq.transport.amqp.client;
import java.io.IOException; import java.io.IOException;
import org.apache.qpid.proton.engine.Delivery;
/** /**
* Interface used by classes that want to process AMQP events sent from * Interface used by classes that want to process AMQP events sent from
* the transport layer. * the transport layer.
@ -53,9 +55,10 @@ public interface AmqpEventSink {
* for the given endpoint. * for the given endpoint.
* *
* @param connection the AmqpConnection instance for easier access to fire events. * @param connection the AmqpConnection instance for easier access to fire events.
* @param delivery the Delivery that was updated.
* @throws IOException if an error occurs while processing the update. * @throws IOException if an error occurs while processing the update.
*/ */
void processDeliveryUpdates(AmqpConnection connection) throws IOException; void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException;
/** /**
* Called when the Proton Engine signals an Flow related event has been triggered * Called when the Proton Engine signals an Flow related event has been triggered

View File

@ -794,7 +794,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
} }
@Override @Override
public void processDeliveryUpdates(AmqpConnection connection) throws IOException { public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException {
Delivery incoming = null; Delivery incoming = null;
do { do {
incoming = getEndpoint().current(); incoming = getEndpoint().current();
@ -823,7 +823,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
} }
} while (incoming != null); } while (incoming != null);
super.processDeliveryUpdates(connection); super.processDeliveryUpdates(connection, delivery);
} }
private void processDelivery(Delivery incoming) throws Exception { private void processDelivery(Delivery incoming) throws Exception {

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.transport.amqp.client; package org.apache.activemq.transport.amqp.client;
import javax.jms.InvalidDestinationException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
@ -26,6 +25,8 @@ import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidDestinationException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender; import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
@ -419,7 +420,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
} }
@Override @Override
public void processDeliveryUpdates(AmqpConnection connection) throws IOException { public void processDeliveryUpdates(AmqpConnection connection, Delivery updated) throws IOException {
List<Delivery> toRemove = new ArrayList<>(); List<Delivery> toRemove = new ArrayList<>();
for (Delivery delivery : pending) { for (Delivery delivery : pending) {
@ -485,13 +486,4 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
public String toString() { public String toString() {
return getClass().getSimpleName() + "{ address = " + address + "}"; return getClass().getSimpleName() + "{ address = " + address + "}";
} }
@Override
protected void doDeliveryUpdate(Delivery delivery) {
try {
getStateInspector().inspectDeliveryUpdate(delivery);
} catch (Throwable error) {
getStateInspector().markAsInvalid(error.getMessage());
}
}
} }

View File

@ -16,9 +16,6 @@
*/ */
package org.apache.activemq.transport.amqp.client; package org.apache.activemq.transport.amqp.client;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.TransactionRolledBackException;
import java.io.IOException; import java.io.IOException;
import java.nio.BufferOverflowException; import java.nio.BufferOverflowException;
import java.util.HashMap; import java.util.HashMap;
@ -27,6 +24,10 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@ -67,7 +68,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> {
} }
@Override @Override
public void processDeliveryUpdates(AmqpConnection connection) throws IOException { public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException {
try { try {
Iterator<Delivery> deliveries = pendingDeliveries.iterator(); Iterator<Delivery> deliveries = pendingDeliveries.iterator();
while (deliveries.hasNext()) { while (deliveries.hasNext()) {
@ -112,7 +113,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> {
deliveries.remove(); deliveries.remove();
} }
super.processDeliveryUpdates(connection); super.processDeliveryUpdates(connection, delivery);
} catch (Exception e) { } catch (Exception e) {
throw IOExceptionSupport.create(e); throw IOExceptionSupport.create(e);
} }

View File

@ -41,6 +41,7 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
@ -920,6 +921,72 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
sendConnection.close(); sendConnection.close();
consumerConnection.close(); consumerConnection.close();
} }
}
@Test(timeout = 30000)
public void testUnsettledTXMessageGetTransactedDispostion() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
assertNotNull(session);
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.setStateInspector(new AmqpValidator() {
@Override
public void inspectDeliveryUpdate(Delivery delivery) {
if (delivery.remotelySettled()) {
LOG.info("Receiver got delivery update for: {}", delivery);
if (!(delivery.getRemoteState() instanceof TransactionalState)) {
markAsInvalid("Transactionally acquire work no tagged as being in a transaction.");
} else {
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
if (!(txState.getOutcome() instanceof Accepted)) {
markAsInvalid("Transaction state lacks any outcome");
} else if (txState.getTxnId() == null) {
markAsInvalid("Transaction state lacks any TX Id");
}
}
if (!(delivery.getLocalState() instanceof TransactionalState)) {
markAsInvalid("Transactionally acquire work no tagged as being in a transaction.");
} else {
TransactionalState txState = (TransactionalState) delivery.getLocalState();
if (!(txState.getOutcome() instanceof Accepted)) {
markAsInvalid("Transaction state lacks any outcome");
} else if (txState.getTxnId() == null) {
markAsInvalid("Transaction state lacks any TX Id");
}
}
TransactionalState localTxState = (TransactionalState) delivery.getLocalState();
TransactionalState remoteTxState = (TransactionalState) delivery.getRemoteState();
if (!localTxState.getTxnId().equals(remoteTxState)) {
markAsInvalid("Message not enrolled in expected transaction");
}
}
}
});
session.begin();
assertTrue(session.isInTransaction());
receiver.flow(1);
AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
assertNotNull(received);
received.accept(false);
session.commit();
sender.getStateInspector().assertValid();
connection.close();
} }
} }