diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 5d8624336a..48b46a945d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -428,11 +428,15 @@ public final class ActiveMQDefaultConfiguration { // Default large messages table name, used with Database storage type private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES"; - // Default large messages table name, used with Database storage type + // Default Page Store table name, used with Database storage type private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE"; + private static final int DEFAULT_JDBC_NETWORK_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20); + // Default JMS Bingings table name, used with Database storage type + private static final String DEFAULT_JMS_BINDINGS_TABLE_NAME = "JMS_BINDINGS"; + // Default period to wait between connection TTL checks public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000; @@ -1180,6 +1184,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_JDBC_NETWORK_TIMEOUT; } + public static String getDefaultJMSBindingsTableName() { + return DEFAULT_JMS_BINDINGS_TABLE_NAME; + } + public static long getDefaultConnectionTtlCheckInterval() { return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL; } diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java index bc288db496..9cd3976c20 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java @@ -22,11 +22,15 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.StoreConfiguration; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.Journal; @@ -36,6 +40,10 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.replication.ReplicatedJournal; import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; +import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jms.persistence.JMSStorageManager; import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings; import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory; @@ -75,10 +83,13 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager { // Static -------------------------------------------------------- // Constructors -------------------------------------------------- + public JMSJournalStorageManagerImpl(ExecutorFactory ioExecutors, final IDGenerator idGenerator, final Configuration config, - final ReplicationManager replicator) { + final ReplicationManager replicator, + final ScheduledExecutorService scheduledExecutorService, + final IOCriticalErrorListener criticalErrorListener) { final EnumSet supportedJournalTypes = EnumSet.allOf(JournalType.class); if (!supportedJournalTypes.contains(config.getJournalType())) { throw new IllegalArgumentException("Only " + supportedJournalTypes + " are supported Journal types"); @@ -88,14 +99,29 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager { createDir = config.isCreateBindingsDir(); - SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1); - - Journal localJMS = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1, 0); - - if (replicator != null) { - jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator); - } else { + Journal localJMS; + if (config.getStoreConfiguration() != null && config.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { + DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); + if (dbConf.getDataSource() != null) { + SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory(); + if (sqlProviderFactory == null) { + sqlProviderFactory = new GenericSQLProvider.Factory(); + } + localJMS = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getJMSBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, ioExecutors.getExecutor(), criticalErrorListener); + } else { + String driverClassName = dbConf.getJdbcDriverClassName(); + localJMS = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getJMSBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, ioExecutors.getExecutor(), criticalErrorListener); + } jmsJournal = localJMS; + } else { + SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1); + localJMS = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1, 0); + + if (replicator != null) { + jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator); + } else { + jmsJournal = localJMS; + } } this.idGenerator = idGenerator; diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index b5bc7015a8..9ce3299408 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -1523,7 +1523,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback private void initJournal() throws Exception { this.coreConfig = server.getConfiguration(); - createJournal(); + createJournal(server); storage.load(); @@ -1547,12 +1547,12 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback /** * @throws Exception */ - private void createJournal() throws Exception { + private void createJournal(ActiveMQServer activeMQserver) throws Exception { if (storage != null) { storage.stop(); } if (coreConfig.isPersistenceEnabled()) { - storage = new JMSJournalStorageManagerImpl(server.getIOExecutorFactory(), new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager()); + storage = new JMSJournalStorageManagerImpl(server.getIOExecutorFactory(), new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager(), server.getScheduledPool(), activeMQserver.getCriticalIOErrorListener()); } else { storage = new NullJMSStorageManagerImpl(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java index 76626c015c..285d1cfee4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java @@ -32,6 +32,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { private String pageStoreTableName = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName(); + private String jmsBindingsTableName = ActiveMQDefaultConfiguration.getDefaultJMSBindingsTableName(); + private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl(); private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName(); @@ -79,6 +81,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { this.pageStoreTableName = pageStoreTableName; } + public String getJMSBindingsTableName() { + return jmsBindingsTableName; + } + + public void setJMSBindingsTableName(String jmsBindingsTableName) { + this.jmsBindingsTableName = jmsBindingsTableName; + } + public void setJdbcConnectionUrl(String jdbcConnectionUrl) { this.jdbcConnectionUrl = jdbcConnectionUrl; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index c36699e583..0b82a9014e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1157,6 +1157,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK)); conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK)); conf.setPageStoreTableName(getString(storeNode, "page-store-table-name", conf.getPageStoreTableName(), Validators.NO_CHECK)); + conf.setJMSBindingsTableName(getString(storeNode, "jms-bindings-table-name", conf.getJMSBindingsTableName(), Validators.NO_CHECK)); conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK)); conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK)); conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index f8a5d7516b..8703e834f9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -421,4 +422,6 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean addClientConnection(String clientId, boolean unique); void removeClientConnection(String clientId); + + IOCriticalErrorListener getCriticalIOErrorListener(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 31aa1782d4..f25b11530a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2493,6 +2493,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { } + @Override + public IOCriticalErrorListener getCriticalIOErrorListener() { + return shutdownOnCriticalIO; + } + private final class ConfigurationFileReloader implements ReloadCallback { @Override diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 6fa5fc45cd..d0c0b259aa 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1714,6 +1714,13 @@ + + + + The table name used to store JMS binding journal entries + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 7d10245d62..47c916aca2 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -414,6 +414,7 @@ public abstract class ActiveMQTestBase extends Assert { protected Configuration createDefaultJDBCConfig(boolean isNetty) throws Exception { Configuration configuration = createDefaultConfig(isNetty); setDBStoreType(configuration); + configuration.setPersistenceEnabled(true); return configuration; } @@ -462,6 +463,7 @@ public abstract class ActiveMQTestBase extends Assert { dbStorageConfiguration.setMessageTableName("MESSAGE"); dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE"); dbStorageConfiguration.setPageStoreTableName("PAGE_STORE"); + dbStorageConfiguration.setJMSBindingsTableName("JMS_BINDINGS"); dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName()); configuration.setStoreConfiguration(dbStorageConfiguration); diff --git a/artemis-server/src/test/resources/database-store-config.xml b/artemis-server/src/test/resources/database-store-config.xml index 69f9da70f0..ed570ad828 100644 --- a/artemis-server/src/test/resources/database-store-config.xml +++ b/artemis-server/src/test/resources/database-store-config.xml @@ -26,6 +26,7 @@ MESSAGE_TABLE LARGE_MESSAGE_TABLE PAGE_STORE_TABLE + JMS_BINDINGS_TABLE org.apache.derby.jdbc.EmbeddedDriver diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md index 7e07773eed..8a422edb91 100644 --- a/docs/user-manual/en/persistence.md +++ b/docs/user-manual/en/persistence.md @@ -420,6 +420,14 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a The name of the table in which messages and related data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference. +- `page-store-table-name` + + The name of the table in which paged messages are stored. Specifying table names allows users to share single database amongst multiple servers, without interference. + +- `jms-bindings-table-name` + + The name of the table in which JMS bindings data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference. + - `jdbc-driver-class-name` The fully qualified class name of the desired database Driver. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java index a1043638c9..9504417edd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java @@ -155,7 +155,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { * @throws Exception */ protected void createJMSStorage() throws Exception { - jmsJournal = new JMSJournalStorageManagerImpl(null, new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null); + jmsJournal = new JMSJournalStorageManagerImpl(null, new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null, scheduledExecutorService, null); addActiveMQComponent(jmsJournal); jmsJournal.start(); jmsJournal.load();