Add workarounds to allow for TX work to take place in multiple sessions
on the same connection.  Future work needed to properly support TXN
Capabilities defined in the spec and support checking of violations of
expected behavior.
This commit is contained in:
Timothy Bish 2015-06-16 16:41:18 -04:00
parent 7b5c8be377
commit ed266835b5
6 changed files with 107 additions and 31 deletions

View File

@ -60,11 +60,13 @@ import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.amqp.AmqpHeader;
import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
@ -142,10 +144,12 @@ public class AmqpConnection implements AmqpProtocolConverter {
private final ConnectionInfo connectionInfo = new ConnectionInfo();
private long nextSessionId;
private long nextTempDestinationId;
private long nextTransactionId;
private boolean closing;
private boolean closedSocket;
private AmqpAuthenticator authenticator;
private final Map<TransactionId, AmqpTransactionCoordinator> transactions = new HashMap<TransactionId, AmqpTransactionCoordinator>();
private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentMap<ConsumerId, AmqpSender> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, AmqpSender>();
@ -667,6 +671,22 @@ public class AmqpConnection implements AmqpProtocolConverter {
subscriptionsByConsumerId.remove(consumerId);
}
void registerTransaction(TransactionId txId, AmqpTransactionCoordinator coordinator) {
transactions.put(txId, coordinator);
}
void unregisterTransaction(TransactionId txId) {
transactions.remove(txId);
}
AmqpTransactionCoordinator getTxCoordinator(TransactionId txId) {
return transactions.get(txId);
}
LocalTransactionId getNextTransactionId() {
return new LocalTransactionId(getConnectionId(), ++nextTransactionId);
}
ConsumerInfo lookupSubscription(String subscriptionName) throws AmqpProtocolException {
ConsumerInfo result = null;
RegionBroker regionBroker;

View File

@ -31,6 +31,7 @@ import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
@ -205,9 +206,10 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
final DeliveryState remoteState = delivery.getRemoteState();
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
message.setTransactionId(new LocalTransactionId(session.getConnection().getConnectionId(), txid));
TransactionalState txState = (TransactionalState) remoteState;
TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId()));
session.enlist(txId);
message.setTransactionId(txId);
}
message.onSend();

View File

@ -34,6 +34,7 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
@ -447,14 +448,13 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
DeliveryState remoteState = delivery.getRemoteState();
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
LocalTransactionId localTxId = new LocalTransactionId(session.getConnection().getConnectionId(), txid);
ack.setTransactionId(localTxId);
TransactionalState txState = (TransactionalState) remoteState;
TransactionId txId = new LocalTransactionId(session.getConnection().getConnectionId(), toLong(txState.getTxnId()));
ack.setTransactionId(txId);
// Store the message sent in this TX we might need to
// re-send on rollback
md.getMessage().setTransactionId(localTxId);
// Store the message sent in this TX we might need to re-send on rollback
session.enlist(txId);
md.getMessage().setTransactionId(txId);
dispatchedInTx.addFirst(md);
}

View File

