This commit is contained in:
Clebert Suconic 2018-04-18 09:53:17 -04:00
commit e3f426b4e3
5 changed files with 310 additions and 28 deletions

View File

@ -110,7 +110,7 @@ public abstract class AbstractJDBCDriver {
protected abstract void createSchema() throws SQLException; protected abstract void createSchema() throws SQLException;
protected final void createTable(String... schemaSqls) throws SQLException { protected final void createTable(String... schemaSqls) throws SQLException {
createTableIfNotExists(connection, sqlProvider.getTableName(), schemaSqls); createTableIfNotExists(sqlProvider.getTableName(), schemaSqls);
} }
private void connect() throws SQLException { private void connect() throws SQLException {
@ -175,33 +175,78 @@ public abstract class AbstractJDBCDriver {
} }
} }
private static void createTableIfNotExists(Connection connection, private void createTableIfNotExists(String tableName, String... sqls) throws SQLException {
String tableName,
String... sqls) throws SQLException {
logger.tracef("Validating if table %s didn't exist before creating", tableName); logger.tracef("Validating if table %s didn't exist before creating", tableName);
try { try {
connection.setAutoCommit(false); connection.setAutoCommit(false);
final boolean tableExists;
try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) { try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
if (rs != null && !rs.next()) { if ((rs == null) || (rs != null && !rs.next())) {
tableExists = false;
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls)); logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls));
} }
final SQLWarning sqlWarning = rs.getWarnings(); if (rs != null) {
if (sqlWarning != null) { final SQLWarning sqlWarning = rs.getWarnings();
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning)); if (sqlWarning != null) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning));
}
} }
try (Statement statement = connection.createStatement()) { } else {
for (String sql : sqls) { tableExists = true;
statement.executeUpdate(sql); }
final SQLWarning statementSqlWarning = statement.getWarnings(); }
if (statementSqlWarning != null) { if (tableExists) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), statementSqlWarning, sql)); logger.tracef("Validating if the existing table %s is initialized or not", tableName);
try (Statement statement = connection.createStatement();
ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) {
logger.tracef("Validation of the existing table %s initialization is started", tableName);
int rows;
if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) {
logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows);
if (logger.isDebugEnabled()) {
final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count();
if (rows < expectedRows) {
logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows.");
} }
} }
connection.commit();
return;
} else {
sqls = Stream.of(sqls).filter(sql -> {
final String upperCaseSql = sql.toUpperCase();
return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX"));
}).toArray(String[]::new);
if (sqls.length > 0) {
logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName);
} else {
logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName);
}
}
} catch (SQLException e) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL()));
try {
connection.rollback();
} catch (SQLException rollbackEx) {
logger.debug("Rollback failed while validating initialization of a table", rollbackEx);
}
connection.setAutoCommit(false);
logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName);
}
}
if (sqls.length > 0) {
try (Statement statement = connection.createStatement()) {
for (String sql : sqls) {
statement.executeUpdate(sql);
final SQLWarning statementSqlWarning = statement.getWarnings();
if (statementSqlWarning != null) {
logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), statementSqlWarning, sql));
}
} }
} }
connection.commit();
} }
connection.commit();
} catch (SQLException e) { } catch (SQLException e) {
final String sqlStatements = Stream.of(sqls).collect(Collectors.joining("\n")); final String sqlStatements = Stream.of(sqls).collect(Collectors.joining("\n"));
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements)); logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements));

View File

