Add support for transactions to the test client.
This commit is contained in:
Timothy Bish 2015-11-20 17:48:14 -05:00
parent 4a27b72377
commit 272fb2b973
12 changed files with 1238 additions and 32 deletions

View File

@ -71,6 +71,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicLong sessionIdGenerator = new AtomicLong();
private final AtomicLong txIdGenerator = new AtomicLong();
private final Collector protonCollector = new CollectorImpl();
private final NettyTransport transport;
private final Transport protonTransport = Transport.Factory.create();
@ -429,6 +430,14 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
return getEndpoint();
}
String getConnectionId() {
return this.connectionId;
}
AmqpTransactionId getNextTransactionId() {
return new AmqpTransactionId(connectionId + ":" + txIdGenerator.incrementAndGet());
}
void pumpToProtonTransport() {
try {
boolean done = false;

View File

@ -34,6 +34,7 @@ import javax.jms.InvalidDestinationException;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
@ -44,6 +45,7 @@ import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
@ -301,10 +303,22 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
checkClosed();
try {
if (!delivery.isSettled()) {
if (session.isInTransaction()) {
Binary txnId = session.getTransactionId().getRemoteTxId();
if (txnId != null) {
TransactionalState txState = new TransactionalState();
txState.setOutcome(Accepted.getInstance());
txState.setTxnId(txnId);
delivery.disposition(txState);
delivery.settle();
session.getTransactionContext().registerTxConsumer(AmqpReceiver.this);
}
} else {
delivery.disposition(Accepted.getInstance());
delivery.settle();
session.pumpToProtonTransport();
}
}
session.pumpToProtonTransport();
request.onSuccess();
} catch (Exception e) {
request.onFailure(e);
@ -657,4 +671,18 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
throw new IllegalStateException("Receiver is already closed");
}
}
//----- Internal Transaction state callbacks -----------------------------//
void preCommit() {
}
void preRollback() {
}
void postCommit() {
}
void postRollback() {
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
/**
* {@link IOException} derivative that defines that the remote peer has requested that this
* connection be redirected to some alternative peer.
*/
public class AmqpRedirectedException extends IOException {
private static final long serialVersionUID = 5872211116061710369L;
private final String hostname;
private final String networkHost;
private final int port;
public AmqpRedirectedException(String reason, String hostname, String networkHost, int port) {
super(reason);
this.hostname = hostname;
this.networkHost = networkHost;
this.port = port;
}
/**
* @return the host name of the container being redirected to.
*/
public String getHostname() {
return hostname;
}
/**
* @return the DNS host name or IP address of the peer this connection is being redirected to.
*/
public String getNetworkHost() {
return networkHost;
}
/**
* @return the port number on the peer this connection is being redirected to.
*/
public int getPort() {
return port;
}
}

View File

@ -29,14 +29,18 @@ import javax.jms.InvalidDestinationException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
@ -316,20 +320,25 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
}
private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
LOG.trace("Producer sending message: {}", message);
byte[] tag = tagGenerator.getNextTag();
Delivery delivery = null;
if (presettle) {
delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
} else {
byte[] tag = tagGenerator.getNextTag();
delivery = getEndpoint().delivery(tag, 0, tag.length);
}
delivery.setContext(request);
if (session.isInTransaction()) {
Binary amqpTxId = session.getTransactionId().getRemoteTxId();
TransactionalState state = new TransactionalState();
state.setTxnId(amqpTxId);
delivery.disposition(state);
}
encodeAndSend(message.getWrappedMessage(), delivery);
if (presettle) {
@ -390,26 +399,38 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
}
AsyncResult request = (AsyncResult) delivery.getContext();
Exception deliveryError = null;
if (outcome instanceof Accepted) {
LOG.trace("Outcome of delivery was accepted: {}", delivery);
tagGenerator.returnTag(delivery.getTag());
if (request != null && !request.isComplete()) {
request.onSuccess();
}
} else if (outcome instanceof Rejected) {
Exception remoteError = getRemoteError();
LOG.trace("Outcome of delivery was rejected: {}", delivery);
tagGenerator.returnTag(delivery.getTag());
if (request != null && !request.isComplete()) {
request.onFailure(remoteError);
} else {
connection.fireClientException(getRemoteError());
}
} else if (outcome != null) {
LOG.warn("Message send updated with unsupported outcome: {}", outcome);
ErrorCondition remoteError = ((Rejected) outcome).getError();
if (remoteError == null) {
remoteError = getEndpoint().getRemoteCondition();
}
deliveryError = AmqpSupport.convertToException(remoteError);
} else if (outcome instanceof Released) {
LOG.trace("Outcome of delivery was released: {}", delivery);
deliveryError = new IOException("Delivery failed: released by receiver");
} else if (outcome instanceof Modified) {
LOG.trace("Outcome of delivery was modified: {}", delivery);
deliveryError = new IOException("Delivery failed: failure at remote");
}
if (deliveryError != null) {
if (request != null && !request.isComplete()) {
request.onFailure(deliveryError);
} else {
connection.fireClientException(deliveryError);
}
}
tagGenerator.returnTag(delivery.getTag());
delivery.settle();
toRemove.add(delivery);
}

