NO-JIRA Database Page Store Improvements
This commit is contained in:
parent
a1012884cc
commit
a3c852eb04
|
@ -45,7 +45,6 @@ import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriv
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Integration point between Paging and JDBC
|
* Integration point between Paging and JDBC
|
||||||
|
@ -80,6 +79,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
|
|
||||||
private JDBCSequentialFile directoryList;
|
private JDBCSequentialFile directoryList;
|
||||||
|
|
||||||
|
private boolean started = false;
|
||||||
|
|
||||||
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
|
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
|
||||||
final StorageManager storageManager,
|
final StorageManager storageManager,
|
||||||
final long syncTimeout,
|
final long syncTimeout,
|
||||||
|
@ -93,27 +94,33 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
this.scheduledExecutor = scheduledExecutor;
|
this.scheduledExecutor = scheduledExecutor;
|
||||||
this.syncTimeout = syncTimeout;
|
this.syncTimeout = syncTimeout;
|
||||||
this.dbConf = dbConf;
|
this.dbConf = dbConf;
|
||||||
|
start();
|
||||||
if (dbConf.getDataSource() != null) {
|
|
||||||
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
|
|
||||||
if (sqlProviderFactory == null) {
|
|
||||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
|
||||||
}
|
|
||||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName()), executorFactory.getExecutor());
|
|
||||||
} else {
|
|
||||||
String driverClassName = dbConf.getJdbcDriverClassName();
|
|
||||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName()), executorFactory.getExecutor());
|
|
||||||
}
|
|
||||||
pagingFactoryFileFactory.start();
|
|
||||||
directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
|
|
||||||
directoryList.open();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized void start() throws Exception {
|
||||||
|
if (!started) {
|
||||||
|
if (dbConf.getDataSource() != null) {
|
||||||
|
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
|
||||||
|
if (sqlProviderFactory == null) {
|
||||||
|
sqlProviderFactory = new GenericSQLProvider.Factory();
|
||||||
|
}
|
||||||
|
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName()), executorFactory.getExecutor());
|
||||||
|
} else {
|
||||||
|
String driverClassName = dbConf.getJdbcDriverClassName();
|
||||||
|
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName()), executorFactory.getExecutor());
|
||||||
|
}
|
||||||
|
pagingFactoryFileFactory.start();
|
||||||
|
started = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public synchronized void stop() {
|
||||||
pagingFactoryFileFactory.stop();
|
if (started) {
|
||||||
|
pagingFactoryFileFactory.stop();
|
||||||
|
started = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -136,8 +143,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized SequentialFileFactory newFileFactory(final SimpleString address) throws Exception {
|
public synchronized SequentialFileFactory newFileFactory(final SimpleString address) throws Exception {
|
||||||
String guid = UUIDGenerator.getInstance().generateStringUUID();
|
String tableName = "" + storageManager.generateID();
|
||||||
SequentialFileFactory factory = newFileFactory(guid, true);
|
SequentialFileFactory factory = newFileFactory(tableName, true);
|
||||||
factory.start();
|
factory.start();
|
||||||
|
|
||||||
SequentialFile file = factory.createSequentialFile(PagingStoreFactoryDatabase.ADDRESS_FILE);
|
SequentialFile file = factory.createSequentialFile(PagingStoreFactoryDatabase.ADDRESS_FILE);
|
||||||
|
@ -146,6 +153,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(SimpleString.sizeofNullableString(address));
|
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(SimpleString.sizeofNullableString(address));
|
||||||
buffer.writeSimpleString(address);
|
buffer.writeSimpleString(address);
|
||||||
file.write(buffer, true);
|
file.write(buffer, true);
|
||||||
|
file.close();
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,15 +165,18 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
@Override
|
@Override
|
||||||
public synchronized List<PagingStore> reloadStores(final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
|
public synchronized List<PagingStore> reloadStores(final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
|
||||||
// We assume the directory list < Integer.MAX_VALUE (this is only a list of addresses).
|
// We assume the directory list < Integer.MAX_VALUE (this is only a list of addresses).
|
||||||
|
JDBCSequentialFile directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
|
||||||
|
directoryList.open();
|
||||||
|
|
||||||
int size = ((Long) directoryList.size()).intValue();
|
int size = ((Long) directoryList.size()).intValue();
|
||||||
ActiveMQBuffer buffer = readActiveMQBuffer(directoryList, size);
|
ActiveMQBuffer buffer = readActiveMQBuffer(directoryList, size);
|
||||||
|
|
||||||
ArrayList<PagingStore> storesReturn = new ArrayList<>();
|
ArrayList<PagingStore> storesReturn = new ArrayList<>();
|
||||||
|
|
||||||
while (buffer.readableBytes() > 0) {
|
while (buffer.readableBytes() > 0) {
|
||||||
SimpleString guid = buffer.readSimpleString();
|
SimpleString table = buffer.readSimpleString();
|
||||||
|
|
||||||
JDBCSequentialFileFactory factory = (JDBCSequentialFileFactory) newFileFactory(guid.toString(), false);
|
JDBCSequentialFileFactory factory = (JDBCSequentialFileFactory) newFileFactory(table.toString(), false);
|
||||||
factory.start();
|
factory.start();
|
||||||
|
|
||||||
JDBCSequentialFile addressFile = (JDBCSequentialFile) factory.createSequentialFile(ADDRESS_FILE);
|
JDBCSequentialFile addressFile = (JDBCSequentialFile) factory.createSequentialFile(ADDRESS_FILE);
|
||||||
|
@ -185,15 +196,28 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
|
|
||||||
storesReturn.add(store);
|
storesReturn.add(store);
|
||||||
}
|
}
|
||||||
|
directoryList.close();
|
||||||
return storesReturn;
|
return storesReturn;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized SequentialFileFactory newFileFactory(final String directoryName, boolean writeToDirectory) throws Exception {
|
private synchronized SequentialFileFactory newFileFactory(final String directoryName, boolean writeToDirectory) throws Exception {
|
||||||
|
JDBCSequentialFile directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
|
||||||
|
directoryList.open();
|
||||||
SimpleString simpleString = SimpleString.toSimpleString(directoryName);
|
SimpleString simpleString = SimpleString.toSimpleString(directoryName);
|
||||||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(simpleString.sizeof());
|
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(simpleString.sizeof());
|
||||||
buffer.writeSimpleString(simpleString);
|
buffer.writeSimpleString(simpleString);
|
||||||
if (writeToDirectory) directoryList.write(buffer, true);
|
if (writeToDirectory) directoryList.write(buffer, true);
|
||||||
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName)), executorFactory.getExecutor());
|
directoryList.close();
|
||||||
|
|
||||||
|
SQLProvider sqlProvider = null;
|
||||||
|
if (dbConf.getDataSource() != null) {
|
||||||
|
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory() == null ? new GenericSQLProvider.Factory() : dbConf.getSqlProviderFactory();
|
||||||
|
sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName));
|
||||||
|
} else {
|
||||||
|
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName));
|
||||||
|
}
|
||||||
|
|
||||||
|
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getTableNameForGUID(String guid) {
|
private String getTableNameForGUID(String guid) {
|
||||||
|
|
Loading…
Reference in New Issue