ARTEMIS-560 Add Support For JDBC Paging

(cherry picked from commit 118c272c77)
This commit is contained in:
Martyn Taylor 2017-01-12 11:16:48 +00:00 committed by Clebert Suconic
parent 0db6345058
commit 115ccf874d
19 changed files with 623 additions and 253 deletions

View File

@ -426,6 +426,9 @@ public final class ActiveMQDefaultConfiguration {
// Default large messages table name, used with Database storage type // Default large messages table name, used with Database storage type
private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES"; private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES";
// Default large messages table name, used with Database storage type
private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE";
// Default period to wait between connection TTL checks // Default period to wait between connection TTL checks
public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000; public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000;
@ -1165,6 +1168,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_LARGE_MESSAGES_TABLE_NAME; return DEFAULT_LARGE_MESSAGES_TABLE_NAME;
} }
public static String getDefaultPageStoreTableName() {
return DEFAULT_PAGE_STORE_TABLE_NAME;
}
public static long getDefaultConnectionTtlCheckInterval() { public static long getDefaultConnectionTtlCheckInterval() {
return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL; return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
} }

View File

@ -36,6 +36,7 @@ import org.jboss.logging.Logger;
/** /**
* Class to hold common database functionality such as drivers and connections * Class to hold common database functionality such as drivers and connections
*/ */
@SuppressWarnings("SynchronizeOnNonFinalField")
public abstract class AbstractJDBCDriver { public abstract class AbstractJDBCDriver {
private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class); private static final Logger logger = Logger.getLogger(AbstractJDBCDriver.class);
@ -66,17 +67,26 @@ public abstract class AbstractJDBCDriver {
public void start() throws SQLException { public void start() throws SQLException {
connect(); connect();
createSchema(); synchronized (connection) {
prepareStatements(); createSchema();
prepareStatements();
}
}
public AbstractJDBCDriver(Connection connection, SQLProvider sqlProvider) {
this.connection = connection;
this.sqlProvider = sqlProvider;
} }
public void stop() throws SQLException { public void stop() throws SQLException {
if (sqlProvider.closeConnectionOnShutdown()) { synchronized (connection) {
try { if (sqlProvider.closeConnectionOnShutdown()) {
connection.close(); try {
} catch (SQLException e) { connection.close();
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); } catch (SQLException e) {
throw e; logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
throw e;
}
} }
} }
} }
@ -90,30 +100,32 @@ public abstract class AbstractJDBCDriver {
} }
private void connect() throws SQLException { private void connect() throws SQLException {
if (dataSource != null) { if (connection == null) {
try { if (dataSource != null) {
connection = dataSource.getConnection(); try {
} catch (SQLException e) { connection = dataSource.getConnection();
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e)); } catch (SQLException e) {
throw e; logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
} throw e;
} else {
try {
if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) {
throw new IllegalStateException("jdbcDriverClass is null or empty!");
} }
if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) { } else {
throw new IllegalStateException("jdbcConnectionUrl is null or empty!"); try {
if (jdbcDriverClass == null || jdbcDriverClass.isEmpty()) {
throw new IllegalStateException("jdbcDriverClass is null or empty!");
}
if (jdbcConnectionUrl == null || jdbcConnectionUrl.isEmpty()) {
throw new IllegalStateException("jdbcConnectionUrl is null or empty!");
}
final Driver dbDriver = getDriver(jdbcDriverClass);
connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
if (connection == null) {
throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl);
}
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
throw e;
} }
final Driver dbDriver = getDriver(jdbcDriverClass);
connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
if (connection == null) {
throw new IllegalStateException("the driver: " + jdbcDriverClass + " isn't able to connect to the requested url: " + jdbcConnectionUrl);
}
} catch (SQLException e) {
logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e));
ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
throw e;
} }
} }
} }
@ -206,8 +218,10 @@ public abstract class AbstractJDBCDriver {
return connection; return connection;
} }
public void setConnection(Connection connection) { public final void setConnection(Connection connection) {
this.connection = connection; if (connection == null) {
this.connection = connection;
}
} }
public void setSqlProvider(SQLProvider sqlProvider) { public void setSqlProvider(SQLProvider sqlProvider) {
@ -225,4 +239,5 @@ public abstract class AbstractJDBCDriver {
public void setDataSource(DataSource dataSource) { public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource; this.dataSource = dataSource;
} }
} }

View File

