AMQ-6317: Use an SQL Statement for each createSchemaStatement

closes #190
This commit is contained in:
Jeroen Bastijns 2016-06-09 11:09:18 +02:00 committed by gtully
parent 9e856290c4
commit 03a211ec06
3 changed files with 265 additions and 51 deletions

View File

@ -49,6 +49,32 @@
<artifactId>activeio-core</artifactId> <artifactId>activeio-core</artifactId>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<!-- =============================== -->
<!-- Testing Dependencies -->
<!-- =============================== -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<reporting> <reporting>

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.store.jdbc.adapter; package org.apache.activemq.store.jdbc.adapter;
import static javax.xml.bind.DatatypeConverter.parseBase64Binary;
import static javax.xml.bind.DatatypeConverter.printBase64Binary;
import java.io.IOException; import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
@ -37,7 +40,6 @@ import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.jdbc.JDBCAdapter; import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
import org.apache.activemq.store.jdbc.JDBCMessageStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore; import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
import org.apache.activemq.store.jdbc.Statements; import org.apache.activemq.store.jdbc.Statements;
@ -46,9 +48,6 @@ import org.apache.activemq.util.DataByteArrayOutputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static javax.xml.bind.DatatypeConverter.parseBase64Binary;
import static javax.xml.bind.DatatypeConverter.printBase64Binary;
/** /**
* Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
* encouraged to override the default implementation of methods to account for differences in JDBC Driver * encouraged to override the default implementation of methods to account for differences in JDBC Driver
@ -65,6 +64,7 @@ import static javax.xml.bind.DatatypeConverter.printBase64Binary;
public class DefaultJDBCAdapter implements JDBCAdapter { public class DefaultJDBCAdapter implements JDBCAdapter {
private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE; public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE;
private static final String FAILURE_MESSAGE = "Failure was: %s Message: %s SQLState: %s Vendor code: %s";
protected Statements statements; protected Statements statements;
private boolean batchStatements = true; private boolean batchStatements = true;
//This is deprecated and should be removed in a future release //This is deprecated and should be removed in a future release
@ -82,58 +82,68 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
@Override @Override
public void doCreateTables(TransactionContext c) throws SQLException, IOException { public void doCreateTables(TransactionContext transactionContext) throws SQLException, IOException {
Statement s = null;
cleanupExclusiveLock.writeLock().lock(); cleanupExclusiveLock.writeLock().lock();
try { try {
// Check to see if the table already exists. If it does, then don't // Check to see if the table already exists. If it does, then don't log warnings during startup.
// log warnings during startup. // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table
// Need to run the scripts anyways since they may contain ALTER boolean messageTableAlreadyExists = messageTableAlreadyExists(transactionContext);
// statements that upgrade a previous version
// of the table for (String createStatement : this.statements.getCreateSchemaStatements()) {
boolean alreadyExists = false;
ResultSet rs = null;
try {
rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
new String[] { "TABLE" });
alreadyExists = rs.next();
} catch (Throwable ignore) {
} finally {
close(rs);
}
s = c.getConnection().createStatement();
String[] createStatments = this.statements.getCreateSchemaStatements();
for (int i = 0; i < createStatments.length; i++) {
// This will fail usually since the tables will be // This will fail usually since the tables will be
// created already. // created already.
try { executeStatement(transactionContext, createStatement, messageTableAlreadyExists);
LOG.debug("Executing SQL: " + createStatments[i]);
s.execute(createStatments[i]);
} catch (SQLException e) {
if (alreadyExists) {
LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
+ createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
+ " Vendor code: " + e.getErrorCode());
} else {
LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
+ createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
+ " Vendor code: " + e.getErrorCode());
JDBCPersistenceAdapter.log("Failure details: ", e);
}
}
}
// if autoCommit used do not call commit
if(!c.getConnection().getAutoCommit()){
c.getConnection().commit();
} }
} finally { } finally {
cleanupExclusiveLock.writeLock().unlock(); cleanupExclusiveLock.writeLock().unlock();
try { }
s.close(); }
} catch (Throwable e) {
private boolean messageTableAlreadyExists(TransactionContext transactionContext) {
boolean alreadyExists = false;
ResultSet rs = null;
try {
rs = transactionContext.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), new String[] { "TABLE" });
alreadyExists = rs.next();
} catch (Throwable ignore) {
} finally {
close(rs);
}
return alreadyExists;
}
private void executeStatement(TransactionContext transactionContext, String createStatement, boolean ignoreStatementExecutionFailure) throws IOException {
Statement statement = null;
try {
LOG.debug("Executing SQL: " + createStatement);
statement = transactionContext.getConnection().createStatement();
statement.execute(createStatement);
commitIfAutoCommitIsDisabled(transactionContext);
} catch (SQLException e) {
if (ignoreStatementExecutionFailure) {
LOG.debug("Could not create JDBC tables; The message table already existed. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode()));
} else {
LOG.warn("Could not create JDBC tables; they could already exist. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode()));
JDBCPersistenceAdapter.log("Failure details: ", e);
} }
} finally {
closeStatement(statement);
}
}
private void closeStatement(Statement statement) {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ignored) {}
}
private void commitIfAutoCommitIsDisabled(TransactionContext c) throws SQLException, IOException {
if (!c.getConnection().getAutoCommit()) {
c.getConnection().commit();
} }
} }
@ -157,10 +167,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
JDBCPersistenceAdapter.log("Failure details: ", e); JDBCPersistenceAdapter.log("Failure details: ", e);
} }
} }
// if autoCommit used do not call commit commitIfAutoCommitIsDisabled(c);
if(!c.getConnection().getAutoCommit()){
c.getConnection().commit();
}
} finally { } finally {
cleanupExclusiveLock.writeLock().unlock(); cleanupExclusiveLock.writeLock().unlock();
try { try {

View File

@ -0,0 +1,181 @@
package org.apache.activemq.store.jdbc.adapter;
import static org.apache.log4j.Level.DEBUG;
import static org.apache.log4j.Level.WARN;
import static org.junit.Assert.assertEquals;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class DefaultJDBCAdapterDoCreateTablesTest {
private static final String CREATE_STATEMENT1 = "createStatement1";
private static final String CREATE_STATEMENT2 = "createStatement2";
private static final String[] CREATE_STATEMENTS = new String[] { CREATE_STATEMENT1, CREATE_STATEMENT2 };
private static final int VENDOR_CODE = 1;
private static final String SQL_STATE = "SqlState";
private static final String MY_REASON = "MyReason";
private DefaultJDBCAdapter defaultJDBCAdapter;
private List<LoggingEvent> loggingEvents = new ArrayList<>();
@Mock
private ReadWriteLock readWriteLock;
@Mock
private Lock lock;
@Mock
private TransactionContext transactionContext;
@Mock(answer = RETURNS_DEEP_STUBS)
private Connection connection;
@Mock
private Statements statements;
@Mock
private ResultSet resultSet;
@Mock
private Statement statement1, statement2;
@Before
public void setUp() throws IOException, SQLException {
DefaultTestAppender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
loggingEvents.add(event);
}
};
Logger rootLogger = Logger.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
rootLogger.addAppender(appender);
defaultJDBCAdapter = new DefaultJDBCAdapter();
defaultJDBCAdapter.cleanupExclusiveLock = readWriteLock;
defaultJDBCAdapter.statements = statements;
when(statements.getCreateSchemaStatements()).thenReturn(CREATE_STATEMENTS);
when(transactionContext.getConnection()).thenReturn(connection);
when(connection.getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),new String[] { "TABLE" })).thenReturn(resultSet);
when(connection.createStatement()).thenReturn(statement1, statement2);
when(connection.getAutoCommit()).thenReturn(true);
when(readWriteLock.writeLock()).thenReturn(lock);
}
@After
public void tearDown() {
loggingEvents = new ArrayList<>();
}
@Test
public void createsTheTablesWhenNoMessageTableExistsAndLogsSqlExceptionsInWarnLevel() throws IOException, SQLException {
when(resultSet.next()).thenReturn(false);
when(statement2.execute(CREATE_STATEMENT2)).thenThrow(new SQLException(MY_REASON, SQL_STATE, VENDOR_CODE));
defaultJDBCAdapter.doCreateTables(transactionContext);
InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2);
inOrder.verify(lock).lock();
inOrder.verify(resultSet).next();
inOrder.verify(resultSet).close();
inOrder.verify(connection).createStatement();
inOrder.verify(statement1).execute(CREATE_STATEMENT1);
inOrder.verify(statement1).close();
inOrder.verify(connection).createStatement();
inOrder.verify(statement2).execute(CREATE_STATEMENT2);
inOrder.verify(statement2).close();
inOrder.verify(lock).unlock();
assertEquals(4, loggingEvents.size());
assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);
assertLog(1, DEBUG, "Executing SQL: " + CREATE_STATEMENT2);
assertLog(2, WARN, "Could not create JDBC tables; they could already exist. Failure was: " + CREATE_STATEMENT2 + " Message: " + MY_REASON + " SQLState: " + SQL_STATE + " Vendor code: " + VENDOR_CODE);
assertLog(3, WARN, "Failure details: " + MY_REASON);
}
@Test
public void triesTocreateTheTablesWhenMessageTableExistsAndLogsSqlExceptionsInDebugLevel() throws SQLException, IOException {
when(resultSet.next()).thenReturn(true);
when(statement1.execute(CREATE_STATEMENT1)).thenThrow(new SQLException(MY_REASON, SQL_STATE, VENDOR_CODE));
defaultJDBCAdapter.doCreateTables(transactionContext);
InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2);
inOrder.verify(lock).lock();
inOrder.verify(resultSet).next();
inOrder.verify(resultSet).close();
inOrder.verify(connection).createStatement();
inOrder.verify(statement1).execute(CREATE_STATEMENT1);
inOrder.verify(statement1).close();
inOrder.verify(connection).createStatement();
inOrder.verify(statement2).execute(CREATE_STATEMENT2);
inOrder.verify(statement2).close();
inOrder.verify(lock).unlock();
assertEquals(3, loggingEvents.size());
assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);
assertLog(1, DEBUG, "Could not create JDBC tables; The message table already existed. Failure was: " + CREATE_STATEMENT1 + " Message: " + MY_REASON + " SQLState: " + SQL_STATE + " Vendor code: " + VENDOR_CODE);
assertLog(2, DEBUG, "Executing SQL: " + CREATE_STATEMENT2);
}
@Test
public void commitsTheTransactionWhenAutoCommitIsDisabled() throws SQLException, IOException {
when(connection.getAutoCommit()).thenReturn(false);
when(resultSet.next()).thenReturn(false);
defaultJDBCAdapter.doCreateTables(transactionContext);
InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2);
inOrder.verify(lock).lock();
inOrder.verify(resultSet).next();
inOrder.verify(resultSet).close();
inOrder.verify(connection).createStatement();
inOrder.verify(statement1).execute(CREATE_STATEMENT1);
inOrder.verify(connection).commit();
inOrder.verify(statement1).close();
inOrder.verify(connection).createStatement();
inOrder.verify(statement2).execute(CREATE_STATEMENT2);
inOrder.verify(connection).commit();
inOrder.verify(statement2).close();
inOrder.verify(lock).unlock();
assertEquals(2, loggingEvents.size());
assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1);
assertLog(1, DEBUG, "Executing SQL: " + CREATE_STATEMENT2);
}
private void assertLog(int messageNumber, Level level, String message) {
LoggingEvent loggingEvent = loggingEvents.get(messageNumber);
assertEquals(level, loggingEvent.getLevel());
assertEquals(message, loggingEvent.getMessage());
}
}