ARTEMIS-1653 Allow database tables to be created externally

The previous commit about this feature wasn't using the row count query
ResultSet.
The mechanics has been changed to allow the row count query
to fail, because DROP and CREATE aren't transactional and immediate
in most DBMS.
It includes a test that stress its mechanics if used with DBMS like
DB2 10.5 and Oracle 12c.
Additional checks and logs have been added to trace each steps.
This commit is contained in:
Francesco Nigro 2018-04-03 10:11:04 +02:00 committed by Clebert Suconic
parent 8f4042c40c
commit c7651853cd
4 changed files with 225 additions and 32 deletions

View File

@ -182,25 +182,62 @@ public abstract class AbstractJDBCDriver {
logger.tracef("Validating if table %s didn't exist before creating", tableName);
try {
connection.setAutoCommit(false);
final boolean tableExists;
try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
if (rs != null && !rs.next()) {
if ((rs == null) || (rs != null && !rs.next())) {
tableExists = false;
if (logger.isTraceEnabled()) {
logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls));
}
if (rs != null) {
final SQLWarning sqlWarning = rs.getWarnings();
if (sqlWarning != null) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning));
}
}
} else {
tableExists = true;
}
}
if (tableExists) {
logger.tracef("Validating if the existing table %s is initialized or not", tableName);
try (Statement statement = connection.createStatement();
ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) {
if (rs.next() && rs.getInt(1) > 0) {
logger.tracef("Table %s did exist but is not empty. Skipping initialization.", tableName);
logger.tracef("Validation of the existing table %s initialization is started", tableName);
int rows;
if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) {
logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows);
if (logger.isDebugEnabled()) {
final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count();
if (rows < expectedRows) {
logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows.");
}
}
connection.commit();
return;
} else {
sqls = Arrays.copyOfRange(sqls, 1, sqls.length);
sqls = Stream.of(sqls).filter(sql -> {
final String upperCaseSql = sql.toUpperCase();
return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX"));
}).toArray(String[]::new);
if (sqls.length > 0) {
logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName);
} else {
logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName);
}
}
} catch (SQLException e) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL()));
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.debug("Rollback failed while validating initialization of a table", rollbackEx);
}
connection.setAutoCommit(false);
logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName);
}
}
if (sqls.length > 0) {
try (Statement statement = connection.createStatement()) {
for (String sql : sqls) {
statement.executeUpdate(sql);
@ -210,9 +247,9 @@ public abstract class AbstractJDBCDriver {
}
}
}
}
connection.commit();
}
} catch (SQLException e) {
final String sqlStatements = Stream.of(sqls).collect(Collectors.joining("\n"));
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements));

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.UUID;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
public class JdbcSharedStateManagerTest extends ActiveMQTestBase {
private DatabaseStorageConfiguration dbConf;
private SQLProvider sqlProvider;
@Before
public void configure() {
dbConf = createDefaultDatabaseStorageConfiguration();
sqlProvider = JDBCUtils.getSQLProvider(
dbConf.getJdbcDriverClassName(),
dbConf.getNodeManagerStoreTableName(),
SQLProvider.DatabaseStoreType.NODE_MANAGER);
}
private TestJDBCDriver createFakeDriver(boolean initializeTable) {
return TestJDBCDriver.usingConnectionUrl(
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider,
initializeTable);
}
private JdbcSharedStateManager createSharedStateManager() {
return JdbcSharedStateManager.usingConnectionUrl(
UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(),
dbConf.getJdbcMaxAllowedMillisFromDbTime(),
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider);
}
@Test(timeout = 10000)
public void shouldStartIfTableNotExist() throws Exception {
final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
try {
sharedStateManager.destroy();
} finally {
sharedStateManager.stop();
}
}
@Test(timeout = 10000)
public void shouldStartIfTableExistEmpty() throws Exception {
final TestJDBCDriver fakeDriver = createFakeDriver(false);
fakeDriver.start();
final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
sharedStateManager.stop();
try {
fakeDriver.destroy();
} finally {
fakeDriver.stop();
}
}
@Test(timeout = 10000)
public void shouldStartIfTableExistInitialized() throws Exception {
final TestJDBCDriver fakeDriver = createFakeDriver(true);
fakeDriver.start();
final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
sharedStateManager.stop();
try {
fakeDriver.destroy();
} finally {
fakeDriver.stop();
}
}
@Test(timeout = 10000)
public void shouldStartTwoIfTableNotExist() throws Exception {
final JdbcSharedStateManager liveSharedStateManager = createSharedStateManager();
final JdbcSharedStateManager backupSharedStateManager = createSharedStateManager();
backupSharedStateManager.stop();
try {
liveSharedStateManager.destroy();
} finally {
liveSharedStateManager.stop();
}
}
}