View File

@ -36,6 +36,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
private final AmqpConnection connection;
private final String sessionId;
private final AmqpTransactionContext txContext;
/**
* Create a new session instance.
@ -48,6 +49,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
public AmqpSession(AmqpConnection connection, String sessionId) {
this.connection = connection;
this.sessionId = sessionId;
this.txContext = new AmqpTransactionContext(this);
}
/**
@ -363,7 +365,59 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return new UnmodifiableSession(getEndpoint());
}
//----- Internal getters used from the child AmqpResource classes --------//
public boolean isInTransaction() {
return txContext.isInTransaction();
}
@Override
public String toString() {
return "AmqpSession { " + sessionId + " }";
}
//----- Session Transaction Methods --------------------------------------//
/**
* Starts a new transaction associated with this session.
*
* @throws Exception if an error occurs starting a new Transaction.
*/
public void begin() throws Exception {
if (txContext.isInTransaction()) {
throw new javax.jms.IllegalStateException("Session already has an active transaction");
}
txContext.begin();
}
/**
* Commit the current transaction associated with this session.
*
* @throws Exception if an error occurs committing the Transaction.
*/
public void commit() throws Exception {
if (!txContext.isInTransaction()) {
throw new javax.jms.IllegalStateException(
"Commit called on Session that does not have an active transaction");
}
txContext.commit();
}
/**
* Roll back the current transaction associated with this session.
*
* @throws Exception if an error occurs rolling back the Transaction.
*/
public void rollback() throws Exception {
if (!txContext.isInTransaction()) {
throw new javax.jms.IllegalStateException(
"Rollback called on Session that does not have an active transaction");
}
txContext.rollback();
}
//----- Internal access used to manage resources -------------------------//
ScheduledExecutorService getScheduler() {
return connection.getScheduler();
@ -377,6 +431,14 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
connection.pumpToProtonTransport();
}
AmqpTransactionId getTransactionId() {
return txContext.getTransactionId();
}
AmqpTransactionContext getTransactionContext() {
return txContext;
}
//----- Private implementation details -----------------------------------//
@Override
@ -410,9 +472,4 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
throw new IllegalStateException("Session is already closed");
}
}
@Override
public String toString() {
return "AmqpSession { " + sessionId + " }";
}
}