@ -29,7 +29,7 @@ public class DerbySQLProvider extends GenericSQLProvider {
private final String appendToFileSQL; private final String appendToFileSQL;
private DerbySQLProvider(String tableName) { private DerbySQLProvider(String tableName) {
super(tableName); super(tableName.toUpperCase());
createFileTableSQL = "CREATE TABLE " + tableName + createFileTableSQL = "CREATE TABLE " + tableName +
"(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + "(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +

View File

@ -18,6 +18,7 @@
package org.apache.activemq.artemis.jdbc.store.file; package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider; import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
@ -45,4 +46,15 @@ class JDBCFileUtils {
} }
return dbDriver; return dbDriver;
} }
static JDBCSequentialFileFactoryDriver getDBFileDriver(Connection connection, SQLProvider provider) throws SQLException {
JDBCSequentialFileFactoryDriver dbDriver;
if (provider instanceof PostgresSQLProvider) {
dbDriver = new PostgresSequentialSequentialFileDriver();
dbDriver.setConnection(connection);
} else {
dbDriver = new JDBCSequentialFileFactoryDriver(connection, provider);
}
return dbDriver;
}
} }

View File

@ -88,7 +88,12 @@ public class JDBCSequentialFile implements SequentialFile {
@Override @Override
public boolean exists() { public boolean exists() {
return isCreated; if (isCreated) return true;
try {
return fileFactory.listFiles(extension).contains(filename);
} catch (Exception e) {
return false;
}
} }
@Override @Override

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.io.File; import java.io.File;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -60,6 +61,17 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider); dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
} }
public JDBCSequentialFileFactory(final Connection connection,
final SQLProvider sqlProvider,
final Executor executor) throws Exception {
this.executor = executor;
this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider);
}
public JDBCSequentialFileFactoryDriver getDbDriver() {
return dbDriver;
}
@Override @Override
public SequentialFileFactory setDatasync(boolean enabled) { public SequentialFileFactory setDatasync(boolean enabled) {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.file;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.Blob; import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
@ -29,6 +30,7 @@ import java.util.List;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@SuppressWarnings("SynchronizeOnNonFinalField")
public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
protected PreparedStatement deleteFile; protected PreparedStatement deleteFile;
@ -55,6 +57,10 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
super(dataSource, provider); super(dataSource, provider);
} }
JDBCSequentialFileFactoryDriver(Connection connection, SQLProvider sqlProvider) {
super(connection, sqlProvider);
}
@Override @Override
protected void createSchema() throws SQLException { protected void createSchema() throws SQLException {
createTable(sqlProvider.getCreateFileTableSQL()); createTable(sqlProvider.getCreateFileTableSQL());
@ -72,22 +78,24 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
} }
public synchronized List<String> listFiles(String extension) throws Exception { public List<String> listFiles(String extension) throws Exception {
List<String> fileNames = new ArrayList<>(); synchronized (connection) {
try { List<String> fileNames = new ArrayList<>();
connection.setAutoCommit(false); try {
selectFileNamesByExtension.setString(1, extension); connection.setAutoCommit(false);
try (ResultSet rs = selectFileNamesByExtension.executeQuery()) { selectFileNamesByExtension.setString(1, extension);
while (rs.next()) { try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
fileNames.add(rs.getString(1)); while (rs.next()) {
fileNames.add(rs.getString(1));
}
} }
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
} }
connection.commit(); return fileNames;
} catch (SQLException e) {
connection.rollback();
throw e;
} }
return fileNames;
} }
/** /**
@ -113,16 +121,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @return * @return
* @throws SQLException * @throws SQLException
*/ */
public synchronized int fileExists(JDBCSequentialFile file) throws SQLException { public int fileExists(JDBCSequentialFile file) throws SQLException {
connection.setAutoCommit(false); try {
selectFileByFileName.setString(1, file.getFileName()); synchronized (connection) {
try (ResultSet rs = selectFileByFileName.executeQuery()) { connection.setAutoCommit(false);
int id = rs.next() ? rs.getInt(1) : -1; selectFileByFileName.setString(1, file.getFileName());
connection.commit(); try (ResultSet rs = selectFileByFileName.executeQuery()) {
return id; int id = rs.next() ? rs.getInt(1) : -1;
} catch (Exception e) { connection.commit();
connection.rollback(); return id;
throw e; } catch (Exception e) {
connection.rollback();
throw e;
}
}
} catch (NullPointerException npe) {
npe.printStackTrace();
throw npe;
} }
} }
@ -132,18 +147,20 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @param file * @param file
* @throws SQLException * @throws SQLException
*/ */
public synchronized void loadFile(JDBCSequentialFile file) throws SQLException { public void loadFile(JDBCSequentialFile file) throws SQLException {
connection.setAutoCommit(false); synchronized (connection) {
readLargeObject.setInt(1, file.getId()); connection.setAutoCommit(false);
readLargeObject.setInt(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) { try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) { if (rs.next()) {
file.setWritePosition((int) rs.getBlob(1).length()); file.setWritePosition((int) rs.getBlob(1).length());
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
} }
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
} }
} }
@ -153,21 +170,23 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @param file * @param file
* @throws SQLException * @throws SQLException
*/ */
public synchronized void createFile(JDBCSequentialFile file) throws SQLException { public void createFile(JDBCSequentialFile file) throws SQLException {
try { synchronized (connection) {
connection.setAutoCommit(false); try {
createFile.setString(1, file.getFileName()); connection.setAutoCommit(false);
createFile.setString(2, file.getExtension()); createFile.setString(1, file.getFileName());
createFile.setBytes(3, new byte[0]); createFile.setString(2, file.getExtension());
createFile.executeUpdate(); createFile.setBytes(3, new byte[0]);
try (ResultSet keys = createFile.getGeneratedKeys()) { createFile.executeUpdate();
keys.next(); try (ResultSet keys = createFile.getGeneratedKeys()) {
file.setId(keys.getInt(1)); keys.next();
file.setId(keys.getInt(1));
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
} }
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
} }
} }
@ -178,16 +197,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @param newFileName * @param newFileName
* @throws SQLException * @throws SQLException
*/ */
public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException { public void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
try { synchronized (connection) {
connection.setAutoCommit(false); try {
renameFile.setString(1, newFileName); connection.setAutoCommit(false);
renameFile.setInt(2, file.getId()); renameFile.setString(1, newFileName);
renameFile.executeUpdate(); renameFile.setInt(2, file.getId());
connection.commit(); renameFile.executeUpdate();
} catch (SQLException e) { connection.commit();
connection.rollback(); } catch (SQLException e) {
throw e; connection.rollback();
throw e;
}
} }
} }
@ -197,15 +218,17 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @param file * @param file
* @throws SQLException * @throws SQLException
*/ */
public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException { public void deleteFile(JDBCSequentialFile file) throws SQLException {
try { synchronized (connection) {
connection.setAutoCommit(false); try {
deleteFile.setInt(1, file.getId()); connection.setAutoCommit(false);
deleteFile.executeUpdate(); deleteFile.setInt(1, file.getId());
connection.commit(); deleteFile.executeUpdate();
} catch (SQLException e) { connection.commit();
connection.rollback(); } catch (SQLException e) {
throw e; connection.rollback();
throw e;
}
} }
} }
@ -217,17 +240,19 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @return * @return
* @throws SQLException * @throws SQLException
*/ */
public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException { public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
try { synchronized (connection) {
connection.setAutoCommit(false); try {
appendToLargeObject.setBytes(1, data); connection.setAutoCommit(false);
appendToLargeObject.setInt(2, file.getId()); appendToLargeObject.setBytes(1, data);
appendToLargeObject.executeUpdate(); appendToLargeObject.setInt(2, file.getId());
connection.commit(); appendToLargeObject.executeUpdate();
return data.length; connection.commit();
} catch (SQLException e) { return data.length;
connection.rollback(); } catch (SQLException e) {
throw e; connection.rollback();
throw e;
}
} }
} }
@ -239,22 +264,24 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @return * @return
* @throws SQLException * @throws SQLException
*/ */
public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException { public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
connection.setAutoCommit(false); synchronized (connection) {
readLargeObject.setInt(1, file.getId()); connection.setAutoCommit(false);
int readLength = 0; readLargeObject.setInt(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) { int readLength = 0;
if (rs.next()) { try (ResultSet rs = readLargeObject.executeQuery()) {
Blob blob = rs.getBlob(1); if (rs.next()) {
readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position()); Blob blob = rs.getBlob(1);
byte[] data = blob.getBytes(file.position() + 1, readLength); readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
bytes.put(data); byte[] data = blob.getBytes(file.position() + 1, readLength);
bytes.put(data);
}
connection.commit();
return readLength;
} catch (Throwable e) {
connection.rollback();
throw e;
} }
connection.commit();
return readLength;
} catch (Throwable e) {
connection.rollback();
throw e;
} }
} }
@ -265,16 +292,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* @param fileTo * @param fileTo
* @throws SQLException * @throws SQLException
*/ */
public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException { public void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
try { synchronized (connection) {
connection.setAutoCommit(false); try {
copyFileRecord.setInt(1, fileFrom.getId()); connection.setAutoCommit(false);
copyFileRecord.setInt(2, fileTo.getId()); copyFileRecord.setInt(1, fileFrom.getId());
copyFileRecord.executeUpdate(); copyFileRecord.setInt(2, fileTo.getId());
connection.commit(); copyFileRecord.executeUpdate();
} catch (SQLException e) { connection.commit();
connection.rollback(); } catch (SQLException e) {
throw e; connection.rollback();
throw e;
}
} }
} }
@ -282,16 +311,18 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
* Drop all tables and data * Drop all tables and data
*/ */
@Override @Override
public synchronized void destroy() throws SQLException { public void destroy() throws SQLException {
try { synchronized (connection) {
connection.setAutoCommit(false); try {
try (Statement statement = connection.createStatement()) { connection.setAutoCommit(false);
statement.executeUpdate(sqlProvider.getDropFileTableSQL()); try (Statement statement = connection.createStatement()) {
statement.executeUpdate(sqlProvider.getDropFileTableSQL());
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
} }
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
} }
} }

View File

@ -24,6 +24,7 @@ import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject; import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager; import org.postgresql.largeobject.LargeObjectManager;
@SuppressWarnings("SynchronizeOnNonFinalField")
public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver { public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY"; private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
@ -33,100 +34,22 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
} }
@Override @Override
public synchronized void createFile(JDBCSequentialFile file) throws SQLException { public void createFile(JDBCSequentialFile file) throws SQLException {
try { synchronized (connection) {
connection.setAutoCommit(false); try {
connection.setAutoCommit(false);
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
long oid = lobjManager.createLO(); long oid = lobjManager.createLO();
createFile.setString(1, file.getFileName()); createFile.setString(1, file.getFileName());
createFile.setString(2, file.getExtension()); createFile.setString(2, file.getExtension());
createFile.setLong(3, oid); createFile.setLong(3, oid);
createFile.executeUpdate(); createFile.executeUpdate();
try (ResultSet keys = createFile.getGeneratedKeys()) { try (ResultSet keys = createFile.getGeneratedKeys()) {
keys.next(); keys.next();
file.setId(keys.getInt(1)); file.setId(keys.getInt(1));
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
}
}
@Override
public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
connection.setAutoCommit(false);
readLargeObject.setInt(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
file.setWritePosition(getPostGresLargeObjectSize(file));
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
}
}
@Override
public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
LargeObject largeObject = null;
Long oid = getOID(file);
try {
connection.setAutoCommit(false);
largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
largeObject.seek(largeObject.size());
largeObject.write(data);
largeObject.close();
connection.commit();
} catch (Exception e) {
connection.rollback();
throw e;
}
return data.length;
}
@Override
public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
LargeObject largeObject = null;
long oid = getOID(file);
try {
connection.setAutoCommit(false);
largeObject = lobjManager.open(oid, LargeObjectManager.READ);
int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
if (readLength > 0) {
if (file.position() > 0)
largeObject.seek((int) file.position());
byte[] data = largeObject.read(readLength);
bytes.put(data);
}
largeObject.close();
connection.commit();
return readLength;
} catch (SQLException e) {
connection.rollback();
throw e;
}
}
private synchronized Long getOID(JDBCSequentialFile file) throws SQLException {
Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
if (oid == null) {
connection.setAutoCommit(false);
readLargeObject.setInt(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
} }
connection.commit(); connection.commit();
} catch (SQLException e) { } catch (SQLException e) {
@ -134,27 +57,117 @@ public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFa
throw e; throw e;
} }
} }
}
@Override
public void loadFile(JDBCSequentialFile file) throws SQLException {
synchronized (connection) {
connection.setAutoCommit(false);
readLargeObject.setInt(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
file.setWritePosition(getPostGresLargeObjectSize(file));
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
}
}
}
@Override
public int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
synchronized (connection) {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
LargeObject largeObject = null;
Long oid = getOID(file);
try {
connection.setAutoCommit(false);
largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
largeObject.seek(largeObject.size());
largeObject.write(data);
largeObject.close();
connection.commit();
} catch (Exception e) {
connection.rollback();
throw e;
}
return data.length;
}
}
@Override
public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
LargeObject largeObject = null;
long oid = getOID(file);
synchronized (connection) {
try {
connection.setAutoCommit(false);
largeObject = lobjManager.open(oid, LargeObjectManager.READ);
int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
if (readLength > 0) {
if (file.position() > 0)
largeObject.seek((int) file.position());
byte[] data = largeObject.read(readLength);
bytes.put(data);
}
largeObject.close();
connection.commit();
return readLength;
} catch (SQLException e) {
connection.rollback();
throw e;
}
}
}
private Long getOID(JDBCSequentialFile file) throws SQLException {
Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
if (oid == null) {
synchronized (connection) {
connection.setAutoCommit(false);
readLargeObject.setInt(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
}
connection.commit();
} catch (SQLException e) {
connection.rollback();
throw e;
}
}
}
if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) { if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) {
System.out.println("FD"); System.out.println("FD");
} }
return (Long) file.getMetaData(POSTGRES_OID_KEY); return (Long) file.getMetaData(POSTGRES_OID_KEY);
} }
private synchronized int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException { private int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
int size = 0; int size = 0;
Long oid = getOID(file); Long oid = getOID(file);
if (oid != null) { if (oid != null) {
try { synchronized (connection) {
connection.setAutoCommit(false); try {
LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ); connection.setAutoCommit(false);
size = largeObject.size(); LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
largeObject.close(); size = largeObject.size();
connection.commit(); largeObject.close();
} catch (SQLException e) { connection.commit();
connection.rollback(); } catch (SQLException e) {
throw e; connection.rollback();
throw e;
}
} }
} }
return size; return size;

