https://issues.apache.org/jira/browse/AMQ-4426 - default to client ack, so that if a tx aborts we never auto ack or start a local transaction

This commit is contained in:
gtully 2013-09-19 16:26:07 +01:00
parent f6f22df4f4
commit c387522217
4 changed files with 116 additions and 8 deletions

View File

@ -820,7 +820,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
for (MessageDispatch old : list) { for (MessageDispatch old : list) {
// ensure we don't filter this as a duplicate // ensure we don't filter this as a duplicate
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("on close, rollback: " + old.getMessage().getMessageId()); LOG.debug("on close, rollback duplicate: " + old.getMessage().getMessageId());
} }
session.connection.rollbackDuplicate(this, old.getMessage()); session.connection.rollbackDuplicate(this, old.getMessage());
} }

View File

@ -398,8 +398,9 @@ public class TransactionContext implements XAResource {
beforeEnd(); beforeEnd();
} catch (JMSException e) { } catch (JMSException e) {
throw toXAException(e); throw toXAException(e);
} } finally {
setXid(null); setXid(null);
}
} else if ((flags & TMSUCCESS) == TMSUCCESS) { } else if ((flags & TMSUCCESS) == TMSUCCESS) {
// set to null if this is the current xid. // set to null if this is the current xid.
// otherwise this could be an asynchronous success call // otherwise this could be an asynchronous success call
@ -408,9 +409,10 @@ public class TransactionContext implements XAResource {
beforeEnd(); beforeEnd();
} catch (JMSException e) { } catch (JMSException e) {
throw toXAException(e); throw toXAException(e);
} } finally {
setXid(null); setXid(null);
} }
}
} else { } else {
throw new XAException(XAException.XAER_INVAL); throw new XAException(XAException.XAER_INVAL);
} }
@ -684,6 +686,7 @@ public class TransactionContext implements XAResource {
this.connection.checkClosedOrFailed(); this.connection.checkClosedOrFailed();
this.connection.ensureConnectionInfoSent(); this.connection.ensureConnectionInfoSent();
} catch (JMSException e) { } catch (JMSException e) {
disassociate();
throw toXAException(e); throw toXAException(e);
} }
@ -699,6 +702,7 @@ public class TransactionContext implements XAResource {
LOG.debug("Started XA transaction: " + transactionId); LOG.debug("Started XA transaction: " + transactionId);
} }
} catch (JMSException e) { } catch (JMSException e) {
disassociate();
throw toXAException(e); throw toXAException(e);
} }
@ -712,6 +716,7 @@ public class TransactionContext implements XAResource {
LOG.debug("Ended XA transaction: " + transactionId); LOG.debug("Ended XA transaction: " + transactionId);
} }
} catch (JMSException e) { } catch (JMSException e) {
disassociate();
throw toXAException(e); throw toXAException(e);
} }
@ -729,11 +734,15 @@ public class TransactionContext implements XAResource {
} }
} }
disassociate();
}
}
private void disassociate() {
// dis-associate // dis-associate
associatedXid = null; associatedXid = null;
transactionId = null; transactionId = null;
} }
}
/** /**
* Sends the given command. Also sends the command in case of interruption, * Sends the given command. Also sends the command in case of interruption,

View File

@ -46,8 +46,9 @@ public class XaConnectionPool extends ConnectionPool {
try { try {
boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION); boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
if (isXa) { if (isXa) {
transacted = true; // if the xa tx aborts inflight we don't want to auto create a local transaction or auto ack
ackMode = Session.SESSION_TRANSACTED; transacted = false;
ackMode = Session.CLIENT_ACKNOWLEDGE;
} else if (transactionManager != null) { } else if (transactionManager != null) {
// cmt or transactionManager managed // cmt or transactionManager managed
transacted = false; transacted = false;

View File

@ -151,6 +151,104 @@ public class XAConnectionPoolTest extends TestSupport {
connection.close(); connection.close();
} }
public void testAckModeOfPoolNonXAWithTM() throws Exception {
final Vector<Synchronization> syncs = new Vector<Synchronization>();
ActiveMQTopic topic = new ActiveMQTopic("test");
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false"));
// simple TM that is in a tx and will track syncs
pcf.setTransactionManager(new TransactionManager(){
@Override
public void begin() throws NotSupportedException, SystemException {
}
@Override
public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
}
@Override
public int getStatus() throws SystemException {
return Status.STATUS_ACTIVE;
}
@Override
public Transaction getTransaction() throws SystemException {
return new Transaction() {
@Override
public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
}
@Override
public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
return false;
}
@Override
public boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
return false;
}
@Override
public int getStatus() throws SystemException {
return 0;
}
@Override
public void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
syncs.add(synch);
}
@Override
public void rollback() throws IllegalStateException, SystemException {
}
@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
}
};
}
@Override
public void resume(Transaction tobj) throws IllegalStateException, InvalidTransactionException, SystemException {
}
@Override
public void rollback() throws IllegalStateException, SecurityException, SystemException {
}
@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
}
@Override
public void setTransactionTimeout(int seconds) throws SystemException {
}
@Override
public Transaction suspend() throws SystemException {
return null;
}
});
TopicConnection connection = (TopicConnection) pcf.createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
assertEquals("client ack is enforce", Session.CLIENT_ACKNOWLEDGE, session.getAcknowledgeMode());
TopicPublisher publisher = session.createPublisher(topic);
publisher.publish(session.createMessage());
// simulate a commit
for (Synchronization sync : syncs) {
sync.beforeCompletion();
}
for (Synchronization sync : syncs) {
sync.afterCompletion(1);
}
connection.close();
}
public void testInstanceOf() throws Exception { public void testInstanceOf() throws Exception {
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory(); XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
assertTrue(pcf instanceof QueueConnectionFactory); assertTrue(pcf instanceof QueueConnectionFactory);