[AMQ-6906] tidy up cleanup on jdbc error and combine updates in single completion to avoid prepared sequence update on non transacted add with error. More jdbc error related tests

This commit is contained in:
gtully 2018-05-03 11:32:21 +01:00
parent 314d5a5168
commit bd45d931ba
7 changed files with 1213 additions and 53 deletions

View File

@ -785,12 +785,10 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
}
public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId) throws IOException {
public void commitAdd(ConnectionContext context, final MessageId messageId, final long preparedSequenceId, final long newSequence) throws IOException {
TransactionContext c = getTransactionContext(context);
try {
long sequence = (Long)messageId.getEntryLocator();
getAdapter().doCommitAddOp(c, preparedSequenceId, sequence);
messageId.setEntryLocator(preparedSequenceId);
getAdapter().doCommitAddOp(c, preparedSequenceId, newSequence);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);

View File

@ -78,11 +78,12 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
cmd.run(ctx);
}
persistenceAdapter.commitTransaction(ctx);
} catch ( IOException e ) {
persistenceAdapter.rollbackTransaction(ctx);
throw e;
}
persistenceAdapter.commitTransaction(ctx);
ctx.setXid(null);
// setup for commit outcome
@ -126,13 +127,15 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
final Long preparedEntrySequence = (Long) message.getMessageId().getEntryLocator();
TransactionContext c = jdbcPersistenceAdapter.getTransactionContext(context);
long newSequence;
synchronized (jdbcMessageStore.pendingAdditions) {
message.getMessageId().setEntryLocator(jdbcPersistenceAdapter.getNextSequenceId());
newSequence = jdbcPersistenceAdapter.getNextSequenceId();
final long sequenceToSet = newSequence;
c.onCompletion(new Runnable() {
@Override
public void run() {
message.getMessageId().setFutureOrSequenceLong(message.getMessageId().getEntryLocator());
message.getMessageId().setEntryLocator(sequenceToSet);
message.getMessageId().setFutureOrSequenceLong(sequenceToSet);
}
});
@ -141,7 +144,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
}
}
jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence);
jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence, newSequence);
jdbcMessageStore.onAdd(message, (Long)message.getMessageId().getEntryLocator(), message.getPriority());
}
@ -175,8 +178,9 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
((LastAckCommand)removeMessageCommand).rollback(ctx);
} else {
MessageId messageId = removeMessageCommand.getMessageAck().getLastMessageId();
long sequence = (Long)messageId.getEntryLocator();
// need to unset the txid flag on the existing row
((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, (Long)messageId.getEntryLocator());
((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, sequence, sequence);
if (removeMessageCommand instanceof RecoveredRemoveMessageCommand) {
((JDBCMessageStore) removeMessageCommand.getMessageStore()).trackRollbackAck(((RecoveredRemoveMessageCommand) removeMessageCommand).getMessage());

View File

@ -84,7 +84,7 @@ public class TransactionContext {
} catch (IllegalMonitorStateException oops) {
LOG.error("Thread does not hold the context lock on close of:" + connection, oops);
}
close();
silentClose();
IOException ioe = IOExceptionSupport.create(e);
if (persistenceAdapter.getBrokerService() != null) {
persistenceAdapter.getBrokerService().handleIOException(ioe);
@ -137,45 +137,39 @@ public class TransactionContext {
} finally {
try {
p.close();
} catch (Throwable e) {
} catch (Throwable ignored) {}
}
}
private void silentClose() {
silentClosePreparedStatements();
if (connection != null) {
try {
connection.close();
} catch (Throwable ignored) {}
connection = null;
}
}
public void close() throws IOException {
if (!inTx) {
try {
/**
* we are not in a transaction so should not be committing ??
* This was previously commented out - but had adverse affects
* on testing - so it's back!
*
*/
try {
// can be null for topic ops that bypass the store via existing cursor state
if (connection != null) {
final boolean needsCommit = !connection.getAutoCommit();
executeBatch();
} finally {
if (connection != null && !connection.getAutoCommit()) {
if (needsCommit) {
connection.commit();
}
}
} catch (SQLException e) {
JDBCPersistenceAdapter.log("Error while closing connection: ", e);
IOException ioe = IOExceptionSupport.create(e);
persistenceAdapter.getBrokerService().handleIOException(ioe);
throw ioe;
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (Throwable e) {
// ignore
LOG.trace("Closing connection failed due: " + e.getMessage() + ". This exception is ignored.", e);
} finally {
connection = null;
}
silentClose();
for (Runnable completion: completions) {
completion.run();
}
@ -197,8 +191,9 @@ public class TransactionContext {
throw new IOException("Not started.");
}
try {
final boolean needsCommit = !connection.getAutoCommit();
executeBatch();
if (!connection.getAutoCommit()) {
if (needsCommit) {
connection.commit();
}
} catch (SQLException e) {
@ -230,19 +225,23 @@ public class TransactionContext {
}
}
private PreparedStatement silentClosePreparedStatement(PreparedStatement preparedStatement) {
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (Throwable ignored) {}
}
return null;
}
private void silentClosePreparedStatements() {
addMessageStatement = silentClosePreparedStatement(addMessageStatement);
removedMessageStatement = silentClosePreparedStatement(removedMessageStatement);
updateLastAckStatement = silentClosePreparedStatement(updateLastAckStatement);
}
private void doRollback() throws SQLException {
if (addMessageStatement != null) {
addMessageStatement.close();
addMessageStatement = null;
}
if (removedMessageStatement != null) {
removedMessageStatement.close();
removedMessageStatement = null;
}
if (updateLastAckStatement != null) {
updateLastAckStatement.close();
updateLastAckStatement = null;
}
silentClosePreparedStatements();
completions.clear();
connection.rollback();
}

View File

@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
*/
public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class);
public boolean prioritySupport = false;
public boolean prioritySupport = true;
public void testPreparedJmxView() throws Exception {
@ -712,7 +712,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
}
public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
public void x_initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
}

View File

@ -76,19 +76,23 @@ public class JDBCCommitExceptionTest extends TestCase {
broker.stop();
}
protected void dumpMessages() throws Exception {
protected int dumpMessages() throws Exception {
int count = 0;
WireFormat wireFormat = new OpenWireFormat();
java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS");
PreparedStatement statement = conn.prepareStatement("SELECT ID, XID, MSG FROM ACTIVEMQ_MSGS");
ResultSet result = statement.executeQuery();
LOG.info("Messages left in broker after test");
while(result.next()) {
long id = result.getLong(1);
org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
String xid = result.getString(2);
org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(3)));
LOG.info("id: " + id + ", xid: " + xid + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
count++;
}
statement.close();
conn.close();
return count;
}
protected int receiveMessages(int messagesExpected) throws Exception {

View File

@ -318,6 +318,7 @@ public class XACompletionTest extends TestSupport {
resource.recover(XAResource.TMSTARTRSCAN);
resource.recover(XAResource.TMNOFLAGS);
dumpMessages();
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
@ -342,6 +343,9 @@ public class XACompletionTest extends TestSupport {
consumer.close();
LOG.info("after close");
dumpMessages();
assertEquals("drain", 5, drainUnack(5, "TEST"));
dumpMessages();