resolve issue with broken ra tests, allow XA operation on non transacted session, check for transaction now is xa aware

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@739961 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-02-02 11:14:05 +00:00
parent 59643ef23e
commit 40465998ee
5 changed files with 39 additions and 25 deletions

View File

@ -592,7 +592,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
*/ */
public void close() throws JMSException { public void close() throws JMSException {
if (!unconsumedMessages.isClosed()) { if (!unconsumedMessages.isClosed()) {
if (session.isTransacted() && session.getTransactionContext().getTransactionId() != null) { if (session.getTransactionContext().isInTransaction()) {
session.getTransactionContext().addSynchronization(new Synchronization() { session.getTransactionContext().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception { public void afterCommit() throws Exception {
doClose(); doClose();
@ -667,7 +667,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// Do we have any acks we need to send out before closing? // Do we have any acks we need to send out before closing?
// Ack any delivered messages now. // Ack any delivered messages now.
if (!session.isTransacted()) { if (!session.getTransacted()) {
deliverAcks(); deliverAcks();
if (session.isDupsOkAcknowledge()) { if (session.isDupsOkAcknowledge()) {
acknowledge(); acknowledge();
@ -752,7 +752,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
synchronized(deliveredMessages) { synchronized(deliveredMessages) {
deliveredMessages.addFirst(md); deliveredMessages.addFirst(md);
} }
if (session.isTransacted()) { if (session.getTransacted()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE); ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} }
} }
@ -766,7 +766,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ackLater(md, MessageAck.DELIVERED_ACK_TYPE); ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else { } else {
stats.onMessage(); stats.onMessage();
if (session.isTransacted()) { if (session.getTransacted()) {
// Do nothing. // Do nothing.
} else if (session.isAutoAcknowledge()) { } else if (session.isAutoAcknowledge()) {
synchronized (deliveredMessages) { synchronized (deliveredMessages) {
@ -830,7 +830,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// Don't acknowledge now, but we may need to let the broker know the // Don't acknowledge now, but we may need to let the broker know the
// consumer got the message // consumer got the message
// to expand the pre-fetch window // to expand the pre-fetch window
if (session.isTransacted()) { if (session.getTransacted()) {
session.doStartTransaction(); session.doStartTransaction();
if (!synchronizationRegistered) { if (!synchronizationRegistered) {
synchronizationRegistered = true; synchronizationRegistered = true;
@ -892,7 +892,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (ack == null) if (ack == null)
return; // no msgs return; // no msgs
if (session.isTransacted()) { if (session.getTransacted()) {
session.doStartTransaction(); session.doStartTransaction();
ack.setTransactionId(session.getTransactionContext().getTransactionId()); ack.setTransactionId(session.getTransactionContext().getTransactionId());
} }
@ -903,7 +903,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
deliveredCounter -= deliveredMessages.size(); deliveredCounter -= deliveredMessages.size();
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
if (!session.isTransacted()) { if (!session.getTransacted()) {
deliveredMessages.clear(); deliveredMessages.clear();
} }
} }

View File

@ -1955,7 +1955,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
} }
protected void sendAck(MessageAck ack, boolean lazy) throws JMSException { protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
if (lazy || connection.isSendAcksAsync() || isTransacted()) { if (lazy || connection.isSendAcksAsync() || getTransacted()) {
asyncSendPacket(ack); asyncSendPacket(ack);
} else { } else {
syncSendPacket(ack); syncSendPacket(ack);

View File

@ -90,6 +90,10 @@ public class TransactionContext implements XAResource {
return transactionId != null && transactionId.isLocalTransaction(); return transactionId != null && transactionId.isLocalTransaction();
} }
public boolean isInTransaction() {
return transactionId != null;
}
/** /**
* @return Returns the localTransactionEventListener. * @return Returns the localTransactionEventListener.
*/ */

View File

@ -203,6 +203,7 @@ public class JmsRollbackRedeliveryTest extends AutoFailTestSupport {
} }
session.close(); session.close();
} }
consumeMessage(connection, maxRetries + 1);
} }
} }
@ -233,37 +234,55 @@ public class JmsRollbackRedeliveryTest extends AutoFailTestSupport {
} }
session.close(); session.close();
} }
consumeMessage(connection, maxRetries + 1);
} }
} }
private void consumeMessage(Connection connection, final int deliveryCount)
throws JMSException {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = (TextMessage) consumer.receive(1000);
assertNotNull(msg);
assertEquals("redelivery property matches deliveries", deliveryCount, msg.getLongProperty("JMSXDeliveryCount"));
session.commit();
session.close();
}
public void testRedeliveryPropertyWithNoRollback() throws Exception { public void testRedeliveryPropertyWithNoRollback() throws Exception {
final int numMessages = 1;
ConnectionFactory connectionFactory = ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(brokerUrl); new ActiveMQConnectionFactory(brokerUrl);
Connection connection = connectionFactory.createConnection(); Connection connection = connectionFactory.createConnection();
connection.start(); connection.start();
populateDestination(nbMessages, destinationName, connection); populateDestination(numMessages, destinationName, connection);
connection.close(); connection.close();
{ {
AtomicInteger received = new AtomicInteger(); AtomicInteger received = new AtomicInteger();
while (received.get() < nbMessages) { final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
while (received.get() < maxRetries) {
connection = connectionFactory.createConnection(); connection = connectionFactory.createConnection();
connection.start(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(destinationName); Destination destination = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = (TextMessage) consumer.receive(2000); TextMessage msg = (TextMessage) consumer.receive(2000);
if (msg != null) { if (msg != null) {
LOG.info("Received message " + msg.getText() + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
" (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount"));
assertFalse(msg.getJMSRedelivered());
assertEquals(1, msg.getLongProperty("JMSXDeliveryCount"));
} }
session.close(); session.close();
connection.close(); connection.close();
} }
connection = connectionFactory.createConnection();
connection.start();
consumeMessage(connection, maxRetries + 1);
} }
} }

View File

@ -47,15 +47,6 @@ public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest {
private static long txGenerator; private static long txGenerator;
private Xid xid; private Xid xid;
// TODO fix for XA
public void testReceiveTwoThenCloseConnection() throws Exception {}
public void testReceiveRollback() throws Exception {}
public void testReceiveTwoThenRollback() throws Exception {}
public void testReceiveTwoThenRollbackManyTimes() throws Exception {}
public void testReceiveRollbackWithPrefetchOfOne() throws Exception {}
public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {}
@Override @Override
protected void setSessionTransacted() { protected void setSessionTransacted() {
resourceProvider.setTransacted(false); resourceProvider.setTransacted(false);