@ -41,6 +41,7 @@ import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
@ -72,6 +73,7 @@ public class AmqpSession implements AmqpResource {
private final Session protonSession;
private final SessionId sessionId;
private boolean enlisted;
private long nextProducerId = 0;
private long nextConsumerId = 0;
@ -122,6 +124,8 @@ public class AmqpSession implements AmqpResource {
for (AmqpSender consumer : consumers.values()) {
consumer.commit();
}
enlisted = false;
}
/**
@ -133,6 +137,8 @@ public class AmqpSession implements AmqpResource {
for (AmqpSender consumer : consumers.values()) {
consumer.rollback();
}
enlisted = false;
}
/**
@ -367,6 +373,13 @@ public class AmqpSession implements AmqpResource {
connection.unregisterSender(consumerId);
}
public void enlist(TransactionId txId) {
if (!enlisted) {
connection.getTxCoordinator(txId).enlist(this);
enlisted = true;
}
}
//----- Configuration accessors ------------------------------------------//
public AmqpConnection getConnection() {

View File

@ -20,6 +20,8 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes;
import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
@ -54,7 +56,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
private long nextTransactionId;
private final Set<AmqpSession> txSessions = new HashSet<AmqpSession>();
/**
* Creates a new Transaction coordinator used to manage AMQP transactions.
@ -82,7 +84,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
}
final AmqpSession session = (AmqpSession) getEndpoint().getSession().getContext();
ConnectionId connectionId = session.getConnection().getConnectionId();
final ConnectionId connectionId = session.getConnection().getConnectionId();
final Object action = ((AmqpValue) message.getBody()).getValue();
LOG.debug("COORDINATOR received: {}, [{}]", action, deliveryBytes);
@ -92,35 +94,41 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
throw new Exception("don't know how to handle a declare /w a set GlobalId");
}
long txid = getNextTransactionId();
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
sendToActiveMQ(txinfo, null);
LOG.trace("started transaction {}", txid);
LocalTransactionId txId = session.getConnection().getNextTransactionId();
TransactionInfo txInfo = new TransactionInfo(connectionId, txId, TransactionInfo.BEGIN);
session.getConnection().registerTransaction(txId, this);
sendToActiveMQ(txInfo, null);
LOG.trace("started transaction {}", txId.getValue());
Declared declared = new Declared();
declared.setTxnId(new Binary(toBytes(txid)));
declared.setTxnId(new Binary(toBytes(txId.getValue())));
delivery.disposition(declared);
delivery.settle();
} else if (action instanceof Discharge) {
Discharge discharge = (Discharge) action;
long txid = toLong(discharge.getTxnId());
final Discharge discharge = (Discharge) action;
final LocalTransactionId txId = new LocalTransactionId(connectionId, toLong(discharge.getTxnId()));
final byte operation;
if (discharge.getFail()) {
LOG.trace("rollback transaction {}", txid);
LOG.trace("rollback transaction {}", txId.getValue());
operation = TransactionInfo.ROLLBACK;
} else {
LOG.trace("commit transaction {}", txid);
LOG.trace("commit transaction {}", txId.getValue());
operation = TransactionInfo.COMMIT_ONE_PHASE;
}
if (operation == TransactionInfo.ROLLBACK) {
session.rollback();
} else {
session.commit();
for (AmqpSession txSession : txSessions) {
if (operation == TransactionInfo.ROLLBACK) {
txSession.rollback();
} else {
txSession.commit();
}
}
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
txSessions.clear();
session.getConnection().unregisterTransaction(txId);
TransactionInfo txinfo = new TransactionInfo(connectionId, txId, operation);
sendToActiveMQ(txinfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
@ -132,6 +140,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
} else {
delivery.disposition(Accepted.getInstance());
}
LOG.debug("TX: {} settling {}", operation, action);
delivery.settle();
session.pumpProtonToSocket();
@ -157,10 +166,6 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
}
}
private long getNextTransactionId() {
return ++nextTransactionId;
}
@Override
public ActiveMQDestination getDestination() {
return null;
@ -169,4 +174,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
@Override
public void setDestination(ActiveMQDestination destination) {
}
public void enlist(AmqpSession session) {
txSessions.add(session);
}
}

View File

@ -43,6 +43,38 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
private final int MSG_COUNT = 1000;
@Test(timeout = 60000)
public void testProduceOneConsumeOneInTx() throws Exception {
connection = createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination queue = session.createQueue(getTestName());
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.send(session.createMessage());
session.rollback();
QueueViewMBean queueView = getProxyToQueue(getTestName());
assertEquals(0, queueView.getQueueSize());
messageProducer.send(session.createMessage());
session.commit();
assertEquals(1, queueView.getQueueSize());
MessageConsumer messageConsumer = session.createConsumer(queue);
assertNotNull(messageConsumer.receive(5000));
session.rollback();
assertEquals(1, queueView.getQueueSize());
assertNotNull(messageConsumer.receive(5000));
session.commit();
assertEquals(0, queueView.getQueueSize());
}
@Test(timeout = 60000)
public void testSingleConsumedMessagePerTxCase() throws Exception {
connection = createConnection();