This commit is contained in:
Justin Bertram 2019-05-29 11:30:35 -05:00
commit 3e154ae26d
14 changed files with 191 additions and 17 deletions

View File

@ -56,13 +56,19 @@ public abstract class AbstractJDBCDriver {
private int networkTimeoutMillis;
private String user;
private String password;
public AbstractJDBCDriver() {
this.networkTimeoutExecutor = null;
this.networkTimeoutMillis = -1;
}
public AbstractJDBCDriver(SQLProvider sqlProvider, String jdbcConnectionUrl, String jdbcDriverClass) {
public AbstractJDBCDriver(SQLProvider sqlProvider, String jdbcConnectionUrl, String user, String password, String jdbcDriverClass) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
this.user = user;
this.password = password;
this.jdbcDriverClass = jdbcDriverClass;
this.sqlProvider = sqlProvider;
this.networkTimeoutExecutor = null;
@ -140,7 +146,12 @@ public abstract class AbstractJDBCDriver {
throw new IllegalStateException("jdbcConnectionUrl is null or empty!");
}
final Driver dbDriver = getDriver(jdbcDriverClass);
connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
Properties properties = new Properties();
if (user != null) {
properties.setProperty("user", user);
properties.setProperty("password", password);
}
connection = dbDriver.connect(jdbcConnectionUrl, properties);
if (logger.isTraceEnabled() && !(connection instanceof LoggingConnection)) {
this.connection = new LoggingConnection(connection, logger);
@ -328,6 +339,22 @@ public abstract class AbstractJDBCDriver {
this.jdbcConnectionUrl = jdbcConnectionUrl;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public void setJdbcDriverClass(String jdbcDriverClass) {
this.jdbcDriverClass = jdbcDriverClass;
}

View File

@ -31,6 +31,8 @@ class JDBCFileUtils {
static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
String jdbcConnectionUrl,
String user,
String password,
SQLProvider provider) throws SQLException {
final JDBCSequentialFileFactoryDriver dbDriver;
final PropertySQLProvider.Factory.SQLDialect sqlDialect = PropertySQLProvider.Factory.identifyDialect(driverClass);
@ -44,6 +46,8 @@ class JDBCFileUtils {
dbDriver.setSqlProvider(provider);
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
dbDriver.setJdbcDriverClass(driverClass);
dbDriver.setUser(user);
dbDriver.setPassword(password);
return dbDriver;
}

View File

@ -69,6 +69,8 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
}
public JDBCSequentialFileFactory(final String connectionUrl,
String userName,
String password,
final String className,
final SQLProvider sqlProvider,
Executor executor,
@ -76,7 +78,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
this.executor = executor;
this.criticalErrorListener = criticalErrorListener;
try {
this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, userName, password, sqlProvider);
} catch (SQLException e) {
criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null);
}

View File

@ -109,13 +109,15 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
}
public JDBCJournalImpl(String jdbcUrl,
String user,
String password,
String jdbcDriverClass,
SQLProvider sqlProvider,
ScheduledExecutorService scheduledExecutorService,
Executor completeExecutor,
IOCriticalErrorListener criticalIOErrorListener,
long syncDelay) {
super(sqlProvider, jdbcUrl, jdbcDriverClass);
super(sqlProvider, jdbcUrl, user, password, jdbcDriverClass);
records = new ArrayList<>();
this.scheduledExecutorService = scheduledExecutorService;
this.completeExecutor = completeExecutor;

View File

@ -19,6 +19,8 @@ package org.apache.activemq.artemis.jdbc.file;
import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@ -46,12 +48,15 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
public class JDBCSequentialFileFactoryTest {
@Rule
@ -63,13 +68,28 @@ public class JDBCSequentialFileFactoryTest {
private ExecutorService executor;
@Parameterized.Parameter
public boolean useAuthentication;
private String user = null;
private String password = null;
@Parameterized.Parameters(name = "authentication = {0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{{false}, {true}});
}
@Before
public void setup() throws Exception {
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
if (useAuthentication) {
user = "testuser";
password = "testpassword";
System.setProperty("derby.connection.requireAuthentication", "true");
System.setProperty("derby.user." + user, password);
}
String connectionUrl = "jdbc:derby:target/data;create=true";
String tableName = "FILES";
factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
factory = new JDBCSequentialFileFactory(connectionUrl, user, password, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
}
@ -86,9 +106,17 @@ public class JDBCSequentialFileFactoryTest {
@After
public void shutdownDerby() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
if (useAuthentication) {
DriverManager.getConnection("jdbc:derby:;shutdown=true", user, password);
} else {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
} catch (Exception ignored) {
}
if (useAuthentication) {
System.clearProperty("derby.connection.requireAuthentication");
System.clearProperty("derby.user." + user);
}
}
@Test

View File

@ -36,6 +36,10 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
private String jdbcUser;
private String jdbcPassword;
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
private DataSource dataSource;
@ -105,6 +109,22 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
return jdbcConnectionUrl;
}
public String getJdbcUser() {
return jdbcUser;
}
public void setJdbcUser(String jdbcUser) {
this.jdbcUser = jdbcUser;
}
public String getJdbcPassword() {
return jdbcPassword;
}
public void setJdbcPassword(String jdbcPassword) {
this.jdbcPassword = jdbcPassword;
}
public void setJdbcDriverClassName(String jdbcDriverClassName) {
this.jdbcDriverClassName = jdbcDriverClassName;
}

View File

@ -1323,13 +1323,13 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
STORE_TYPE_LIST.add("file-store");
}
private void parseStoreConfiguration(final Element e, final Configuration mainConfig) {
private void parseStoreConfiguration(final Element e, final Configuration mainConfig) throws Exception {
for (String storeType : STORE_TYPE_LIST) {
NodeList storeNodeList = e.getElementsByTagName(storeType);
if (storeNodeList.getLength() > 0) {
Element storeNode = (Element) storeNodeList.item(0);
if (storeNode.getTagName().equals("database-store")) {
mainConfig.setStoreConfiguration(createDatabaseStoreConfig(storeNode));
mainConfig.setStoreConfiguration(createDatabaseStoreConfig(storeNode, mainConfig));
} else if (storeNode.getTagName().equals("file-store")) {
mainConfig.setStoreConfiguration(createFileStoreConfig(storeNode));
}
@ -1561,7 +1561,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
return null;
}
private DatabaseStorageConfiguration createDatabaseStoreConfig(Element storeNode) {
private DatabaseStorageConfiguration createDatabaseStoreConfig(Element storeNode, Configuration mainConfig) throws Exception {
DatabaseStorageConfiguration conf = new DatabaseStorageConfiguration();
conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK));
conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK));
@ -1574,6 +1574,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK));
conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK));
conf.setJdbcJournalSyncPeriodMillis(getLong(storeNode, "jdbc-journal-sync-period", conf.getJdbcJournalSyncPeriodMillis(), Validators.NO_CHECK));
String jdbcUser = getString(storeNode, "jdbc-user", conf.getJdbcUser(), Validators.NO_CHECK);
if (jdbcUser != null) {
jdbcUser = PasswordMaskingUtil.resolveMask(mainConfig.isMaskPassword(), jdbcUser, mainConfig.getPasswordCodec());
}
conf.setJdbcUser(jdbcUser);
String password = getString(storeNode, "jdbc-password", conf.getJdbcPassword(), Validators.NO_CHECK);
if (password != null) {
password = PasswordMaskingUtil.resolveMask(mainConfig.isMaskPassword(), password, mainConfig.getPasswordCodec());
}
conf.setJdbcPassword(password);
return conf;
}

