ARTEMIS-560 Add Support For JDBC Paging
This commit is contained in:
parent
a79094b6e5
commit
118c272c77
|
@ -432,6 +432,9 @@ public final class ActiveMQDefaultConfiguration {
|
|||
// Default large messages table name, used with Database storage type
|
||||
private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES";
|
||||
|
||||
// Default large messages table name, used with Database storage type
|
||||
private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE";
|
||||
|
||||
// Default period to wait between connection TTL checks
|
||||
public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000;
|
||||
|
||||
|
@ -1190,6 +1193,10 @@ public final class ActiveMQDefaultConfiguration {
|
|||
return DEFAULT_LARGE_MESSAGES_TABLE_NAME;
|
||||
}
|
||||
|
||||
public static String getDefaultPageStoreTableName() {
|
||||
return DEFAULT_PAGE_STORE_TABLE_NAME;
|
||||
}
|
||||
|
||||
public static long getDefaultConnectionTtlCheckInterval() {
|
||||
return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.jboss.logging.Logger;
|
|||
/**
|
||||
* Class to hold common database functionality such as drivers and connections
|
||||
*/
|
||||
@SuppressWarnings("SynchronizeOnNonFinalField")
|
||||
public abstract class AbstractJDBCDriver {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class);
|
||||
|
@ -66,11 +67,19 @@ public abstract class AbstractJDBCDriver {
|
|||
|
||||
public void start() throws SQLException {
|
||||
connect();
|
||||
synchronized (connection) {
|
||||
createSchema();
|
||||
prepareStatements();
|
||||
}
|
||||
}
|
||||
|
||||
public AbstractJDBCDriver(Connection connection, SQLProvider sqlProvider) {
|
||||
this.connection = connection;
|
||||
this.sqlProvider = sqlProvider;
|
||||
}
|
||||
|
||||
public void stop() throws SQLException {
|
||||
synchronized (connection) {
|
||||
if (sqlProvider.closeConnectionOnShutdown()) {
|
||||
try {
|
||||
connection.close();
|
||||
|
@ -80,6 +89,7 @@ public abstract class AbstractJDBCDriver {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void prepareStatements() throws SQLException;
|
||||
|
||||
|
@ -90,6 +100,7 @@ public abstract class AbstractJDBCDriver {
|
|||
}
|
||||
|
||||
private void connect() throws SQLException {
|
||||
if (connection == null) {
|
||||
if (dataSource != null) {
|
||||
try {
|
||||
connection = dataSource.getConnection();
|
||||
|
@ -117,6 +128,7 @@ public abstract class AbstractJDBCDriver {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void destroy() throws Exception {
|
||||
final String dropTableSql = "DROP TABLE " + sqlProvider.getTableName();
|
||||
|
@ -206,9 +218,11 @@ public abstract class AbstractJDBCDriver {
|
|||
return connection;
|
||||
}
|
||||
|
||||
public void setConnection(Connection connection) {
|
||||
public final void setConnection(Connection connection) {
|
||||
if (connection == null) {
|
||||
this.connection = connection;
|
||||
}
|
||||
}
|
||||
|
||||
public void setSqlProvider(SQLProvider sqlProvider) {
|
||||
this.sqlProvider = sqlProvider;
|
||||
|
@ -225,4 +239,5 @@ public abstract class AbstractJDBCDriver {
|
|||
public void setDataSource(DataSource dataSource) {
|
||||
this.dataSource = dataSource;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ public class DerbySQLProvider extends GenericSQLProvider {
|
|||
private final String appendToFileSQL;
|
||||
|
||||
private DerbySQLProvider(String tableName) {
|
||||
super(tableName);
|
||||
super(tableName.toUpperCase());
|
||||
|
||||
createFileTableSQL = "CREATE TABLE " + tableName +
|
||||
"(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.activemq.artemis.jdbc.store.file;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
|
||||
|
@ -45,4 +46,15 @@ class JDBCFileUtils {
|
|||
}
|
||||
return dbDriver;
|
||||
}
|
||||
|
||||
static JDBCSequentialFileFactoryDriver getDBFileDriver(Connection connection, SQLProvider provider) throws SQLException {
|
||||
JDBCSequentialFileFactoryDriver dbDriver;
|
||||
if (provider instanceof PostgresSQLProvider) {
|
||||
dbDriver = new PostgresSequentialSequentialFileDriver();
|
||||
dbDriver.setConnection(connection);
|
||||
} else {
|
||||
dbDriver = new JDBCSequentialFileFactoryDriver(connection, provider);
|
||||
}
|
||||
return dbDriver;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,7 +88,12 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
|
||||
@Override
|
||||
public boolean exists() {
|
||||
return isCreated;
|
||||
if (isCreated) return true;
|
||||
try {
|
||||
return fileFactory.listFiles(extension).contains(filename);
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.file;
|
|||
import javax.sql.DataSource;
|
||||
import java.io.File;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -60,6 +61,17 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
|
||||
}
|
||||
|
||||
public JDBCSequentialFileFactory(final Connection connection,
|
||||
final SQLProvider sqlProvider,
|
||||
final Executor executor) throws Exception {
|
||||
this.executor = executor;
|
||||
this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
|
||||
}
|
||||
|
||||
public JDBCSequentialFileFactoryDriver getDbDriver() {
|
||||
return dbDriver;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SequentialFileFactory setDatasync(boolean enabled) {
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.file;
|
|||
import javax.sql.DataSource;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.Blob;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
|
@ -29,6 +30,7 @@ import java.util.List;
|
|||
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
|
||||
@SuppressWarnings("SynchronizeOnNonFinalField")
|
||||
public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
||||
|
||||
protected PreparedStatement deleteFile;
|
||||
|
@ -55,6 +57,10 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
super(dataSource, provider);
|
||||
}
|
||||
|
||||
JDBCSequentialFileFactoryDriver(Connection connection, SQLProvider sqlProvider) {
|
||||
super(connection, sqlProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void createSchema() throws SQLException {
|
||||
createTable(sqlProvider.getCreateFileTableSQL());
|
||||
|
@ -72,7 +78,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
|
||||
}
|
||||
|
||||
public synchronized List<String> listFiles(String extension) throws Exception {
|
||||
public List<String> listFiles(String extension) throws Exception {
|
||||
synchronized (connection) {
|
||||
List<String> fileNames = new ArrayList<>();
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
|
@ -89,6 +96,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
}
|
||||
return fileNames;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Opens the supplied file. If the file does not exist in the database it will create a new one.
|
||||
|
@ -113,7 +121,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
* @return
|
||||
* @throws SQLException
|
||||
*/
|
||||
public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
|
||||
public int fileExists(JDBCSequentialFile file) throws SQLException {
|
||||
try {
|
||||
synchronized (connection) {
|
||||
connection.setAutoCommit(false);
|
||||
selectFileByFileName.setString(1, file.getFileName());
|
||||
try (ResultSet rs = selectFileByFileName.executeQuery()) {
|
||||
|
@ -125,6 +135,11 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
} catch (NullPointerException npe) {
|
||||
npe.printStackTrace();
|
||||
throw npe;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads an existing file.
|
||||
|
@ -132,7 +147,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
* @param file
|
||||
* @throws SQLException
|
||||
*/
|
||||
public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
|
||||
public void loadFile(JDBCSequentialFile file) throws SQLException {
|
||||
synchronized (connection) {
|
||||
connection.setAutoCommit(false);
|
||||
readLargeObject.setInt(1, file.getId());
|
||||
|
||||
|
@ -146,6 +162,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new database row representing the supplied file.
|
||||
|
@ -153,7 +170,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
* @param file
|
||||
* @throws SQLException
|
||||
*/
|
||||
public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
|
||||
public void createFile(JDBCSequentialFile file) throws SQLException {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
createFile.setString(1, file.getFileName());
|
||||
|
@ -170,6 +188,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the fileName field to the new value.
|
||||
|
@ -178,7 +197,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
* @param newFileName
|
||||
* @throws SQLException
|
||||
*/
|
||||
public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
|
||||
public void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
renameFile.setString(1, newFileName);
|
||||
|
@ -190,6 +210,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the associated row in the database.
|
||||
|
@ -197,7 +218,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
* @param file
|
||||
* @throws SQLException
|
||||
*/
|
||||
public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
|
||||
public void deleteFile(JDBCSequentialFile file) throws SQLException {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
deleteFile.setInt(1, file.getId());
|
||||
|
@ -208,6 +230,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Persists data to this files associated database mapping.
|
||||
|
@ -217,7 +240,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
* @return
|
||||
* @throws SQLException
|
||||
*/
|
||||
public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
|
||||
public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
appendToLargeObject.setBytes(1, data);
|
||||
|
@ -230,6 +254,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads data from the file (at file.readPosition) into the byteBuffer.
|
||||
|
@ -239,7 +264,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
* @return
|
||||
* @throws SQLException
|
||||
*/
|
||||
public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
|
||||
public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
|
||||
synchronized (connection) {
|
||||
connection.setAutoCommit(false);
|
||||
readLargeObject.setInt(1, file.getId());
|
||||
int readLength = 0;
|
||||
|
@ -257,6 +283,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy the data content of FileFrom to FileTo
|
||||
|
@ -265,7 +292,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
* @param fileTo
|
||||
* @throws SQLException
|
||||
*/
|
||||
public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
|
||||
public void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
copyFileRecord.setInt(1, fileFrom.getId());
|
||||
|
@ -277,12 +305,14 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Drop all tables and data
|
||||
*/
|
||||
@Override
|
||||
public synchronized void destroy() throws SQLException {
|
||||
public void destroy() throws SQLException {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
try (Statement statement = connection.createStatement()) {
|
||||
|
@ -294,6 +324,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public long calculateReadLength(long objectLength, int bufferSpace, long readPosition) {
|
||||
long bytesRemaining = objectLength - readPosition;
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.postgresql.PGConnection;
|
|||
import org.postgresql.largeobject.LargeObject;
|
||||
import org.postgresql.largeobject.LargeObjectManager;
|
||||
|
||||
@SuppressWarnings("SynchronizeOnNonFinalField")
|
||||
public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
|
||||
|
||||
private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
|
||||
|
@ -33,7 +34,8 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
|
||||
public void createFile(JDBCSequentialFile file) throws SQLException {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
|
||||
|
@ -55,9 +57,11 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
|
||||
public void loadFile(JDBCSequentialFile file) throws SQLException {
|
||||
synchronized (connection) {
|
||||
connection.setAutoCommit(false);
|
||||
readLargeObject.setInt(1, file.getId());
|
||||
|
||||
|
@ -71,9 +75,11 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
|
||||
public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
|
||||
synchronized (connection) {
|
||||
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
|
||||
LargeObject largeObject = null;
|
||||
|
||||
|
@ -91,12 +97,14 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
|
|||
}
|
||||
return data.length;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
|
||||
public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
|
||||
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
|
||||
LargeObject largeObject = null;
|
||||
long oid = getOID(file);
|
||||
synchronized (connection) {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
largeObject = lobjManager.open(oid, LargeObjectManager.READ);
|
||||
|
@ -118,10 +126,12 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized Long getOID(JDBCSequentialFile file) throws SQLException {
|
||||
private Long getOID(JDBCSequentialFile file) throws SQLException {
|
||||
Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
|
||||
if (oid == null) {
|
||||
synchronized (connection) {
|
||||
connection.setAutoCommit(false);
|
||||
readLargeObject.setInt(1, file.getId());
|
||||
try (ResultSet rs = readLargeObject.executeQuery()) {
|
||||
|
@ -134,18 +144,20 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) {
|
||||
System.out.println("FD");
|
||||
}
|
||||
return (Long) file.getMetaData(POSTGRES_OID_KEY);
|
||||
}
|
||||
|
||||
private synchronized int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
|
||||
private int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
|
||||
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
|
||||
|
||||
int size = 0;
|
||||
Long oid = getOID(file);
|
||||
if (oid != null) {
|
||||
synchronized (connection) {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
|
||||
|
@ -157,6 +169,7 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
return size;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
|
|||
|
||||
private String largeMessagesTableName = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName();
|
||||
|
||||
private String pageStoreTableName = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName();
|
||||
|
||||
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
|
||||
|
||||
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
|
||||
|
@ -67,6 +69,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
|
|||
this.largeMessagesTableName = largeMessagesTableName;
|
||||
}
|
||||
|
||||
public String getPageStoreTableName() {
|
||||
return pageStoreTableName;
|
||||
}
|
||||
|
||||
public void setPageStoreTableName(String pageStoreTableName) {
|
||||
this.pageStoreTableName = pageStoreTableName;
|
||||
}
|
||||
|
||||
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
|
||||
this.jdbcConnectionUrl = jdbcConnectionUrl;
|
||||
}
|
||||
|
|
|
@ -1286,6 +1286,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK));
|
||||
conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK));
|
||||
conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
|
||||
conf.setPageStoreTableName(getString(storeNode, "page-store-table-name", conf.getPageStoreTableName(), Validators.NO_CHECK));
|
||||
conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
|
||||
conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK));
|
||||
return conf;
|
||||
|
|
|
@ -0,0 +1,213 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.paging.impl;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
||||
/**
|
||||
* Integration point between Paging and JDBC
|
||||
*/
|
||||
public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
private static final String ADDRESS_FILE = "address.txt";
|
||||
|
||||
private static final String DIRECTORY_NAME = "directory.txt";
|
||||
|
||||
// Attributes ----------------------------------------------------
|
||||
|
||||
protected final boolean syncNonTransactional;
|
||||
|
||||
private PagingManager pagingManager;
|
||||
|
||||
private final ScheduledExecutorService scheduledExecutor;
|
||||
|
||||
private final long syncTimeout;
|
||||
|
||||
protected final StorageManager storageManager;
|
||||
|
||||
private JDBCSequentialFileFactoryDriver dbDriver;
|
||||
|
||||
private DatabaseStorageConfiguration dbConf;
|
||||
|
||||
private ExecutorFactory executorFactory;
|
||||
|
||||
private JDBCSequentialFileFactory pagingFactoryFileFactory;
|
||||
|
||||
private JDBCSequentialFile directoryList;
|
||||
|
||||
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
|
||||
final StorageManager storageManager,
|
||||
final long syncTimeout,
|
||||
final ScheduledExecutorService scheduledExecutor,
|
||||
final ExecutorFactory executorFactory,
|
||||
final boolean syncNonTransactional,
|
||||
final IOCriticalErrorListener critialErrorListener) throws Exception {
|
||||
this.storageManager = storageManager;
|
||||
this.executorFactory = executorFactory;
|
||||
this.syncNonTransactional = syncNonTransactional;
|
||||
this.scheduledExecutor = scheduledExecutor;
|
||||
this.syncTimeout = syncTimeout;
|
||||
this.dbConf = dbConf;
|
||||
|
||||
if (dbConf.getDataSource() != null) {
|
||||
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
|
||||
if (sqlProviderFactory == null) {
|
||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
||||
}
|
||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName()), executorFactory.getExecutor());
|
||||
} else {
|
||||
String driverClassName = dbConf.getJdbcDriverClassName();
|
||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName()), executorFactory.getExecutor());
|
||||
}
|
||||
pagingFactoryFileFactory.start();
|
||||
directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
|
||||
directoryList.open();
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
pagingFactoryFileFactory.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageCursorProvider newCursorProvider(PagingStore store,
|
||||
StorageManager storageManager,
|
||||
AddressSettings addressSettings,
|
||||
Executor executor) {
|
||||
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
|
||||
|
||||
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized SequentialFileFactory newFileFactory(final SimpleString address) throws Exception {
|
||||
String guid = UUIDGenerator.getInstance().generateStringUUID();
|
||||
SequentialFileFactory factory = newFileFactory(guid, true);
|
||||
factory.start();
|
||||
|
||||
SequentialFile file = factory.createSequentialFile(PagingStoreFactoryDatabase.ADDRESS_FILE);
|
||||
file.open();
|
||||
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(SimpleString.sizeofNullableString(address));
|
||||
buffer.writeSimpleString(address);
|
||||
file.write(buffer, true);
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPagingManager(final PagingManager pagingManager) {
|
||||
this.pagingManager = pagingManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<PagingStore> reloadStores(final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
|
||||
// We assume the directory list < Integer.MAX_VALUE (this is only a list of addresses).
|
||||
int size = ((Long) directoryList.size()).intValue();
|
||||
ActiveMQBuffer buffer = readActiveMQBuffer(directoryList, size);
|
||||
|
||||
ArrayList<PagingStore> storesReturn = new ArrayList<>();
|
||||
|
||||
while (buffer.readableBytes() > 0) {
|
||||
SimpleString guid = buffer.readSimpleString();
|
||||
|
||||
JDBCSequentialFileFactory factory = (JDBCSequentialFileFactory) newFileFactory(guid.toString(), false);
|
||||
factory.start();
|
||||
|
||||
JDBCSequentialFile addressFile = (JDBCSequentialFile) factory.createSequentialFile(ADDRESS_FILE);
|
||||
addressFile.open();
|
||||
|
||||
size = ((Long) addressFile.size()).intValue();
|
||||
if (size == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ActiveMQBuffer addrBuffer = readActiveMQBuffer(addressFile, size);
|
||||
SimpleString address = addrBuffer.readSimpleString();
|
||||
|
||||
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
|
||||
|
||||
PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
|
||||
|
||||
storesReturn.add(store);
|
||||
}
|
||||
return storesReturn;
|
||||
}
|
||||
|
||||
private synchronized SequentialFileFactory newFileFactory(final String directoryName, boolean writeToDirectory) throws Exception {
|
||||
SimpleString simpleString = SimpleString.toSimpleString(directoryName);
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(simpleString.sizeof());
|
||||
buffer.writeSimpleString(simpleString);
|
||||
if (writeToDirectory) directoryList.write(buffer, true);
|
||||
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName)), executorFactory.getExecutor());
|
||||
}
|
||||
|
||||
private String getTableNameForGUID(String guid) {
|
||||
return dbConf.getPageStoreTableName() + guid.replace("-", "");
|
||||
}
|
||||
|
||||
private ActiveMQBuffer readActiveMQBuffer(SequentialFile file, int size) throws Exception {
|
||||
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
|
||||
byteBuffer.mark();
|
||||
file.read(byteBuffer);
|
||||
byteBuffer.reset();
|
||||
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(byteBuffer);
|
||||
buffer.writerIndex(size);
|
||||
return buffer;
|
||||
}
|
||||
}
|
|
@ -70,7 +70,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
|||
|
||||
private final long syncTimeout;
|
||||
|
||||
private final StorageManager storageManager;
|
||||
protected final StorageManager storageManager;
|
||||
|
||||
private final IOCriticalErrorListener critialErrorListener;
|
||||
|
||||
|
@ -187,6 +187,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
|||
}
|
||||
|
||||
private SequentialFileFactory newFileFactory(final String directoryName) {
|
||||
|
||||
return new NIOSequentialFileFactory(new File(directory, directoryName), false, critialErrorListener, 1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.artemis.core.persistence.impl.journal;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.Connection;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -34,6 +35,8 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
|
|||
|
||||
public class JDBCJournalStorageManager extends JournalStorageManager {
|
||||
|
||||
private Connection connection;
|
||||
|
||||
public JDBCJournalStorageManager(Configuration config,
|
||||
ExecutorFactory executorFactory,
|
||||
ExecutorFactory ioExecutorFactory,
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
|||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
|
||||
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
|
||||
|
@ -70,8 +71,10 @@ import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
|||
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
|
||||
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
|
||||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
|
||||
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
|
||||
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
|
||||
import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
|
||||
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
|
||||
|
@ -1899,11 +1902,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
this.queueFactory = factory;
|
||||
}
|
||||
|
||||
protected PagingManager createPagingManager() {
|
||||
protected PagingManager createPagingManager() throws Exception {
|
||||
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize());
|
||||
}
|
||||
|
||||
protected PagingStoreFactoryNIO getPagingStoreFactory() {
|
||||
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
|
||||
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
|
||||
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
|
||||
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, false, shutdownOnCriticalIO);
|
||||
}
|
||||
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
|
||||
}
|
||||
|
||||
|
|
|
@ -1719,6 +1719,13 @@
|
|||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
<xsd:element name="page-store-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
The table name used to large message files
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
</xsd:all>
|
||||
</xsd:complexType>
|
||||
|
||||
|
|
|
@ -455,12 +455,13 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
return configuration;
|
||||
}
|
||||
|
||||
private void setDBStoreType(Configuration configuration) {
|
||||
protected void setDBStoreType(Configuration configuration) {
|
||||
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
|
||||
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
|
||||
dbStorageConfiguration.setBindingsTableName("BINDINGS");
|
||||
dbStorageConfiguration.setMessageTableName("MESSAGE");
|
||||
dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE");
|
||||
dbStorageConfiguration.setPageStoreTableName("PAGE_STORE");
|
||||
dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName());
|
||||
|
||||
configuration.setStoreConfiguration(dbStorageConfiguration);
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
|
||||
<message-table-name>MESSAGE_TABLE</message-table-name>
|
||||
<large-message-table-name>LARGE_MESSAGE_TABLE</large-message-table-name>
|
||||
<page-store-table-name>PAGE_STORE_TABLE</page-store-table-name>
|
||||
<jdbc-driver-class-name>org.apache.derby.jdbc.EmbeddedDriver</jdbc-driver-class-name>
|
||||
</database-store>
|
||||
</store>
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
|
@ -36,9 +37,16 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class GlobalPagingTest extends PagingTest {
|
||||
|
||||
public GlobalPagingTest(StoreConfiguration.StoreType storeType) {
|
||||
super(storeType);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -69,6 +77,8 @@ public class GlobalPagingTest extends PagingTest {
|
|||
|
||||
@Test
|
||||
public void testPagingOverFullDisk() throws Exception {
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
|
||||
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.io.OutputStream;
|
|||
import java.lang.management.ManagementFactory;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
|
@ -51,6 +53,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
|||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
|
@ -86,7 +89,10 @@ import org.junit.After;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class PagingTest extends ActiveMQTestBase {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(PagingTest.class);
|
||||
|
@ -104,8 +110,19 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
|
||||
protected static final int PAGE_SIZE = 10 * 1024;
|
||||
|
||||
protected final StoreConfiguration.StoreType storeType;
|
||||
|
||||
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
|
||||
|
||||
public PagingTest(StoreConfiguration.StoreType storeType) {
|
||||
this.storeType = storeType;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "storeType={0}")
|
||||
public static Collection<Object[]> data() {
|
||||
Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
|
||||
return Arrays.asList(params);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void checkLoggerStart() throws Exception {
|
||||
|
@ -123,8 +140,6 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -1446,6 +1461,8 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
|
||||
@Test
|
||||
public void testMissingTXEverythingAcked() throws Exception {
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
|
||||
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||
|
@ -5633,7 +5650,11 @@ public class PagingTest extends ActiveMQTestBase {
|
|||
|
||||
@Override
|
||||
protected Configuration createDefaultInVMConfig() throws Exception {
|
||||
return super.createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||
Configuration configuration = super.createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) {
|
||||
setDBStoreType(configuration);
|
||||
}
|
||||
return configuration;
|
||||
}
|
||||
|
||||
private static final class DummyOperationContext implements OperationContext {
|
||||
|
|
Loading…
Reference in New Issue