View File

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import java.util.Map;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.ResourceAllocationException;
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.TransactionErrors;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ConnectionError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
public class AmqpSupport {
// Symbols used for connection capabilities
public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
// Symbols used to announce connection error information
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id");
// Symbols used to announce connection redirect ErrorCondition 'info'
public static final Symbol PORT = Symbol.valueOf("port");
public static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
public static final Symbol OPEN_HOSTNAME = Symbol.valueOf("hostname");
// Symbols used for connection properties
public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
public static final Symbol PRODUCT = Symbol.valueOf("product");
public static final Symbol VERSION = Symbol.valueOf("version");
public static final Symbol PLATFORM = Symbol.valueOf("platform");
// Symbols used for receivers.
public static final Symbol COPY = Symbol.getSymbol("copy");
public static final Symbol NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
public static final Symbol SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
// Delivery states
public static final Rejected REJECTED = new Rejected();
public static final Modified MODIFIED_FAILED = new Modified();
public static final Modified MODIFIED_FAILED_UNDELIVERABLE = new Modified();
// Temporary Destination constants
public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
public static final String TEMP_QUEUE_CREATOR = "temp-queue-creator:";
public static final String TEMP_TOPIC_CREATOR = "temp-topic-creator:";
//----- Static initializer -----------------------------------------------//
static {
MODIFIED_FAILED.setDeliveryFailed(true);
MODIFIED_FAILED_UNDELIVERABLE.setDeliveryFailed(true);
MODIFIED_FAILED_UNDELIVERABLE.setUndeliverableHere(true);
}
//----- Utility Methods --------------------------------------------------//
/**
* Given an ErrorCondition instance create a new Exception that best matches
* the error type.
*
* @param errorCondition
* The ErrorCondition returned from the remote peer.
*
* @return a new Exception instance that best matches the ErrorCondition value.
*/
public static Exception convertToException(ErrorCondition errorCondition) {
Exception remoteError = null;
if (errorCondition != null && errorCondition.getCondition() != null) {
Symbol error = errorCondition.getCondition();
String message = extractErrorMessage(errorCondition);
if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
remoteError = new JMSSecurityException(message);
} else if (error.equals(AmqpError.RESOURCE_LIMIT_EXCEEDED)) {
remoteError = new ResourceAllocationException(message);
} else if (error.equals(AmqpError.NOT_FOUND)) {
remoteError = new InvalidDestinationException(message);
} else if (error.equals(TransactionErrors.TRANSACTION_ROLLBACK)) {
remoteError = new TransactionRolledBackException(message);
} else if (error.equals(ConnectionError.REDIRECT)) {
remoteError = createRedirectException(error, message, errorCondition);
} else if (error.equals(AmqpError.INVALID_FIELD)) {
Map<?, ?> info = errorCondition.getInfo();
if (info != null && CONTAINER_ID.equals(info.get(INVALID_FIELD))) {
remoteError = new InvalidClientIDException(message);
} else {
remoteError = new JMSException(message);
}
} else {
remoteError = new JMSException(message);
}
} else {
remoteError = new JMSException("Unknown error from remote peer");
}
return remoteError;
}
/**
* Attempt to read and return the embedded error message in the given ErrorCondition
* object. If no message can be extracted a generic message is returned.
*
* @param errorCondition
* The ErrorCondition to extract the error message from.
*
* @return an error message extracted from the given ErrorCondition.
*/
public static String extractErrorMessage(ErrorCondition errorCondition) {
String message = "Received error from remote peer without description";
if (errorCondition != null) {
if (errorCondition.getDescription() != null && !errorCondition.getDescription().isEmpty()) {
message = errorCondition.getDescription();
}
Symbol condition = errorCondition.getCondition();
if (condition != null) {
message = message + " [condition = " + condition + "]";
}
}
return message;
}
/**
* When a redirect type exception is received this method is called to create the
* appropriate redirect exception type containing the error details needed.
*
* @param error
* the Symbol that defines the redirection error type.
* @param message
* the basic error message that should used or amended for the returned exception.
* @param condition
* the ErrorCondition that describes the redirection.
*
* @return an Exception that captures the details of the redirection error.
*/
public static Exception createRedirectException(Symbol error, String message, ErrorCondition condition) {
Exception result = null;
Map<?, ?> info = condition.getInfo();
if (info == null) {
result = new IOException(message + " : Redirection information not set.");
} else {
String hostname = (String) info.get(OPEN_HOSTNAME);
String networkHost = (String) info.get(NETWORK_HOST);
if (networkHost == null || networkHost.isEmpty()) {
result = new IOException(message + " : Redirection information not set.");
}
int port = 0;
try {
port = Integer.valueOf(info.get(PORT).toString());
} catch (Exception ex) {
result = new IOException(message + " : Redirection information not set.");
}
result = new AmqpRedirectedException(message, hostname, networkHost, port);
}
return result;
}
}