View File

@ -30,6 +30,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private String largeMessagesTableName = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName(); private String largeMessagesTableName = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName();
private String pageStoreTableName = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName();
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl(); private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName(); private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
@ -67,6 +69,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
this.largeMessagesTableName = largeMessagesTableName; this.largeMessagesTableName = largeMessagesTableName;
} }
public String getPageStoreTableName() {
return pageStoreTableName;
}
public void setPageStoreTableName(String pageStoreTableName) {
this.pageStoreTableName = pageStoreTableName;
}
public void setJdbcConnectionUrl(String jdbcConnectionUrl) { public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
this.jdbcConnectionUrl = jdbcConnectionUrl; this.jdbcConnectionUrl = jdbcConnectionUrl;
} }

View File

@ -1158,6 +1158,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK)); conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK));
conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK)); conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK));
conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK)); conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
conf.setPageStoreTableName(getString(storeNode, "page-store-table-name", conf.getPageStoreTableName(), Validators.NO_CHECK));
conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK)); conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK)); conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK));
return conf; return conf;

View File

@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.paging.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.UUIDGenerator;
/**
* Integration point between Paging and JDBC
*/
public class PagingStoreFactoryDatabase implements PagingStoreFactory {
// Constants -----------------------------------------------------
private static final String ADDRESS_FILE = "address.txt";
private static final String DIRECTORY_NAME = "directory.txt";
// Attributes ----------------------------------------------------
protected final boolean syncNonTransactional;
private PagingManager pagingManager;
private final ScheduledExecutorService scheduledExecutor;
private final long syncTimeout;
protected final StorageManager storageManager;
private JDBCSequentialFileFactoryDriver dbDriver;
private DatabaseStorageConfiguration dbConf;
private ExecutorFactory executorFactory;
private JDBCSequentialFileFactory pagingFactoryFileFactory;
private JDBCSequentialFile directoryList;
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
final StorageManager storageManager,
final long syncTimeout,
final ScheduledExecutorService scheduledExecutor,
final ExecutorFactory executorFactory,
final boolean syncNonTransactional,
final IOCriticalErrorListener critialErrorListener) throws Exception {
this.storageManager = storageManager;
this.executorFactory = executorFactory;
this.syncNonTransactional = syncNonTransactional;
this.scheduledExecutor = scheduledExecutor;
this.syncTimeout = syncTimeout;
this.dbConf = dbConf;
if (dbConf.getDataSource() != null) {
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
if (sqlProviderFactory == null) {
sqlProviderFactory = new GenericSQLProvider.Factory();
}
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName()), executorFactory.getExecutor());
} else {
String driverClassName = dbConf.getJdbcDriverClassName();
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName()), executorFactory.getExecutor());
}
pagingFactoryFileFactory.start();
directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
directoryList.open();
}
// Public --------------------------------------------------------
@Override
public void stop() {
pagingFactoryFileFactory.stop();
}
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
}
@Override
public PageCursorProvider newCursorProvider(PagingStore store,
StorageManager storageManager,
AddressSettings addressSettings,
Executor executor) {
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
}
@Override
public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
}
@Override
public synchronized SequentialFileFactory newFileFactory(final SimpleString address) throws Exception {
String guid = UUIDGenerator.getInstance().generateStringUUID();
SequentialFileFactory factory = newFileFactory(guid, true);
factory.start();
SequentialFile file = factory.createSequentialFile(PagingStoreFactoryDatabase.ADDRESS_FILE);
file.open();
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(SimpleString.sizeofNullableString(address));
buffer.writeSimpleString(address);
file.write(buffer, true);
return factory;
}
@Override
public void setPagingManager(final PagingManager pagingManager) {
this.pagingManager = pagingManager;
}
@Override
public synchronized List<PagingStore> reloadStores(final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
// We assume the directory list < Integer.MAX_VALUE (this is only a list of addresses).
int size = ((Long) directoryList.size()).intValue();
ActiveMQBuffer buffer = readActiveMQBuffer(directoryList, size);
ArrayList<PagingStore> storesReturn = new ArrayList<>();
while (buffer.readableBytes() > 0) {
SimpleString guid = buffer.readSimpleString();
JDBCSequentialFileFactory factory = (JDBCSequentialFileFactory) newFileFactory(guid.toString(), false);
factory.start();
JDBCSequentialFile addressFile = (JDBCSequentialFile) factory.createSequentialFile(ADDRESS_FILE);
addressFile.open();
size = ((Long) addressFile.size()).intValue();
if (size == 0) {
continue;
}
ActiveMQBuffer addrBuffer = readActiveMQBuffer(addressFile, size);
SimpleString address = addrBuffer.readSimpleString();
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
storesReturn.add(store);
}
return storesReturn;
}
private synchronized SequentialFileFactory newFileFactory(final String directoryName, boolean writeToDirectory) throws Exception {
SimpleString simpleString = SimpleString.toSimpleString(directoryName);
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(simpleString.sizeof());
buffer.writeSimpleString(simpleString);
if (writeToDirectory) directoryList.write(buffer, true);
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName)), executorFactory.getExecutor());
}
private String getTableNameForGUID(String guid) {
return dbConf.getPageStoreTableName() + guid.replace("-", "");
}
private ActiveMQBuffer readActiveMQBuffer(SequentialFile file, int size) throws Exception {
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
byteBuffer.mark();
file.read(byteBuffer);
byteBuffer.reset();
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(byteBuffer);
buffer.writerIndex(size);
return buffer;
}
}

