ARTEMIS-1125 Persist JMS Bindings in Database on JDBC

This commit is contained in:
Martyn Taylor 2017-04-20 18:31:24 +01:00
parent 8a3f4ccd6f
commit 9c013e74cb
11 changed files with 83 additions and 13 deletions

View File

@ -428,11 +428,15 @@ public final class ActiveMQDefaultConfiguration {
// Default large messages table name, used with Database storage type
private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES";
// Default large messages table name, used with Database storage type
// Default Page Store table name, used with Database storage type
private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE";
private static final int DEFAULT_JDBC_NETWORK_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
// Default JMS Bingings table name, used with Database storage type
private static final String DEFAULT_JMS_BINDINGS_TABLE_NAME = "JMS_BINDINGS";
// Default period to wait between connection TTL checks
public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000;
@ -1180,6 +1184,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JDBC_NETWORK_TIMEOUT;
}
public static String getDefaultJMSBindingsTableName() {
return DEFAULT_JMS_BINDINGS_TABLE_NAME;
}
public static long getDefaultConnectionTtlCheckInterval() {
return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
}

View File

@ -22,11 +22,15 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
@ -36,6 +40,10 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.replication.ReplicatedJournal;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.jms.persistence.JMSStorageManager;
import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings;
import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory;
@ -75,10 +83,13 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public JMSJournalStorageManagerImpl(ExecutorFactory ioExecutors,
final IDGenerator idGenerator,
final Configuration config,
final ReplicationManager replicator) {
final ReplicationManager replicator,
final ScheduledExecutorService scheduledExecutorService,
final IOCriticalErrorListener criticalErrorListener) {
final EnumSet<JournalType> supportedJournalTypes = EnumSet.allOf(JournalType.class);
if (!supportedJournalTypes.contains(config.getJournalType())) {
throw new IllegalArgumentException("Only " + supportedJournalTypes + " are supported Journal types");
@ -88,14 +99,29 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
createDir = config.isCreateBindingsDir();
SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
Journal localJMS = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1, 0);
if (replicator != null) {
jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator);
} else {
Journal localJMS;
if (config.getStoreConfiguration() != null && config.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
if (dbConf.getDataSource() != null) {
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
if (sqlProviderFactory == null) {
sqlProviderFactory = new GenericSQLProvider.Factory();
}
localJMS = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getJMSBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, ioExecutors.getExecutor(), criticalErrorListener);
} else {
String driverClassName = dbConf.getJdbcDriverClassName();
localJMS = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getJMSBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, ioExecutors.getExecutor(), criticalErrorListener);
}
jmsJournal = localJMS;
} else {
SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
localJMS = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1, 0);
if (replicator != null) {
jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator);
} else {
jmsJournal = localJMS;
}
}
this.idGenerator = idGenerator;

View File

@ -1523,7 +1523,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
private void initJournal() throws Exception {
this.coreConfig = server.getConfiguration();
createJournal();
createJournal(server);
storage.load();
@ -1547,12 +1547,12 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
/**
* @throws Exception
*/
private void createJournal() throws Exception {
private void createJournal(ActiveMQServer activeMQserver) throws Exception {
if (storage != null) {
storage.stop();
}
if (coreConfig.isPersistenceEnabled()) {
storage = new JMSJournalStorageManagerImpl(server.getIOExecutorFactory(), new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager());
storage = new JMSJournalStorageManagerImpl(server.getIOExecutorFactory(), new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager(), server.getScheduledPool(), activeMQserver.getCriticalIOErrorListener());
} else {
storage = new NullJMSStorageManagerImpl();
}

View File

@ -32,6 +32,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private String pageStoreTableName = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName();
private String jmsBindingsTableName = ActiveMQDefaultConfiguration.getDefaultJMSBindingsTableName();
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
@ -79,6 +81,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
this.pageStoreTableName = pageStoreTableName;
}
public String getJMSBindingsTableName() {
return jmsBindingsTableName;
}
public void setJMSBindingsTableName(String jmsBindingsTableName) {
this.jmsBindingsTableName = jmsBindingsTableName;
}
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
}

View File

@ -1157,6 +1157,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK));
conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
conf.setPageStoreTableName(getString(storeNode, "page-store-table-name", conf.getPageStoreTableName(), Validators.NO_CHECK));
conf.setJMSBindingsTableName(getString(storeNode, "jms-bindings-table-name", conf.getJMSBindingsTableName(), Validators.NO_CHECK));
conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK));
conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK));

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.persistence.OperationContext;
@ -421,4 +422,6 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean addClientConnection(String clientId, boolean unique);
void removeClientConnection(String clientId);
IOCriticalErrorListener getCriticalIOErrorListener();
}

View File

@ -2493,6 +2493,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public IOCriticalErrorListener getCriticalIOErrorListener() {
return shutdownOnCriticalIO;
}
private final class ConfigurationFileReloader implements ReloadCallback {
@Override

View File

@ -1714,6 +1714,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="jms-bindings-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The table name used to store JMS binding journal entries
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
</xsd:complexType>

View File

@ -414,6 +414,7 @@ public abstract class ActiveMQTestBase extends Assert {
protected Configuration createDefaultJDBCConfig(boolean isNetty) throws Exception {
Configuration configuration = createDefaultConfig(isNetty);
setDBStoreType(configuration);
configuration.setPersistenceEnabled(true);
return configuration;
}
@ -462,6 +463,7 @@ public abstract class ActiveMQTestBase extends Assert {
dbStorageConfiguration.setMessageTableName("MESSAGE");
dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE");
dbStorageConfiguration.setPageStoreTableName("PAGE_STORE");
dbStorageConfiguration.setJMSBindingsTableName("JMS_BINDINGS");
dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName());
configuration.setStoreConfiguration(dbStorageConfiguration);

View File

@ -420,6 +420,14 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
The name of the table in which messages and related data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
- `page-store-table-name`
The name of the table in which paged messages are stored. Specifying table names allows users to share single database amongst multiple servers, without interference.
- `jms-bindings-table-name`
The name of the table in which JMS bindings data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
- `jdbc-driver-class-name`
The fully qualified class name of the desired database Driver.

View File

@ -155,7 +155,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @throws Exception
*/
protected void createJMSStorage() throws Exception {
jmsJournal = new JMSJournalStorageManagerImpl(null, new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null);
jmsJournal = new JMSJournalStorageManagerImpl(null, new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null, scheduledExecutorService, null);
addActiveMQComponent(jmsJournal);
jmsJournal.start();
jmsJournal.load();