View File

@ -131,7 +131,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
} else {
String driverClassName = dbConf.getJdbcDriverClassName();
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), null, null, driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
}
final int jdbcNetworkTimeout = dbConf.getJdbcNetworkTimeout();
if (jdbcNetworkTimeout >= 0) {

View File

@ -72,9 +72,9 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
} else {
String driverClassName = dbConf.getJdbcDriverClassName();
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener, dbConf.getJdbcJournalSyncPeriodMillis());
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcUser(), dbConf.getJdbcPassword(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
}
final int networkTimeout = dbConf.getJdbcNetworkTimeout();
if (networkTimeout >= 0) {

View File

@ -2131,6 +2131,22 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jdbc-user" type="xsd:string" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The JDBC User to use for connecting to the database, NB this will only work with drivers where support
DriverManager.getConnection(String url, String user, String password). This can be encrypted.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jdbc-password" type="xsd:string" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The JDBC Password to use for connecting to the database, NB this will only work with drivers where support
DriverManager.getConnection(String url, String user, String password). This can be encrypted.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="message-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.config.impl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@ -33,7 +34,10 @@ public class DatabaseStoreConfigurationTest extends ActiveMQTestBase {
public void databaseStoreConfigTest() throws Exception {
Configuration configuration = createConfiguration("database-store-config.xml");
ActiveMQServerImpl server = new ActiveMQServerImpl(configuration);
assertEquals(StoreConfiguration.StoreType.DATABASE, server.getConfiguration().getStoreConfiguration().getStoreType());
DatabaseStorageConfiguration storeConfiguration = (DatabaseStorageConfiguration) server.getConfiguration().getStoreConfiguration();
assertEquals(StoreConfiguration.StoreType.DATABASE, storeConfiguration.getStoreType());
assertEquals("sourcepassword", storeConfiguration.getJdbcUser());
assertEquals("targetpassword", storeConfiguration.getJdbcPassword());
}
@Test

View File

@ -22,6 +22,8 @@
<store>
<database-store>
<jdbc-connection-url>jdbc:derby:target/derby/database-store;create=true</jdbc-connection-url>
<jdbc-user>ENC(5493dd76567ee5ec269d11823973462f)</jdbc-user>
<jdbc-password>ENC(56a0db3b71043054269d11823973462f)</jdbc-password>
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
<message-table-name>MESSAGE_TABLE</message-table-name>
<large-message-table-name>LARGE_MESSAGE_TABLE</large-message-table-name>

View File

@ -488,6 +488,23 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
Note that some DBMS (e.g. Oracle, 30 chars) have restrictions on the size of table names, this should be taken into consideration when configuring table names for the Artemis database store, pay particular attention to the page store table name, which can be appended with a unique ID of up to 20 characters. (for Oracle this would mean configuring a page-store-table-name of max size of 10 chars).
It is also possible to explicitly add the user and password rather than in the JDBC url if you need to encode it, this would look like:
```xml
<store>
<database-store>
<jdbc-connection-url>jdbc:derby:data/derby/database-store;create=true</jdbc-connection-url>
<jdbc-user>ENC(dasfn353cewc)</jdbc-user>
<jdbc-password>ENC(ucwiurfjtew345)</jdbc-password>
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
<message-table-name>MESSAGE_TABLE</message-table-name>
<page-store-table-name>MESSAGE_TABLE</page-store-table-name>
<large-message-table-name>LARGE_MESSAGES_TABLE</large-message-table-name>
<node-manager-store-table-name>NODE_MANAGER_TABLE</node-manager-store-table-name>
<jdbc-driver-class-name>org.apache.derby.jdbc.EmbeddedDriver</jdbc-driver-class-name>
</database-store>
</store>
```
## Zero Persistence
In some situations, zero persistence is sometimes required for a

View File

@ -19,6 +19,8 @@ package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -43,7 +45,10 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JDBCJournalTest extends ActiveMQTestBase {
@Rule
@ -59,31 +64,66 @@ public class JDBCJournalTest extends ActiveMQTestBase {
private DatabaseStorageConfiguration dbConf;
@Parameterized.Parameter
public boolean useAuthentication;
@Parameterized.Parameters(name = "authentication = {0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{{false}, {true}});
}
@After
@Override
public void tearDown() throws Exception {
journal.destroy();
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
if (useAuthentication) {
DriverManager.getConnection("jdbc:derby:;shutdown=true", getJdbcUser(), getJdbcPassword());
} else {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
} catch (Exception ignored) {
}
if (useAuthentication) {
System.clearProperty("derby.connection.requireAuthentication");
System.clearProperty("derby.user." + getJdbcUser());
}
scheduledExecutorService.shutdown();
scheduledExecutorService = null;
executorService.shutdown();
executorService = null;
}
protected String getJdbcUser() {
if (useAuthentication) {
return System.getProperty("jdbc.user", "testuser");
} else {
return null;
}
}
protected String getJdbcPassword() {
if (useAuthentication) {
return System.getProperty("jdbc.password", "testpassword");
} else {
return null;
}
}
@Before
public void setup() throws Exception {
dbConf = createDefaultDatabaseStorageConfiguration();
if (useAuthentication) {
System.setProperty("derby.connection.requireAuthentication", "true");
System.setProperty("derby.user." + getJdbcUser(), getJdbcPassword());
}
sqlProvider = JDBCUtils.getSQLProvider(
dbConf.getJdbcDriverClassName(),
dbConf.getMessageTableName(),
SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL);
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
executorService = Executors.newSingleThreadExecutor();
journal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
journal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), getJdbcUser(), getJdbcPassword(), dbConf.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
@ -106,6 +146,8 @@ public class JDBCJournalTest extends ActiveMQTestBase {
Assert.assertTrue(journal.isStarted());
Assert.assertEquals(0, journal.getNumberOfRecords());
final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(),
getJdbcUser(),
getJdbcPassword(),
dbConf.getJdbcDriverClassName(),
sqlProvider, scheduledExecutorService,
executorService, (code, message, file) -> {