This commit is contained in:
Clebert Suconic 2017-03-20 18:04:15 -04:00
commit 7374d2f728
7 changed files with 94 additions and 22 deletions

View File

@ -242,7 +242,8 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
}
@Override
public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException {
doDeliveryUpdate(delivery);
}
@Override
@ -305,7 +306,14 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
}
protected void doDeliveryUpdate(Delivery delivery) {
AmqpValidator validator = getStateInspector();
if (validator != null) {
try {
validator.inspectDeliveryUpdate(delivery);
} catch (Throwable error) {
validator.markAsInvalid(error.getMessage());
}
}
}
//----- Private implementation utility methods ---------------------------//

View File

@ -43,6 +43,7 @@ import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Event.Type;
@ -697,7 +698,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
break;
case DELIVERY:
amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext();
amqpEventSink.processDeliveryUpdates(this);
amqpEventSink.processDeliveryUpdates(this, (Delivery) protonEvent.getContext());
break;
default:
break;

View File

@ -18,6 +18,8 @@ package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import org.apache.qpid.proton.engine.Delivery;
/**
* Interface used by classes that want to process AMQP events sent from
* the transport layer.
@ -53,9 +55,10 @@ public interface AmqpEventSink {
* for the given endpoint.
*
* @param connection the AmqpConnection instance for easier access to fire events.
* @param delivery the Delivery that was updated.
* @throws IOException if an error occurs while processing the update.
*/
void processDeliveryUpdates(AmqpConnection connection) throws IOException;
void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException;
/**
* Called when the Proton Engine signals an Flow related event has been triggered

View File

@ -794,7 +794,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
}
@Override
public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException {
Delivery incoming = null;
do {
incoming = getEndpoint().current();
@ -823,7 +823,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
}
} while (incoming != null);
super.processDeliveryUpdates(connection);
super.processDeliveryUpdates(connection, delivery);
}
private void processDelivery(Delivery incoming) throws Exception {

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.transport.amqp.client;
import javax.jms.InvalidDestinationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
@ -26,6 +25,8 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidDestinationException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
@ -419,7 +420,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
}
@Override
public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
public void processDeliveryUpdates(AmqpConnection connection, Delivery updated) throws IOException {
List<Delivery> toRemove = new ArrayList<>();
for (Delivery delivery : pending) {
@ -485,13 +486,4 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
public String toString() {
return getClass().getSimpleName() + "{ address = " + address + "}";
}
@Override
protected void doDeliveryUpdate(Delivery delivery) {
try {
getStateInspector().inspectDeliveryUpdate(delivery);
} catch (Throwable error) {
getStateInspector().markAsInvalid(error.getMessage());
}
}
}

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.transport.amqp.client;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.TransactionRolledBackException;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.HashMap;
@ -27,6 +24,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@ -67,7 +68,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> {
}
@Override
public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException {
try {
Iterator<Delivery> deliveries = pendingDeliveries.iterator();
while (deliveries.hasNext()) {
@ -112,7 +113,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> {
deliveries.remove();
}
super.processDeliveryUpdates(connection);
super.processDeliveryUpdates(connection, delivery);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}

View File

@ -41,6 +41,7 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
@ -920,6 +921,72 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
sendConnection.close();
consumerConnection.close();
}
}
@Test(timeout = 30000)
public void testUnsettledTXMessageGetTransactedDispostion() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
assertNotNull(session);
AmqpSender sender = session.createSender(getTestName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.setStateInspector(new AmqpValidator() {
@Override
public void inspectDeliveryUpdate(Delivery delivery) {
if (delivery.remotelySettled()) {
LOG.info("Receiver got delivery update for: {}", delivery);
if (!(delivery.getRemoteState() instanceof TransactionalState)) {
markAsInvalid("Transactionally acquire work no tagged as being in a transaction.");
} else {
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
if (!(txState.getOutcome() instanceof Accepted)) {
markAsInvalid("Transaction state lacks any outcome");
} else if (txState.getTxnId() == null) {
markAsInvalid("Transaction state lacks any TX Id");
}
}
if (!(delivery.getLocalState() instanceof TransactionalState)) {
markAsInvalid("Transactionally acquire work no tagged as being in a transaction.");
} else {
TransactionalState txState = (TransactionalState) delivery.getLocalState();
if (!(txState.getOutcome() instanceof Accepted)) {
markAsInvalid("Transaction state lacks any outcome");
} else if (txState.getTxnId() == null) {
markAsInvalid("Transaction state lacks any TX Id");
}
}
TransactionalState localTxState = (TransactionalState) delivery.getLocalState();
TransactionalState remoteTxState = (TransactionalState) delivery.getRemoteState();
if (!localTxState.getTxnId().equals(remoteTxState)) {
markAsInvalid("Message not enrolled in expected transaction");
}
}
}
});
session.begin();
assertTrue(session.isInTransaction());
receiver.flow(1);
AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
assertNotNull(received);
received.accept(false);
session.commit();
sender.getStateInspector().assertValid();
connection.close();
}
}