View File

@ -70,7 +70,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
private final long syncTimeout; private final long syncTimeout;
private final StorageManager storageManager; protected final StorageManager storageManager;
private final IOCriticalErrorListener critialErrorListener; private final IOCriticalErrorListener critialErrorListener;
@ -187,6 +187,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
} }
private SequentialFileFactory newFileFactory(final String directoryName) { private SequentialFileFactory newFileFactory(final String directoryName) {
return new NIOSequentialFileFactory(new File(directory, directoryName), false, critialErrorListener, 1); return new NIOSequentialFileFactory(new File(directory, directoryName), false, critialErrorListener, 1);
} }
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.persistence.impl.journal; package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.Connection;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -34,6 +35,8 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
public class JDBCJournalStorageManager extends JournalStorageManager { public class JDBCJournalStorageManager extends JournalStorageManager {
private Connection connection;
public JDBCJournalStorageManager(Configuration config, public JDBCJournalStorageManager(Configuration config,
ExecutorFactory executorFactory, ExecutorFactory executorFactory,
ExecutorFactory ioExecutorFactory, ExecutorFactory ioExecutorFactory,

View File

@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser; import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
@ -68,8 +69,10 @@ import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
@ -1836,11 +1839,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.queueFactory = factory; this.queueFactory = factory;
} }
protected PagingManager createPagingManager() { protected PagingManager createPagingManager() throws Exception {
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize()); return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize());
} }
protected PagingStoreFactoryNIO getPagingStoreFactory() { protected PagingStoreFactory getPagingStoreFactory() throws Exception {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, false, shutdownOnCriticalIO);
}
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO); return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
} }