View File

@ -0,0 +1,258 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.ClientFutureSynchronization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Defines a context under which resources in a given session
* will operate inside transaction scoped boundaries.
*/
public class AmqpTransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
private final AmqpSession session;
private final Set<AmqpReceiver> txReceivers = new LinkedHashSet<AmqpReceiver>();
private AmqpTransactionCoordinator coordinator;
private AmqpTransactionId transactionId;
public AmqpTransactionContext(AmqpSession session) {
this.session = session;
}
/**
* Begins a new transaction scoped to the target session.
*
* @param txId
* The transaction Id to use for this new transaction.
*
* @throws Exception if an error occurs while starting the transaction.
*/
public void begin() throws Exception {
if (transactionId != null) {
throw new IOException("Begin called while a TX is still Active.");
}
final AmqpTransactionId txId = session.getConnection().getNextTransactionId();
final ClientFuture request = new ClientFuture(new ClientFutureSynchronization() {
@Override
public void onPendingSuccess() {
transactionId = txId;
}
@Override
public void onPendingFailure(Throwable cause) {
transactionId = null;
}
});
LOG.info("Attempting to Begin TX:[{}]", txId);
session.getScheduler().execute(new Runnable() {
@Override
public void run() {
if (coordinator == null || coordinator.isClosed()) {
LOG.info("Creating new Coordinator for TX:[{}]", txId);
coordinator = new AmqpTransactionCoordinator(session);
coordinator.open(new AsyncResult() {
@Override
public void onSuccess() {
try {
LOG.info("Attempting to declare TX:[{}]", txId);
coordinator.declare(txId, request);
} catch (Exception e) {
request.onFailure(e);
}
}
@Override
public void onFailure(Throwable result) {
request.onFailure(result);
}
@Override
public boolean isComplete() {
return request.isComplete();
}
});
} else {
try {
LOG.info("Attempting to declare TX:[{}]", txId);
coordinator.declare(txId, request);
} catch (Exception e) {
request.onFailure(e);
}
}
session.pumpToProtonTransport();
}
});
request.sync();
}
/**
* Commit this transaction which then ends the lifetime of the transacted operation.
*
* @throws Exception if an error occurs while performing the commit
*/
public void commit() throws Exception {
if (transactionId == null) {
throw new IllegalStateException("Commit called with no active Transaction.");
}
preCommit();
final ClientFuture request = new ClientFuture(new ClientFutureSynchronization() {
@Override
public void onPendingSuccess() {
transactionId = null;
postCommit();
}
@Override
public void onPendingFailure(Throwable cause) {
transactionId = null;
postCommit();
}
});
LOG.debug("Commit on TX[{}] initiated", transactionId);
session.getScheduler().execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Attempting to commit TX:[{}]", transactionId);
coordinator.discharge(transactionId, request, true);
session.pumpToProtonTransport();
} catch (Exception e) {
request.onFailure(e);
}
}
});
request.sync();
}
/**
* Rollback any transacted work performed under the current transaction.
*
* @throws Exception if an error occurs during the rollback operation.
*/
public void rollback() throws Exception {
if (transactionId == null) {
throw new IllegalStateException("Rollback called with no active Transaction.");
}
preRollback();
final ClientFuture request = new ClientFuture(new ClientFutureSynchronization() {
@Override
public void onPendingSuccess() {
transactionId = null;
postRollback();
}
@Override
public void onPendingFailure(Throwable cause) {
transactionId = null;
postRollback();
}
});
LOG.debug("Rollback on TX[{}] initiated", transactionId);
session.getScheduler().execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Attempting to roll back TX:[{}]", transactionId);
coordinator.discharge(transactionId, request, false);
session.pumpToProtonTransport();
} catch (Exception e) {
request.onFailure(e);
}
}
});
request.sync();
}
//----- Internal access to context properties ----------------------------//
AmqpTransactionCoordinator getCoordinator() {
return coordinator;
}
AmqpTransactionId getTransactionId() {
return transactionId;
}
boolean isInTransaction() {
return transactionId != null;
}
void registerTxConsumer(AmqpReceiver consumer) {
txReceivers.add(consumer);
}
//----- Transaction pre / post completion --------------------------------//
private void preCommit() {
for (AmqpReceiver receiver : txReceivers) {
receiver.preCommit();
}
}
private void preRollback() {
for (AmqpReceiver receiver : txReceivers) {
receiver.preRollback();
}
}
private void postCommit() {
for (AmqpReceiver receiver : txReceivers) {
receiver.postCommit();
}
txReceivers.clear();
}
private void postRollback() {
for (AmqpReceiver receiver : txReceivers) {
receiver.postRollback();
}
txReceivers.clear();
}
}