View File

@ -20,21 +20,33 @@ import java.sql.SQLException;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.junit.Assert;
public class TestJDBCDriver extends AbstractJDBCDriver {
public static TestJDBCDriver usingConnectionUrl(
String jdbcConnectionUrl,
public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider) {
TestJDBCDriver driver = new TestJDBCDriver();
return usingConnectionUrl(jdbcConnectionUrl, jdbcDriverClass, provider, false);
}
public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider,
boolean initialize) {
TestJDBCDriver driver = new TestJDBCDriver(initialize);
driver.setSqlProvider(provider);
driver.setJdbcConnectionUrl(jdbcConnectionUrl);
driver.setJdbcDriverClass(jdbcDriverClass);
return driver;
}
private boolean initialize;
private TestJDBCDriver(boolean initialize) {
this.initialize = initialize;
}
@Override
protected void prepareStatements() throws SQLException {
}
@ -43,7 +55,14 @@ public class TestJDBCDriver extends AbstractJDBCDriver {
protected void createSchema() throws SQLException {
try {
connection.createStatement().execute(sqlProvider.createNodeManagerStoreTableSQL());
if (initialize) {
connection.createStatement().execute(sqlProvider.createNodeIdSQL());
connection.createStatement().execute(sqlProvider.createStateSQL());
connection.createStatement().execute(sqlProvider.createLiveLockSQL());
connection.createStatement().execute(sqlProvider.createBackupLockSQL());
}
} catch (SQLException e) {
Assert.fail(e.getMessage());
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -26,40 +27,38 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
public class JDBCJournalTest extends ActiveMQTestBase {
@Rule
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
private static final String JOURNAL_TABLE_NAME = "MESSAGE_JOURNAL";
private static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
private JDBCJournalImpl journal;
private String jdbcUrl;
private ScheduledExecutorService scheduledExecutorService;
private ExecutorService executorService;
private SQLProvider sqlProvider;
private DatabaseStorageConfiguration dbConf;
@After
@Override
public void tearDown() throws Exception {
@ -77,19 +76,49 @@ public class JDBCJournalTest extends ActiveMQTestBase {
@Before
public void setup() throws Exception {
dbConf = createDefaultDatabaseStorageConfiguration();
sqlProvider = JDBCUtils.getSQLProvider(
dbConf.getJdbcDriverClassName(),
dbConf.getMessageTableName(),
SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL);
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
executorService = Executors.newSingleThreadExecutor();
jdbcUrl = "jdbc:derby:target/data;create=true";
SQLProvider.Factory factory = new PropertySQLProvider.Factory(DERBY);
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() {
journal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
}
},5);
}, 5);
journal.start();
}
@Test
public void testRestartEmptyJournal() throws SQLException {
Assert.assertTrue(journal.isStarted());
Assert.assertEquals(0, journal.getNumberOfRecords());
journal.stop();
journal.start();
Assert.assertTrue(journal.isStarted());
}
@Test
public void testConcurrentEmptyJournal() throws SQLException {
Assert.assertTrue(journal.isStarted());
Assert.assertEquals(0, journal.getNumberOfRecords());
final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider, scheduledExecutorService,
executorService, (code, message, file) -> {
Assert.fail(message);
}, 5);
secondJournal.start();
try {
Assert.assertTrue(secondJournal.isStarted());
} finally {
secondJournal.stop();
}
}
@Test
public void testInsertRecords() throws Exception {
int noRecords = 10;