From c7651853cdb291dfa3bd2906e1e082fd06cff612 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Tue, 3 Apr 2018 10:11:04 +0200 Subject: [PATCH] ARTEMIS-1653 Allow database tables to be created externally The previous commit about this feature wasn't using the row count query ResultSet. The mechanics has been changed to allow the row count query to fail, because DROP and CREATE aren't transactional and immediate in most DBMS. It includes a test that stress its mechanics if used with DBMS like DB2 10.5 and Oracle 12c. Additional checks and logs have been added to trace each steps. --- .../store/drivers/AbstractJDBCDriver.java | 65 ++++++++--- .../impl/jdbc/JdbcSharedStateManagerTest.java | 108 ++++++++++++++++++ .../core/server/impl/jdbc/TestJDBCDriver.java | 29 ++++- .../jdbc/store/journal/JDBCJournalTest.java | 55 ++++++--- 4 files changed, 225 insertions(+), 32 deletions(-) create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java index 62c9501e14..e421a3bba9 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java @@ -182,25 +182,62 @@ public abstract class AbstractJDBCDriver { logger.tracef("Validating if table %s didn't exist before creating", tableName); try { connection.setAutoCommit(false); + final boolean tableExists; try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) { - if (rs != null && !rs.next()) { + if ((rs == null) || (rs != null && !rs.next())) { + tableExists = false; if (logger.isTraceEnabled()) { logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls)); } - final SQLWarning sqlWarning = rs.getWarnings(); - if (sqlWarning != null) { - logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning)); - } - } else { - try (Statement statement = connection.createStatement(); - ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) { - if (rs.next() && rs.getInt(1) > 0) { - logger.tracef("Table %s did exist but is not empty. Skipping initialization.", tableName); - } else { - sqls = Arrays.copyOfRange(sqls, 1, sqls.length); + if (rs != null) { + final SQLWarning sqlWarning = rs.getWarnings(); + if (sqlWarning != null) { + logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning)); } } + } else { + tableExists = true; } + } + if (tableExists) { + logger.tracef("Validating if the existing table %s is initialized or not", tableName); + try (Statement statement = connection.createStatement(); + ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) { + logger.tracef("Validation of the existing table %s initialization is started", tableName); + int rows; + if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) { + logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows); + if (logger.isDebugEnabled()) { + final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count(); + if (rows < expectedRows) { + logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows."); + } + } + connection.commit(); + return; + } else { + sqls = Stream.of(sqls).filter(sql -> { + final String upperCaseSql = sql.toUpperCase(); + return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX")); + }).toArray(String[]::new); + if (sqls.length > 0) { + logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName); + } else { + logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName); + } + } + } catch (SQLException e) { + logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL())); + try { + connection.rollback(); + } catch (SQLException rollbackEx) { + logger.debug("Rollback failed while validating initialization of a table", rollbackEx); + } + connection.setAutoCommit(false); + logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName); + } + } + if (sqls.length > 0) { try (Statement statement = connection.createStatement()) { for (String sql : sqls) { statement.executeUpdate(sql); @@ -210,9 +247,9 @@ public abstract class AbstractJDBCDriver { } } } - } - connection.commit(); + connection.commit(); + } } catch (SQLException e) { final String sqlStatements = Stream.of(sqls).collect(Collectors.joining("\n")); logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements)); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java new file mode 100644 index 0000000000..e7ac316f97 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl.jdbc; + +import java.util.UUID; + +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Before; +import org.junit.Test; + +public class JdbcSharedStateManagerTest extends ActiveMQTestBase { + + private DatabaseStorageConfiguration dbConf; + private SQLProvider sqlProvider; + + @Before + public void configure() { + dbConf = createDefaultDatabaseStorageConfiguration(); + sqlProvider = JDBCUtils.getSQLProvider( + dbConf.getJdbcDriverClassName(), + dbConf.getNodeManagerStoreTableName(), + SQLProvider.DatabaseStoreType.NODE_MANAGER); + } + + private TestJDBCDriver createFakeDriver(boolean initializeTable) { + return TestJDBCDriver.usingConnectionUrl( + dbConf.getJdbcConnectionUrl(), + dbConf.getJdbcDriverClassName(), + sqlProvider, + initializeTable); + } + + private JdbcSharedStateManager createSharedStateManager() { + return JdbcSharedStateManager.usingConnectionUrl( + UUID.randomUUID().toString(), + dbConf.getJdbcLockExpirationMillis(), + dbConf.getJdbcMaxAllowedMillisFromDbTime(), + dbConf.getJdbcConnectionUrl(), + dbConf.getJdbcDriverClassName(), + sqlProvider); + } + + @Test(timeout = 10000) + public void shouldStartIfTableNotExist() throws Exception { + final JdbcSharedStateManager sharedStateManager = createSharedStateManager(); + try { + sharedStateManager.destroy(); + } finally { + sharedStateManager.stop(); + } + } + + @Test(timeout = 10000) + public void shouldStartIfTableExistEmpty() throws Exception { + final TestJDBCDriver fakeDriver = createFakeDriver(false); + fakeDriver.start(); + final JdbcSharedStateManager sharedStateManager = createSharedStateManager(); + sharedStateManager.stop(); + try { + fakeDriver.destroy(); + } finally { + fakeDriver.stop(); + } + } + + @Test(timeout = 10000) + public void shouldStartIfTableExistInitialized() throws Exception { + final TestJDBCDriver fakeDriver = createFakeDriver(true); + fakeDriver.start(); + final JdbcSharedStateManager sharedStateManager = createSharedStateManager(); + sharedStateManager.stop(); + try { + fakeDriver.destroy(); + } finally { + fakeDriver.stop(); + } + } + + @Test(timeout = 10000) + public void shouldStartTwoIfTableNotExist() throws Exception { + final JdbcSharedStateManager liveSharedStateManager = createSharedStateManager(); + final JdbcSharedStateManager backupSharedStateManager = createSharedStateManager(); + backupSharedStateManager.stop(); + try { + liveSharedStateManager.destroy(); + } finally { + liveSharedStateManager.stop(); + } + } +} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java index 52b497aa38..2df62747d2 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java @@ -20,21 +20,33 @@ import java.sql.SQLException; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.junit.Assert; public class TestJDBCDriver extends AbstractJDBCDriver { + public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl, + String jdbcDriverClass, + SQLProvider provider) { + return usingConnectionUrl(jdbcConnectionUrl, jdbcDriverClass, provider, false); + } - public static TestJDBCDriver usingConnectionUrl( - String jdbcConnectionUrl, - String jdbcDriverClass, - SQLProvider provider) { - TestJDBCDriver driver = new TestJDBCDriver(); + public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl, + String jdbcDriverClass, + SQLProvider provider, + boolean initialize) { + TestJDBCDriver driver = new TestJDBCDriver(initialize); driver.setSqlProvider(provider); driver.setJdbcConnectionUrl(jdbcConnectionUrl); driver.setJdbcDriverClass(jdbcDriverClass); return driver; } + private boolean initialize; + + private TestJDBCDriver(boolean initialize) { + this.initialize = initialize; + } + @Override protected void prepareStatements() throws SQLException { } @@ -43,7 +55,14 @@ public class TestJDBCDriver extends AbstractJDBCDriver { protected void createSchema() throws SQLException { try { connection.createStatement().execute(sqlProvider.createNodeManagerStoreTableSQL()); + if (initialize) { + connection.createStatement().execute(sqlProvider.createNodeIdSQL()); + connection.createStatement().execute(sqlProvider.createStateSQL()); + connection.createStatement().execute(sqlProvider.createLiveLockSQL()); + connection.createStatement().execute(sqlProvider.createBackupLockSQL()); + } } catch (SQLException e) { + Assert.fail(e.getMessage()); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java index 6caae961d9..1661df91b8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.jdbc.store.journal; import java.sql.DriverManager; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -26,40 +27,38 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; -import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY; - public class JDBCJournalTest extends ActiveMQTestBase { @Rule public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule(); - private static final String JOURNAL_TABLE_NAME = "MESSAGE_JOURNAL"; - - private static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver"; - private JDBCJournalImpl journal; - private String jdbcUrl; - private ScheduledExecutorService scheduledExecutorService; private ExecutorService executorService; + private SQLProvider sqlProvider; + + private DatabaseStorageConfiguration dbConf; + @After @Override public void tearDown() throws Exception { @@ -77,19 +76,49 @@ public class JDBCJournalTest extends ActiveMQTestBase { @Before public void setup() throws Exception { + dbConf = createDefaultDatabaseStorageConfiguration(); + sqlProvider = JDBCUtils.getSQLProvider( + dbConf.getJdbcDriverClassName(), + dbConf.getMessageTableName(), + SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL); scheduledExecutorService = new ScheduledThreadPoolExecutor(5); executorService = Executors.newSingleThreadExecutor(); - jdbcUrl = "jdbc:derby:target/data;create=true"; - SQLProvider.Factory factory = new PropertySQLProvider.Factory(DERBY); - journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() { + journal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() { @Override public void onIOException(Throwable code, String message, SequentialFile file) { } - },5); + }, 5); journal.start(); } + @Test + public void testRestartEmptyJournal() throws SQLException { + Assert.assertTrue(journal.isStarted()); + Assert.assertEquals(0, journal.getNumberOfRecords()); + journal.stop(); + journal.start(); + Assert.assertTrue(journal.isStarted()); + } + + @Test + public void testConcurrentEmptyJournal() throws SQLException { + Assert.assertTrue(journal.isStarted()); + Assert.assertEquals(0, journal.getNumberOfRecords()); + final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), + dbConf.getJdbcDriverClassName(), + sqlProvider, scheduledExecutorService, + executorService, (code, message, file) -> { + Assert.fail(message); + }, 5); + secondJournal.start(); + try { + Assert.assertTrue(secondJournal.isStarted()); + } finally { + secondJournal.stop(); + } + } + @Test public void testInsertRecords() throws Exception { int noRecords = 10;