View File

@ -0,0 +1,259 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transaction.TxnCapability;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents the AMQP Transaction coordinator link used by the transaction context
* of a session to control the lifetime of a given transaction.
*/
public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
private final byte[] OUTBOUND_BUFFER = new byte[64];
private final AmqpSession session;
private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator();
private List<Delivery> pendingDeliveries = new LinkedList<Delivery>();
private Map<AmqpTransactionId, AsyncResult> pendingRequests = new HashMap<AmqpTransactionId, AsyncResult>();
public AmqpTransactionCoordinator(AmqpSession session) {
this.session = session;
}
@Override
public void processDeliveryUpdates(AmqpConnection connection) throws IOException {
try {
Iterator<Delivery> deliveries = pendingDeliveries.iterator();
while (deliveries.hasNext()) {
Delivery pendingDelivery = deliveries.next();
if (!pendingDelivery.remotelySettled()) {
continue;
}
DeliveryState state = pendingDelivery.getRemoteState();
AmqpTransactionId txId = (AmqpTransactionId) pendingDelivery.getContext();
AsyncResult pendingRequest = pendingRequests.get(txId);
if (pendingRequest == null) {
throw new IllegalStateException("Pending tx operation with no pending request");
}
if (state instanceof Declared) {
LOG.debug("New TX started: {}", txId.getTxId());
Declared declared = (Declared) state;
txId.setRemoteTxId(declared.getTxnId());
pendingRequest.onSuccess();
} else if (state instanceof Rejected) {
LOG.debug("Last TX request failed: {}", txId.getTxId());
Rejected rejected = (Rejected) state;
Exception cause = AmqpSupport.convertToException(rejected.getError());
JMSException failureCause = null;
if (txId.isCommit()) {
failureCause = new TransactionRolledBackException(cause.getMessage());
} else {
failureCause = new JMSException(cause.getMessage());
}
pendingRequest.onFailure(failureCause);
} else {
LOG.debug("Last TX request succeeded: {}", txId.getTxId());
pendingRequest.onSuccess();
}
// Clear state data
pendingDelivery.settle();
pendingRequests.remove(txId.getTxId());
deliveries.remove();
}
super.processDeliveryUpdates(connection);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}
public void declare(AmqpTransactionId txId, AsyncResult request) throws Exception {
if (txId.getRemoteTxId() != null) {
throw new IllegalStateException("Declar called while a TX is still Active.");
}
if (isClosed()) {
request.onFailure(new JMSException("Cannot start new transaction: Coordinator remotely closed"));
return;
}
Message message = Message.Factory.create();
Declare declare = new Declare();
message.setBody(new AmqpValue(declare));
Delivery pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
pendingDelivery.setContext(txId);
// Store away for completion
pendingDeliveries.add(pendingDelivery);
pendingRequests.put(txId, request);
sendTxCommand(message);
}
public void discharge(AmqpTransactionId txId, AsyncResult request, boolean commit) throws Exception {
if (isClosed()) {
Exception failureCause = null;
if (commit) {
failureCause = new TransactionRolledBackException("Transaction inbout: Coordinator remotely closed");
} else {
failureCause = new JMSException("Rollback cannot complete: Coordinator remotely closed");
}
request.onFailure(failureCause);
return;
}
// Store the context of this action in the transaction ID for later completion.
txId.setState(commit ? AmqpTransactionId.COMMIT_MARKER : AmqpTransactionId.ROLLBACK_MARKER);
Message message = Message.Factory.create();
Discharge discharge = new Discharge();
discharge.setFail(!commit);
discharge.setTxnId((Binary) txId.getRemoteTxId());
message.setBody(new AmqpValue(discharge));
Delivery pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag());
pendingDelivery.setContext(txId);
// Store away for completion
pendingDeliveries.add(pendingDelivery);
pendingRequests.put(txId, request);
sendTxCommand(message);
}
//----- Base class overrides ---------------------------------------------//
@Override
public void remotelyClosed(AmqpConnection connection) {
Exception txnError = AmqpSupport.convertToException(getEndpoint().getRemoteCondition());
// Alert any pending operation that the link failed to complete the pending
// begin / commit / rollback operation.
for (AsyncResult pendingRequest : pendingRequests.values()) {
pendingRequest.onFailure(txnError);
}
// Purge linkages to pending operations.
pendingDeliveries.clear();
pendingRequests.clear();
// Override the base class version because we do not want to propagate
// an error up to the client if remote close happens as that is an
// acceptable way for the remote to indicate the discharge could not
// be applied.
if (getEndpoint() != null) {
getEndpoint().close();
getEndpoint().free();
}
LOG.debug("Transaction Coordinator link {} was remotely closed", getEndpoint());
}
//----- Internal implementation ------------------------------------------//
private void sendTxCommand(Message message) throws IOException {
int encodedSize = 0;
byte[] buffer = OUTBOUND_BUFFER;
while (true) {
try {
encodedSize = message.encode(buffer, 0, buffer.length);
break;
} catch (BufferOverflowException e) {
buffer = new byte[buffer.length * 2];
}
}
Sender sender = getEndpoint();
sender.send(buffer, 0, encodedSize);
sender.advance();
}
@Override
protected void doOpen() {
Coordinator coordinator = new Coordinator();
coordinator.setCapabilities(TxnCapability.LOCAL_TXN);
Source source = new Source();
String coordinatorName = "qpid-jms:coordinator:" + session.getConnection().getConnectionId();
Sender sender = session.getEndpoint().sender(coordinatorName);
sender.setSource(source);
sender.setTarget(coordinator);
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
setEndpoint(sender);
super.doOpen();
}
@Override
protected void doOpenInspection() {
// TODO
}
@Override
protected void doClosedInspection() {
// TODO
}
}

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import org.apache.qpid.proton.amqp.Binary;
/**
* Wrapper For Transaction state in identification
*/
public class AmqpTransactionId {
public static final int DECLARE_MARKER = 1;
public static final int ROLLBACK_MARKER = 2;
public static final int COMMIT_MARKER = 3;
private final String txId;
private Binary remoteTxId;
private int state = DECLARE_MARKER;
public AmqpTransactionId(String txId) {
this.txId = txId;
}
public boolean isDeclare() {
return state == DECLARE_MARKER;
}
public boolean isCommit() {
return state == COMMIT_MARKER;
}
public boolean isRollback() {
return state == ROLLBACK_MARKER;
}
public void setState(int state) {
this.state = state;
}
public String getTxId() {
return txId;
}
public Binary getRemoteTxId() {
return remoteTxId;
}
public void setRemoteTxId(Binary remoteTxId) {
this.remoteTxId = remoteTxId;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((txId == null) ? 0 : txId.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
AmqpTransactionId other = (AmqpTransactionId) obj;
if (txId == null) {
if (other.txId != null) {
return false;
}
} else if (!txId.equals(other.txId)) {
return false;
}
return true;
}
}

View File

@ -19,21 +19,24 @@ package org.apache.activemq.transport.amqp.client.util;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Asynchronous Client Future class.
*/
public class ClientFuture extends WrappedAsyncResult {
public class ClientFuture implements AsyncResult {
protected final CountDownLatch latch = new CountDownLatch(1);
protected Throwable error;
private final AtomicBoolean completer = new AtomicBoolean();
private final CountDownLatch latch = new CountDownLatch(1);
private final ClientFutureSynchronization synchronization;
private volatile Throwable error;
public ClientFuture() {
super(null);
this(null);
}
public ClientFuture(AsyncResult watcher) {
super(watcher);
public ClientFuture(ClientFutureSynchronization synchronization) {
this.synchronization = synchronization;
}
@Override
@ -43,15 +46,23 @@ public class ClientFuture extends WrappedAsyncResult {
@Override
public void onFailure(Throwable result) {
if (completer.compareAndSet(false, true)) {
error = result;
if (synchronization != null) {
synchronization.onPendingFailure(error);
}
latch.countDown();
super.onFailure(result);
}
}
@Override
public void onSuccess() {
if (completer.compareAndSet(false, true)) {
if (synchronization != null) {
synchronization.onPendingSuccess();
}
latch.countDown();
super.onSuccess();
}
}
/**

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
/**
* Synchronization callback interface used to execute state updates
* or similar tasks in the thread context where the associated
* ProviderFuture is managed.
*/
public interface ClientFutureSynchronization {
void onPendingSuccess();
void onPendingFailure(Throwable cause);
}

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
/**
* Test various aspects of Transaction support.
*/
public class AmqpTransactionTest extends AmqpClientTestSupport {
@Test(timeout = 30000)
public void testBeginAndCommitTransaction() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
assertNotNull(session);
session.begin();
assertTrue(session.isInTransaction());
session.commit();
connection.close();
}
@Test(timeout = 30000)
public void testBeginAndRollbackTransaction() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
assertNotNull(session);
session.begin();
assertTrue(session.isInTransaction());
session.rollback();
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageToQueueWithCommit() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
final QueueViewMBean queue = getProxyToQueue(getTestName());
session.begin();
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
assertEquals(0, queue.getQueueSize());
session.commit();
assertEquals(1, queue.getQueueSize());
sender.close();
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageToQueueWithRollback() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
final QueueViewMBean queue = getProxyToQueue(getTestName());
session.begin();
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
assertEquals(0, queue.getQueueSize());
session.rollback();
assertEquals(0, queue.getQueueSize());
sender.close();
connection.close();
}
@Test(timeout = 60000)
public void testReceiveMessageWithCommit() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
final QueueViewMBean queue = getProxyToQueue(getTestName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
assertEquals(1, queue.getQueueSize());
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
session.begin();
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
session.commit();
assertEquals(0, queue.getQueueSize());
sender.close();
connection.close();
}
@Test(timeout = 60000)
public void testReceiveMessageWithRollback() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
final QueueViewMBean queue = getProxyToQueue(getTestName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
assertEquals(1, queue.getQueueSize());
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
session.begin();
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
session.rollback();
assertEquals(1, queue.getQueueSize());
sender.close();
connection.close();
}
}