@ -18,6 +18,8 @@
package org.apache.activemq.artemis.core.server.impl.jdbc; package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -33,13 +35,31 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
@RunWith(Parameterized.class)
public class JdbcLeaseLockTest extends ActiveMQTestBase { public class JdbcLeaseLockTest extends ActiveMQTestBase {
private JdbcSharedStateManager jdbcSharedStateManager; private JdbcSharedStateManager jdbcSharedStateManager;
private DatabaseStorageConfiguration dbConf; private DatabaseStorageConfiguration dbConf;
private SQLProvider sqlProvider; private SQLProvider sqlProvider;
@Parameterized.Parameters(name = "create_tables_prior_test")
public static List<Object[]> data() {
return Arrays.asList(new Object[][] {
{true, null},
{false, null}
});
}
@Parameter(0)
public boolean withExistingTable;
@Parameter(1)
public Object result;
private LeaseLock lock() { private LeaseLock lock() {
return lock(dbConf.getJdbcLockExpirationMillis()); return lock(dbConf.getJdbcLockExpirationMillis());
} }
@ -59,12 +79,23 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
} }
@Before @Before
public void createLockTable() { public void createLockTable() throws Exception {
dbConf = createDefaultDatabaseStorageConfiguration(); dbConf = createDefaultDatabaseStorageConfiguration();
sqlProvider = JDBCUtils.getSQLProvider( sqlProvider = JDBCUtils.getSQLProvider(
dbConf.getJdbcDriverClassName(), dbConf.getJdbcDriverClassName(),
dbConf.getNodeManagerStoreTableName(), dbConf.getNodeManagerStoreTableName(),
SQLProvider.DatabaseStoreType.NODE_MANAGER); SQLProvider.DatabaseStoreType.NODE_MANAGER);
if (withExistingTable) {
TestJDBCDriver testDriver = TestJDBCDriver
.usingConnectionUrl(
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider);
testDriver.start();
testDriver.stop();
}
jdbcSharedStateManager = JdbcSharedStateManager jdbcSharedStateManager = JdbcSharedStateManager
.usingConnectionUrl( .usingConnectionUrl(
UUID.randomUUID().toString(), UUID.randomUUID().toString(),

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.UUID;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
public class JdbcSharedStateManagerTest extends ActiveMQTestBase {
private DatabaseStorageConfiguration dbConf;
private SQLProvider sqlProvider;
@Before
public void configure() {
dbConf = createDefaultDatabaseStorageConfiguration();
sqlProvider = JDBCUtils.getSQLProvider(
dbConf.getJdbcDriverClassName(),
dbConf.getNodeManagerStoreTableName(),
SQLProvider.DatabaseStoreType.NODE_MANAGER);
}
private TestJDBCDriver createFakeDriver(boolean initializeTable) {
return TestJDBCDriver.usingConnectionUrl(
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider,
initializeTable);
}
private JdbcSharedStateManager createSharedStateManager() {
return JdbcSharedStateManager.usingConnectionUrl(
UUID.randomUUID().toString(),
dbConf.getJdbcLockExpirationMillis(),
dbConf.getJdbcMaxAllowedMillisFromDbTime(),
dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider);
}
@Test(timeout = 10000)
public void shouldStartIfTableNotExist() throws Exception {
final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
try {
sharedStateManager.destroy();
} finally {
sharedStateManager.stop();
}
}
@Test(timeout = 10000)
public void shouldStartIfTableExistEmpty() throws Exception {
final TestJDBCDriver fakeDriver = createFakeDriver(false);
fakeDriver.start();
final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
sharedStateManager.stop();
try {
fakeDriver.destroy();
} finally {
fakeDriver.stop();
}
}
@Test(timeout = 10000)
public void shouldStartIfTableExistInitialized() throws Exception {
final TestJDBCDriver fakeDriver = createFakeDriver(true);
fakeDriver.start();
final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
sharedStateManager.stop();
try {
fakeDriver.destroy();
} finally {
fakeDriver.stop();
}
}
@Test(timeout = 10000)
public void shouldStartTwoIfTableNotExist() throws Exception {
final JdbcSharedStateManager liveSharedStateManager = createSharedStateManager();
final JdbcSharedStateManager backupSharedStateManager = createSharedStateManager();
backupSharedStateManager.stop();
try {
liveSharedStateManager.destroy();
} finally {
liveSharedStateManager.stop();
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.sql.SQLException;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.junit.Assert;
public class TestJDBCDriver extends AbstractJDBCDriver {
public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider) {
return usingConnectionUrl(jdbcConnectionUrl, jdbcDriverClass, provider, false);
}
public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
String jdbcDriverClass,
SQLProvider provider,
boolean initialize) {
TestJDBCDriver driver = new TestJDBCDriver(initialize);
driver.setSqlProvider(provider);
driver.setJdbcConnectionUrl(jdbcConnectionUrl);
driver.setJdbcDriverClass(jdbcDriverClass);
return driver;
}
private boolean initialize;
private TestJDBCDriver(boolean initialize) {
this.initialize = initialize;
}
@Override
protected void prepareStatements() throws SQLException {
}
@Override
protected void createSchema() throws SQLException {
try {
connection.createStatement().execute(sqlProvider.createNodeManagerStoreTableSQL());
if (initialize) {
connection.createStatement().execute(sqlProvider.createNodeIdSQL());
connection.createStatement().execute(sqlProvider.createStateSQL());
connection.createStatement().execute(sqlProvider.createLiveLockSQL());
connection.createStatement().execute(sqlProvider.createBackupLockSQL());
}
} catch (SQLException e) {
Assert.fail(e.getMessage());
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.jdbc.store.journal; package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -26,40 +27,38 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
public class JDBCJournalTest extends ActiveMQTestBase { public class JDBCJournalTest extends ActiveMQTestBase {
@Rule @Rule
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule(); public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
private static final String JOURNAL_TABLE_NAME = "MESSAGE_JOURNAL";
private static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
private JDBCJournalImpl journal; private JDBCJournalImpl journal;
private String jdbcUrl;
private ScheduledExecutorService scheduledExecutorService; private ScheduledExecutorService scheduledExecutorService;
private ExecutorService executorService; private ExecutorService executorService;
private SQLProvider sqlProvider;
private DatabaseStorageConfiguration dbConf;
@After @After
@Override @Override
public void tearDown() throws Exception { public void tearDown() throws Exception {
@ -77,11 +76,14 @@ public class JDBCJournalTest extends ActiveMQTestBase {
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
dbConf = createDefaultDatabaseStorageConfiguration();
sqlProvider = JDBCUtils.getSQLProvider(
dbConf.getJdbcDriverClassName(),
dbConf.getMessageTableName(),
SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL);
scheduledExecutorService = new ScheduledThreadPoolExecutor(5); scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
executorService = Executors.newSingleThreadExecutor(); executorService = Executors.newSingleThreadExecutor();
jdbcUrl = "jdbc:derby:target/data;create=true"; journal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
SQLProvider.Factory factory = new PropertySQLProvider.Factory(DERBY);
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() {
@Override @Override
public void onIOException(Throwable code, String message, SequentialFile file) { public void onIOException(Throwable code, String message, SequentialFile file) {
@ -90,6 +92,33 @@ public class JDBCJournalTest extends ActiveMQTestBase {
journal.start(); journal.start();
} }
@Test
public void testRestartEmptyJournal() throws SQLException {
Assert.assertTrue(journal.isStarted());
Assert.assertEquals(0, journal.getNumberOfRecords());
journal.stop();
journal.start();
Assert.assertTrue(journal.isStarted());
}
@Test
public void testConcurrentEmptyJournal() throws SQLException {
Assert.assertTrue(journal.isStarted());
Assert.assertEquals(0, journal.getNumberOfRecords());
final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(),
dbConf.getJdbcDriverClassName(),
sqlProvider, scheduledExecutorService,
executorService, (code, message, file) -> {
Assert.fail(message);
});
secondJournal.start();
try {
Assert.assertTrue(secondJournal.isStarted());
} finally {
secondJournal.stop();
}
}
@Test @Test
public void testInsertRecords() throws Exception { public void testInsertRecords() throws Exception {
int noRecords = 10; int noRecords = 10;