View File

@ -1696,6 +1696,13 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="page-store-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The table name used to large message files
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all> </xsd:all>
</xsd:complexType> </xsd:complexType>

View File

@ -455,12 +455,13 @@ public abstract class ActiveMQTestBase extends Assert {
return configuration; return configuration;
} }
private void setDBStoreType(Configuration configuration) { protected void setDBStoreType(Configuration configuration) {
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration(); DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl()); dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
dbStorageConfiguration.setBindingsTableName("BINDINGS"); dbStorageConfiguration.setBindingsTableName("BINDINGS");
dbStorageConfiguration.setMessageTableName("MESSAGE"); dbStorageConfiguration.setMessageTableName("MESSAGE");
dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE"); dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE");
dbStorageConfiguration.setPageStoreTableName("PAGE_STORE");
dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName()); dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName());
configuration.setStoreConfiguration(dbStorageConfiguration); configuration.setStoreConfiguration(dbStorageConfiguration);

View File

@ -25,6 +25,7 @@
<bindings-table-name>BINDINGS_TABLE</bindings-table-name> <bindings-table-name>BINDINGS_TABLE</bindings-table-name>
<message-table-name>MESSAGE_TABLE</message-table-name> <message-table-name>MESSAGE_TABLE</message-table-name>
<large-message-table-name>LARGE_MESSAGE_TABLE</large-message-table-name> <large-message-table-name>LARGE_MESSAGE_TABLE</large-message-table-name>
<page-store-table-name>PAGE_STORE_TABLE</page-store-table-name>
<jdbc-driver-class-name>org.apache.derby.jdbc.EmbeddedDriver</jdbc-driver-class-name> <jdbc-driver-class-name>org.apache.derby.jdbc.EmbeddedDriver</jdbc-driver-class-name>
</database-store> </database-store>
</store> </store>

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
@ -36,9 +37,16 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class GlobalPagingTest extends PagingTest { public class GlobalPagingTest extends PagingTest {
public GlobalPagingTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -69,6 +77,8 @@ public class GlobalPagingTest extends PagingTest {
@Test @Test
public void testPagingOverFullDisk() throws Exception { public void testPagingOverFullDisk() throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
clearDataRecreateServerDirs(); clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);

View File

@ -24,6 +24,8 @@ import java.io.OutputStream;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
@ -51,6 +53,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
@ -84,7 +87,10 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class PagingTest extends ActiveMQTestBase { public class PagingTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(PagingTest.class); private static final Logger logger = Logger.getLogger(PagingTest.class);
@ -102,8 +108,19 @@ public class PagingTest extends ActiveMQTestBase {
protected static final int PAGE_SIZE = 10 * 1024; protected static final int PAGE_SIZE = 10 * 1024;
protected final StoreConfiguration.StoreType storeType;
static final SimpleString ADDRESS = new SimpleString("SimpleAddress"); static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
public PagingTest(StoreConfiguration.StoreType storeType) {
this.storeType = storeType;
}
@Parameterized.Parameters(name = "storeType={0}")
public static Collection<Object[]> data() {
Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
return Arrays.asList(params);
}
@Before @Before
public void checkLoggerStart() throws Exception { public void checkLoggerStart() throws Exception {
@ -121,8 +138,6 @@ public class PagingTest extends ActiveMQTestBase {
} }
} }
@Override @Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -1444,6 +1459,8 @@ public class PagingTest extends ActiveMQTestBase {
@Test @Test
public void testMissingTXEverythingAcked() throws Exception { public void testMissingTXEverythingAcked() throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) return;
clearDataRecreateServerDirs(); clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
@ -5631,7 +5648,11 @@ public class PagingTest extends ActiveMQTestBase {
@Override @Override
protected Configuration createDefaultInVMConfig() throws Exception { protected Configuration createDefaultInVMConfig() throws Exception {
return super.createDefaultInVMConfig().setJournalSyncNonTransactional(false); Configuration configuration = super.createDefaultInVMConfig().setJournalSyncNonTransactional(false);
if (storeType == StoreConfiguration.StoreType.DATABASE) {
setDBStoreType(configuration);
}
return configuration;
} }
private static final class DummyOperationContext implements OperationContext { private static final class DummyOperationContext implements OperationContext {